diff --git a/codes/cromwell.examples.conf b/codes/cromwell.examples.conf
index 98822b9..e164a50 100755
--- a/codes/cromwell.examples.conf
+++ b/codes/cromwell.examples.conf
@@ -42,19 +42,19 @@ system {
   graceful-server-shutdown = true

   # Cromwell will cap the number of running workflows at N
-  max-concurrent-workflows = 10000
+  max-concurrent-workflows = 5000

   # Cromwell will launch up to N submitted workflows at a time, regardless of how many open workflow slots exist
-  max-workflow-launch-count = 1000
+  max-workflow-launch-count = 50

   # Number of seconds between workflow launches
-  new-workflow-poll-rate = 20
+  new-workflow-poll-rate = 10

   # Since the WorkflowLogCopyRouter is initialized in code, this is the number of workers
-  #number-of-workflow-log-copy-workers = 10
+  number-of-workflow-log-copy-workers = 10

   # Default number of cache read workers
-  #number-of-cache-read-workers = 25
+  number-of-cache-read-workers = 25

   io {
     # throttle {
@@ -65,7 +65,7 @@ system {
     # }

     # Number of times an I/O operation should be attempted before giving up and failing it.
-    number-of-attempts = 5
+    #number-of-attempts = 5
   }

   # Maximum number of input file bytes allowed in order to read each type.
@@ -95,7 +95,7 @@ system {
     # These are the default values in Cromwell, in most circumstances there should not be a need to change them.

     # How frequently Cromwell should scan for aborts.
-    scan-frequency: 30 seconds
+    scan-frequency: 2 seconds

     # The cache of in-progress aborts. Cromwell will add entries to this cache once a WorkflowActor has been messaged to abort.
     # If on the next scan an 'Aborting' status is found for a workflow that has an entry in this cache, Cromwell will not ask
@@ -103,16 +103,16 @@ system {
     cache {
       enabled: true
       # Guava cache concurrency.
-      concurrency: 10
+      concurrency: 4
       # How long entries in the cache should live from the time they are added to the cache.
-      ttl: 10 minutes
+      ttl: 20 minutes
       # Maximum number of entries in the cache.
-      size: 200000
+      size: 100000
     }
   }

   # Cromwell reads this value into the JVM's `networkaddress.cache.ttl` setting to control DNS cache expiration
-  dns-cache-ttl: 30 minutes
+  dns-cache-ttl: 3 minutes
 }

 workflow-options {
@@ -140,7 +140,7 @@ workflow-options {
     workflow-type-version: "draft-2"

     # To set a default hog group rather than defaulting to workflow ID:
-    hogGroup: "static"
+    #hogGroup: "static"
   }
 }

@@ -148,20 +148,20 @@ workflow-options {
 call-caching {
   # Allows re-use of existing results for jobs you've already run
   # (default: false)
-  enabled = false
+  #enabled = false

   # Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies
   # to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
   # (default: true)
-  #invalidate-bad-cache-results = true
+  invalidate-bad-cache-results = false

   # The maximum number of times Cromwell will attempt to copy cache hits before giving up and running the job.
   #max-failed-copy-attempts = 1000000

-  # blacklist-cache {
+  blacklist-cache {
   #   # The call caching blacklist cache is off by default. This cache is used to blacklist cache hits based on cache
   #   # hit ids or buckets of cache hit paths that Cromwell has previously failed to copy for permissions reasons.
-  #   enabled: true
+    enabled: false
   #
   #   # A blacklist grouping can be specified in workflow options which will inform the blacklister which workflows
   #   # should share a blacklist cache.
@@ -170,7 +170,7 @@ call-caching {
   #     concurrency: 10000
   #     ttl: 2 hours
   #     size: 1000
-  #   }
+    }
   #
   #   buckets {
   #     # Guava cache concurrency.
@@ -193,6 +193,58 @@ call-caching {
   #   }
 }

+# Google configuration
+google {
+
+  #application-name = "cromwell"
+
+  # Default: just application default
+  #auths = [
+
+    # Application default
+    #{
+    #  name = "application-default"
+    #  scheme = "application_default"
+    #},
+
+    # Use a static service account
+    #{
+    #  name = "service-account"
+    #  scheme = "service_account"
+      # Choose between PEM file and JSON file as a credential format. They're mutually exclusive.
+      # PEM format:
+    #  service-account-id = "my-service-account"
+    #  pem-file = "/path/to/file.pem"
+      # JSON format:
+    #  json-file = "/path/to/file.json"
+    #}
+
+    # Use service accounts provided through workflow options
+    #{
+    #  name = "user-service-account"
+    #  scheme = "user_service_account"
+    #}
+  #]
+}
+
+docker {
+  hash-lookup {
+    # Set this to match your available quota against the Google Container Engine API
+    #gcr-api-queries-per-100-seconds = 1000
+
+    # Time in minutes before an entry expires from the docker hashes cache and needs to be fetched again
+    #cache-entry-ttl = "20 minutes"
+
+    # Maximum number of elements to be kept in the cache. If the limit is reached, old elements will be removed from the cache
+    #cache-size = 200
+
+    # How should docker hashes be looked up. Possible values are "local" and "remote"
+    # "local": Lookup hashes on the local docker daemon using the cli
+    # "remote": Lookup hashes on docker hub, gcr, gar, quay
+    #method = "remote"
+  }
+}
+
 engine {
   # This instructs the engine which filesystems are at its disposal to perform any IO operation that it might need.
   # For instance, WDL variables declared at the Workflow level will be evaluated using the filesystems declared here.
@@ -222,29 +274,29 @@ languages {
   WDL {
     versions {
       "draft-2" {
-        language-factory = "languages.wdl.draft2.WdlDraft2LanguageFactory"
-        config {
-          # strict-validation: true
-          enabled: true
-          caching {
-            # # WDL Draft 2 namespace caching is off by default, this value must be set to true to enable it.
-            enabled: true
-            # # Guava cache concurrency
-            concurrency: 4
-            # # How long entries in the cache should live from the time of their last access.
-            ttl: 20 minutes
-            # # Maximum number of entries in the cache (i.e. the number of workflow source + imports => namespace entries).
-            size: 100000
-          }
-        }
+        # language-factory = "languages.wdl.draft2.WdlDraft2LanguageFactory"
+        # config {
+        #   strict-validation: true
+        #   enabled: true
+        #   caching {
+        #     # WDL Draft 2 namespace caching is off by default, this value must be set to true to enable it.
+        #     enabled: false
+        #     # Guava cache concurrency
+        #     concurrency: 2
+        #     # How long entries in the cache should live from the time of their last access.
+        #     ttl: 20 minutes
+        #     # Maximum number of entries in the cache (i.e. the number of workflow source + imports => namespace entries).
+        #     size: 1000
+        #   }
+        # }
       }
       # draft-3 is the same as 1.0 so files should be able to be submitted to Cromwell as 1.0
       # "draft-3" {
-      #  language-factory = "languages.wdl.draft3.WdlDraft3LanguageFactory"
-      #  config {
-      #    strict-validation: true
-      #    enabled: true
-      #  }
+      #   language-factory = "languages.wdl.draft3.WdlDraft3LanguageFactory"
+      #   config {
+      #     strict-validation: true
+      #     enabled: true
+      #   }
       # }
       "1.0" {
         # 1.0 is just a rename of draft-3, so yes, they really do use the same factory:
@@ -301,7 +353,7 @@ backend {
       config {

         # Optional limits on the number of concurrent jobs
-        #concurrent-job-limit = 5
+        concurrent-job-limit = 50

         # If true submits scripts to the bash background using "&". Only usefull for dispatchers that do NOT submit
         # the job and then immediately return a scheduled job id.
@@ -321,16 +373,16 @@ backend {
         # `script-epilogue` configures a shell command to run after the execution of every command block.
         #
         # If this value is not set explicitly, the default value is `sync`, equivalent to:
-        # script-epilogue = ""
+        # script-epilogue = "sync"
         #
         # To turn off the default `sync` behavior set this value to an empty string:
         # script-epilogue = ""

-       # `glob-link-command` specifies command used to link glob outputs, by default using hard-links.
-       # If filesystem doesn't allow hard-links (e.g., beeGFS), change to soft-links as follows:
-       # glob-link-command = "ln -sL GLOB_PATTERN GLOB_DIRECTORY"
+        # `glob-link-command` specifies command used to link glob outputs, by default using hard-links.
+        # If filesystem doesn't allow hard-links (e.g., beeGFS), change to soft-links as follows:
+        # glob-link-command = "ln -sL GLOB_PATTERN GLOB_DIRECTORY"

-       # The list of possible runtime custom attributes.
+        # The list of possible runtime custom attributes.
         runtime-attributes = """
             String? docker
             String? docker_user
diff --git a/codes/run_pipeline.py b/codes/run_pipeline.py
index eaad506..ece6257 100755
--- a/codes/run_pipeline.py
+++ b/codes/run_pipeline.py
@@ -45,7 +45,7 @@ if __name__ == '__main__':

     logname = datetime.now().strftime("%m%d%H%M")

-    cmd = f'nohup python ' \
+    cmd = f'nohup python3 ' \
           f'{run_wdl_path} -n {args.barcode} -s {args.normal} ' \
           f'{"-u " if args.umi else ""} -i {args.input_dir} ' \
           f'-node {args.start_node} ' \
diff --git a/codes/run_wdl.py b/codes/run_wdl.py
index b94a7d7..3d7e61b 100755
--- a/codes/run_wdl.py
+++ b/codes/run_wdl.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
 import argparse
 import json
 import os
@@ -87,17 +89,11 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl
         jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))

     # run pipeline
-    # cmd1 = 'export PATH=/home/zhangchao/project/pipeline/workflow/script:$PATH'
-    # cmd2 = 'export PUBLIC=/home/zhangchao/project/pipeline/workflow/script/public'
     cmd3 = f'cd {output_dir}'

-    # f'{"-Dcall-caching.enabled=false " if uncache else ""}'
-    # f'-Dconfig.file=/home/zhangchao/project/pipeline/workflow/script/cromwell.examples.conf ' \
-
     cmd4 = f'/usr/bin/java -DLOG_MODE=standard ' \
            f'-Dconfig.file=$WORKFLOW/codes/cromwell.examples.conf ' \
            f'-jar $WORKFLOW/software/cromwell-51.jar run {wdl} --inputs {jsfile_path} '

-    # cmd = f'{cmd1}; {cmd2}; {cmd3}; {cmd4}'
     cmd = f'{cmd3}; {cmd4}'

     # Record the start time
@@ -107,7 +103,9 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl
     pidnum = ret.pid
     with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
         pidfile.write(str(pidnum))
-    ret.wait()
+    # ret.wait()
+    # Wait for the process to finish and capture its output and errors
+    output, error = ret.communicate()
     # Record the end time
     end_time = time.time()
     # Compute the elapsed time
@@ -115,7 +113,7 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl

     print("\nElapsed time: {:.2f} seconds".format(elapsed_time))

-    print(ret.stdout.read(), ret.stderr.read())
+    print(output, error)

     print('#' * 50)
     print('Reading logs')
diff --git a/pipeline.wdl b/pipeline.wdl
index c34e497..8535217 100644
--- a/pipeline.wdl
+++ b/pipeline.wdl
@@ -212,6 +212,6 @@ workflow pipeline {
     }

     output {
-        String result = "${output_dir}/report/${tumor}.merged_file.xlsx"
+        String result = call_postprocess.merged
     }
 }
\ No newline at end of file
diff --git a/wdl/chemo.wdl b/wdl/chemo.wdl
index 35fc2be..5b2fe75 100755
--- a/wdl/chemo.wdl
+++ b/wdl/chemo.wdl
@@ -7,16 +7,17 @@ task run_chemo {
     String vcf
     String cancer
     String project
-    command <<<
-
+    command {
         if [ ! -d ${output_dir}/chemo ];then
             mkdir ${output_dir}/chemo
         fi

         chemo.py -d $DATABASE/chemo_database.xlsx -probe ${probe} -n ${name} -v ${vcf} -o ${output_dir}/chemo -c ${cancer} -p ${project}

-    >>>
-
+    }
+    output {
+        String chemo_res = "${output_dir}/chemo/${name}.drug.res.txt"
+    }
 }

 workflow call_chemo {
diff --git a/wdl/hereditary.wdl b/wdl/hereditary.wdl
index 8b50fe0..ace0271 100755
--- a/wdl/hereditary.wdl
+++ b/wdl/hereditary.wdl
@@ -5,7 +5,7 @@ task run_hereditary {
     String output_dir
     String project
     String filter_txt
-    command <<<
+    command {

         if [ ! -d ${output_dir}/hereditary ];then
             mkdir ${output_dir}/hereditary
@@ -13,7 +13,11 @@ task run_hereditary {

         hereditary.py -d $DATABASE/hereditary_database.xlsx -p ${project} -n ${name} -f ${filter_txt} -o ${output_dir}/hereditary

-    >>>
+    }
+
+    output {
+        String hereditary_txt = "${output_dir}/hereditary/${name}.hereditary.txt"
+    }
 }
diff --git a/wdl/neoantigen.wdl b/wdl/neoantigen.wdl
index 87e9087..ac87b99 100755
--- a/wdl/neoantigen.wdl
+++ b/wdl/neoantigen.wdl
@@ -151,6 +151,10 @@ task run_neoantigen {

     >>>

+    output {
+        String neoantigen_txt = "${output_dir}neoantigen/MHC_Class_I/neoantigen.txt"
+    }
+
 }

 workflow call_neoantigen {
diff --git a/wdl/pollution.wdl b/wdl/pollution.wdl
index 7dc4bb1..bb1d96f 100755
--- a/wdl/pollution.wdl
+++ b/wdl/pollution.wdl
@@ -7,7 +7,7 @@ task run_pollution {
     String vcf
     String? vcf2

-    command <<<
+    command {

         if [ ! -d ${output_dir}/pollution ];then
             mkdir ${output_dir}/pollution
@@ -20,7 +20,7 @@ task run_pollution {
             -b $PUBLIC/pollution/${probe}_contaminate_ref.bed \
             -c $PUBLIC/pollution/${probe}_contaminate_cnvkit.bed

-    >>>
+    }
 }

 workflow call_pollution {
diff --git a/wdl/postprocess.wdl b/wdl/postprocess.wdl
index e2c7855..bc5c7a6 100755
--- a/wdl/postprocess.wdl
+++ b/wdl/postprocess.wdl
@@ -16,17 +16,19 @@ task run_post {
     String cancer
     String project

-    command <<<
-
+    command {
         if [ ! -d ${output_dir}/report ];then
             mkdir ${output_dir}/report
         fi

         indication.pl ${output_dir} ${cancer} ${project}
         sample_post.py -s ${name} -o ${output_dir}
         postprocess.py -n ${name} -s ${normal} -c ${output_dir} -o ${output_dir}/report/${name}.merged_file.xlsx
+        sleep 2m

-    >>>
-
+    }
+    output {
+        String merged = "${output_dir}/report/${name}.merged_file.xlsx"
+    }
 }

 workflow call_postprocess {
diff --git a/wdl/statistics.wdl b/wdl/statistics.wdl
index 6068bc7..09435a6 100755
--- a/wdl/statistics.wdl
+++ b/wdl/statistics.wdl
@@ -8,7 +8,7 @@ task run_statistics {
     String ref
     String bed

-    command <<<
+    command {

         if [ ! -d ${output_dir}/qc/${name}_bamdst ];then
             mkdir -p ${output_dir}/qc/${name}_bamdst
@@ -23,7 +23,7 @@ task run_statistics {
         # ${output_dir}/qc/${name}_InsertAndDepthStat \
         # ${output_dir}/qc/${name}_bamdst/insertsize.plot \
         # ${output_dir}/qc/${name}_bamdst/depth_distribution.plot
-    >>>
+    }
 }

 workflow statistics {
diff --git a/wdl/tmb.wdl b/wdl/tmb.wdl
index 6a4f2b5..3abe1fb 100755
--- a/wdl/tmb.wdl
+++ b/wdl/tmb.wdl
@@ -7,8 +7,7 @@ task run_tmb {
     String sample_type
     String output_dir

-    command <<<
-
+    command {
         if [ ! -d ${output_dir}/tmb ];then
             mkdir ${output_dir}/tmb
         fi
@@ -19,8 +18,12 @@ task run_tmb {
             ${project} \
             ${sample_type} \
             tmb
-    >>>
+    }
+
+    output {
+        String tmb_txt = "${output_dir}/tmb/${name}.tmb.txt"
+    }
 }

 workflow call_tmb {
@@ -34,26 +37,13 @@ workflow call_tmb {
     Boolean umi

     if (run) {
-        if (umi) {
-            call run_tmb as run_tmb_umi {
-                input:
-                    name=name,
-                    file=file,
-                    project=project,
-                    sample_type='c',
-                    output_dir=output_dir
-            }
-        }
-        if (!umi) {
-            call run_tmb as run_tmb_tissue {
-                input:
-                    name=name,
-                    file=file,
-                    project=project,
-                    sample_type='t',
-                    output_dir=output_dir
-            }
-
+        call run_tmb {
+            input:
+                name=name,
+                file=file,
+                project=project,
+                sample_type=if umi then 'c' else 't',
+                output_dir=output_dir
         }
     }