搭建生信分析流水線,如工廠一樣24小時運轉(zhuǎn)Snakemake——進(jìn)階命令

小云上次介紹了分析流程管理工具snakemake,以及講解了他的基礎(chǔ)用法,并搭建了一個基礎(chǔ)的分析流程。在此基礎(chǔ)上,小云繼續(xù)帶大家學(xué)習(xí)他強大的進(jìn)階命令,完善示例流程。
?
這些命令包括:
指定流程的線程數(shù)及CPU內(nèi)核數(shù)。
為流程添加配置文件。
引入輸入文件的函數(shù)。
添加規(guī)則參數(shù)。
為流程添加日志。
設(shè)置輸出文件為臨時文件或者保護(hù)文件
1.?指定流程線程數(shù)
對于生信的某些工具,小云建議使用多個線程以加快計算速度。該如何實現(xiàn)呢?我們可以通過threads指令讓Snakemake知道規(guī)則需要的線程。例如:
rule bwa_map:
????input:
????????"data/genome.fa",
????????"data/samples/{sample}.fastq"
????output:
????????"mapped_reads/{sample}.bam"
????threads: 8
????shell:
????????"bwa mem -t {threads}?{input}?| samtools view -Sb - > {output}"
添加一行命令,就可以指定rule bwa_map使用8個線程。
值得一提的是snakemake會確保同時運行的所有作業(yè)的線程總數(shù),不超過給定數(shù)量的CPU內(nèi)核。
我們在執(zhí)行snakemake工作流程時,使用--cores參數(shù),指定使用的CPU內(nèi)核數(shù)量。
snakemake --cores 10
比如這條命令,就會用10個CPU內(nèi)核執(zhí)行該工作流。
rule bwa_map指定了8個線程,那么snakemake就會用剩下兩個內(nèi)核去執(zhí)行其他的任務(wù),例如samtools_sort。
當(dāng)提供的內(nèi)核少于線程時,規(guī)則使用的線程數(shù)將減少到給定內(nèi)核數(shù)。
如果給出的--cores沒有數(shù)字,則使用所有可用的cores。
2.設(shè)置配置文件
AMPLES =?["A", "B"]
rule bcftools_call:
????input:
????????fa="data/genome.fa",
????????bam=expand("sorted_reads/{sample}.bam", sample=SAMPLES),
????????bai=expand("sorted_reads/{sample}.bam.bai", sample=SAMPLES)
????output:
????????"calls/all.vcf"
????shell:
????????"bcftools mpileup -f {input.fa}?{input.bam}?| "
????????"bcftools call -mv - > {output}"
上期講解中我們使用SAMPLES列表,來匹配所有我們需要處理的文件。A,B。
但是,當(dāng)新的數(shù)據(jù)來了需要我們的流程分析時,需要手動地更改,難以適應(yīng)新的數(shù)據(jù)。
Snakemake考慮到了這種情況,它提供了一種配置文件機(jī)制。配置文件可以用JSON或YAML編寫,并與指令一起使用。
?
configfile: "config.yaml"
?
我們只需要在工作流的頂端添加這一行命令。Snakemake將加載配置文件,并將其內(nèi)容存儲到名為的全局可用字典中。
我們創(chuàng)建一個config.yaml文件,編寫samples的文件路徑
samples:????
A:?data/samples/A.fastq????
B:?data/samples/B.fastq
現(xiàn)在我們就可以去掉SAMPLES列表,并將rule改為
rule bcftools_call:
????input:
????????fa="data/genome.fa",
????????bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
????????bai=expand("sorted_reads/{sample}.bam.bai", sample=config["samples"])
????output:
????????"calls/all.vcf"
????shell:
????????"bcftools mpileup -f {input.fa}?{input.bam}?| "
????????"bcftools call -mv - > {output}"
3.?定義輸入函數(shù)
我們已經(jīng)將sample文件的路徑存儲在配置文件中,因此我們還可以定義一個函數(shù)來為使用這些路徑。
?
Snakemake 工作流的執(zhí)行分為三個階段。分別是
1.?在初始化階段,將解析定義工作流的文件,并實例化所有規(guī)則。
2.?在DAG階段,通過填充通配符并將輸入文件與輸出文件匹配,可以構(gòu)建所有作業(yè)的有向無循環(huán)依賴圖。
3.?在調(diào)度階段,執(zhí)行作業(yè)的DAG,根據(jù)可用資源啟動作業(yè)。
?
在初始化階段,輸入文件的函數(shù)會被執(zhí)行,但是在這個階段作業(yè)、通配符值和規(guī)則依賴關(guān)系還不知道。
我們需要將輸入文件的確定推遲到DAG階段。這可以通過在輸入指令中指定輸入函數(shù)而不是字符串來實現(xiàn)。對于規(guī)則,其工作原理如下:
?
def?get_bwa_map_input_fastqs(wildcards):
????return?config["samples"][wildcards.sample]
定義一個獲取輸入文件路徑的函數(shù)。
rule bwa_map:
????input:
????????"data/genome.fa",
????????get_bwa_map_input_fastqs?#調(diào)用輸入函數(shù)
????output:
????????"mapped_reads/{sample}.bam"
????threads: 8
????shell:
????????"bwa mem -t {threads}?{input}?| samtools view -Sb - > {output}"
?
這個函數(shù)會返回輸入文件的路徑,[wildcards.sample]通過這個命令可以返回文件
4.?設(shè)置規(guī)則的參數(shù)
有時,shell命令不僅僅由輸入和輸出文件以及一些靜態(tài)標(biāo)志組成。有時候可能需要根據(jù)作業(yè)的通配符值設(shè)置其他參數(shù)。Snakemake允許使用指令params為規(guī)則定義任意參數(shù)。例如
?
rule bwa_map:
????input:
????????"data/genome.fa",
????????get_bwa_map_input_fastqs
????output:
????????"mapped_reads/{sample}.bam"
????params:
????????rg=r"@RG\tID:{sample}\tSM:{sample}"
????threads: 8
????shell:
????????"bwa mem -R '{params.rg}' -t {threads}?{input}?| samtools view -Sb - > {output}"
?
突變分析會考慮很多參數(shù)。一個特別重要的是先前的突變率(默認(rèn)為1e-3)。
?
我們可以向配置文件中添加一個key并使用規(guī)則中的指令,將其調(diào)用到shell命令來配置這個key。
5.?日志
在執(zhí)行一個大型工作流時,通常需要將每個作業(yè)的日志記錄輸出存儲到一個單獨的文件中,而不是在多個作業(yè)并行運行時將所有日志記錄輸出打印到終端,這會導(dǎo)致輸出混亂。為此,Snakemake允許為規(guī)則指定日志文件??梢酝ㄟ^添加log命令來完成。
rule bwa_map:
????input:
????????"data/genome.fa",
????????get_bwa_map_input_fastqs
????output:
????????"mapped_reads/{sample}.bam"
????params:
????????rg=r"@RG\tID:{sample}\tSM:{sample}"
????log:
????????"logs/bwa_mem/{sample}.log"
????threads: 8
????shell:
????????"(bwa mem -R '{params.rg}' -t {threads}?{input}?| "
????????"samtools view -Sb - > {output}) 2> {log}"
?
執(zhí)行shell命令時,可以通過管道傳輸?shù)饺罩疚募小?/p>
6.?臨時文件和保護(hù)文件
當(dāng)我們運行一個大型工作流時,會產(chǎn)生許多中間文件,這些文件會占用大量的磁盤空間,并且也需要耗費計算資源和時間。為了避免這種情況可以將輸出文件設(shè)置為臨時文件,一旦需要它的作業(yè)執(zhí)行完畢,就會刪除這個臨時文件,避免磁盤占用過多。
?
rule bwa_map:
????input:
????????"data/genome.fa",
????????get_bwa_map_input_fastqs
????output:
????????temp("mapped_reads/{sample}.bam")
????params:
????????rg=r"@RG\tID:{sample}\tSM:{sample}"
????log:
????????"logs/bwa_mem/{sample}.log"
????threads: 8
????shell:
????????"(bwa mem -R '{params.rg}' -t {threads}?{input}?| "
????????"samtools view -Sb - > {output}) 2> {log}"
有的時候,我們已經(jīng)執(zhí)行完了工作流程,得到了想要的結(jié)果,不希望結(jié)果遭到篡改。我們可以將結(jié)果問價你設(shè)置為保護(hù)文件。Snakemake會對文件系統(tǒng)中的輸出文件進(jìn)行寫保護(hù),這樣就不會意外地覆蓋或刪除它。
流程總結(jié)
創(chuàng)建配置文件
samples:????
A:?data/samples/A.fastq????
B:?data/samples/B.fastq
prior_mutation_rate:?0.001
Snakemake工作流程文件Snakefile
?
configfile: "config.yaml"?#指定配置文件
?
rule all:
????input:
????????"plots/quals.svg"
?
def?get_bwa_map_input_fastqs(wildcards):?#定義輸入函數(shù)
????return?config["samples"][wildcards.sample]
?
rule bwa_map:
????input:
????????"data/genome.fa",
????????get_bwa_map_input_fastqs
????output:
????????temp("mapped_reads/{sample}.bam")#設(shè)置臨時文件
????params:
????????rg=r"@RG\tID:{sample}\tSM:{sample}"
????log:
????????"logs/bwa_mem/{sample}.log" #輸出日志
????threads: 8 #設(shè)定線程
????shell:
????????"(bwa mem -R '{params.rg}' -t {threads}?{input}?| "
????????"samtools view -Sb - > {output}) 2> {log}"
?
rule samtools_sort:
????input:
????????"mapped_reads/{sample}.bam"
????output:
????????protected("sorted_reads/{sample}.bam")
????shell:
????????"samtools sort -T sorted_reads/{wildcards.sample}?"
????????"-O bam {input}?> {output}"
?
rule samtools_index:
????input:
????????"sorted_reads/{sample}.bam"
????output:
????????"sorted_reads/{sample}.bam.bai"
????shell:
????????"samtools index {input}"
?
rule bcftools_call:
????input:
????????fa="data/genome.fa",
????????bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
????????bai=expand("sorted_reads/{sample}.bam.bai", sample=config["samples"])
????output:
????????"calls/all.vcf"
????params:
????????rate=config["prior_mutation_rate"]
????log:
????????"logs/bcftools_call/all.log"
????shell:
????????"(bcftools mpileup -f {input.fa}?{input.bam}?| "
????????"bcftools call -mv -P {params.rate}?- > {output}) 2> {log}"
?
rule plot_quals:
????input:
????????"calls/all.vcf"
????output:
????????"plots/quals.svg"
????script:
????????"scripts/plot-quals.py"
?
好了小云今天的講解就到這里了,下期再見~
?