Fix the issue where the pipeline does not terminate automatically

master
chaopower 2024-01-01 14:25:34 +08:00
parent 9f9fb4bf4a
commit ab356a15f0
11 changed files with 141 additions and 90 deletions

View File

@@ -42,19 +42,19 @@ system {
graceful-server-shutdown = true
# Cromwell will cap the number of running workflows at N
-max-concurrent-workflows = 10000
+max-concurrent-workflows = 5000
# Cromwell will launch up to N submitted workflows at a time, regardless of how many open workflow slots exist
-max-workflow-launch-count = 1000
+max-workflow-launch-count = 50
# Number of seconds between workflow launches
-new-workflow-poll-rate = 20
+new-workflow-poll-rate = 10
# Since the WorkflowLogCopyRouter is initialized in code, this is the number of workers
-#number-of-workflow-log-copy-workers = 10
+number-of-workflow-log-copy-workers = 10
# Default number of cache read workers
-#number-of-cache-read-workers = 25
+number-of-cache-read-workers = 25
io {
# throttle {
@@ -65,7 +65,7 @@ system {
# }
# Number of times an I/O operation should be attempted before giving up and failing it.
-number-of-attempts = 5
+#number-of-attempts = 5
}
# Maximum number of input file bytes allowed in order to read each type.
@@ -95,7 +95,7 @@ system {
# These are the default values in Cromwell, in most circumstances there should not be a need to change them.
# How frequently Cromwell should scan for aborts.
-scan-frequency: 30 seconds
+scan-frequency: 2 seconds
# The cache of in-progress aborts. Cromwell will add entries to this cache once a WorkflowActor has been messaged to abort.
# If on the next scan an 'Aborting' status is found for a workflow that has an entry in this cache, Cromwell will not ask
@@ -103,16 +103,16 @@ system {
cache {
enabled: true
# Guava cache concurrency.
-concurrency: 10
+concurrency: 4
# How long entries in the cache should live from the time they are added to the cache.
-ttl: 10 minutes
+ttl: 20 minutes
# Maximum number of entries in the cache.
-size: 200000
+size: 100000
}
}
# Cromwell reads this value into the JVM's `networkaddress.cache.ttl` setting to control DNS cache expiration
-dns-cache-ttl: 30 minutes
+dns-cache-ttl: 3 minutes
}

workflow-options {
@@ -140,7 +140,7 @@ workflow-options {
workflow-type-version: "draft-2"
# To set a default hog group rather than defaulting to workflow ID:
-hogGroup: "static"
+#hogGroup: "static"
}
}
@@ -148,20 +148,20 @@ workflow-options {
call-caching {
# Allows re-use of existing results for jobs you've already run
# (default: false)
-enabled = false
+#enabled = false
# Whether to invalidate a cache result forever if we cannot reuse them. Disable this if you expect some cache copies
# to fail for external reasons which should not invalidate the cache (e.g. auth differences between users):
# (default: true)
-#invalidate-bad-cache-results = true
+invalidate-bad-cache-results = false
# The maximum number of times Cromwell will attempt to copy cache hits before giving up and running the job.
#max-failed-copy-attempts = 1000000
-# blacklist-cache {
+blacklist-cache {
# # The call caching blacklist cache is off by default. This cache is used to blacklist cache hits based on cache
# # hit ids or buckets of cache hit paths that Cromwell has previously failed to copy for permissions reasons.
-# enabled: true
+enabled: false
#
# # A blacklist grouping can be specified in workflow options which will inform the blacklister which workflows
# # should share a blacklist cache.
@@ -170,7 +170,7 @@ call-caching {
# concurrency: 10000
# ttl: 2 hours
# size: 1000
-# }
+}
#
# buckets {
# # Guava cache concurrency.
@@ -193,6 +193,58 @@ call-caching {
# }
}

+# Google configuration
+google {
+#application-name = "cromwell"
+# Default: just application default
+#auths = [
+# Application default
+#{
+# name = "application-default"
+# scheme = "application_default"
+#},
+# Use a static service account
+#{
+# name = "service-account"
+# scheme = "service_account"
+# Choose between PEM file and JSON file as a credential format. They're mutually exclusive.
+# PEM format:
+# service-account-id = "my-service-account"
+# pem-file = "/path/to/file.pem"
+# JSON format:
+# json-file = "/path/to/file.json"
+#}
+# Use service accounts provided through workflow options
+#{
+# name = "user-service-account"
+# scheme = "user_service_account"
+#}
+#]
+}
+
+docker {
+hash-lookup {
+# Set this to match your available quota against the Google Container Engine API
+#gcr-api-queries-per-100-seconds = 1000
+# Time in minutes before an entry expires from the docker hashes cache and needs to be fetched again
+#cache-entry-ttl = "20 minutes"
+# Maximum number of elements to be kept in the cache. If the limit is reached, old elements will be removed from the cache
+#cache-size = 200
+# How should docker hashes be looked up. Possible values are "local" and "remote"
+# "local": Lookup hashes on the local docker daemon using the cli
+# "remote": Lookup hashes on docker hub, gcr, gar, quay
+#method = "remote"
+}
+}
+
engine {
# This instructs the engine which filesystems are at its disposal to perform any IO operation that it might need.
# For instance, WDL variables declared at the Workflow level will be evaluated using the filesystems declared here.
@@ -222,21 +274,21 @@ languages {
WDL {
versions {
"draft-2" {
-language-factory = "languages.wdl.draft2.WdlDraft2LanguageFactory"
-config {
+# language-factory = "languages.wdl.draft2.WdlDraft2LanguageFactory"
+# config {
# strict-validation: true
-enabled: true
-caching {
+# enabled: true
+# caching {
# # WDL Draft 2 namespace caching is off by default, this value must be set to true to enable it.
-enabled: true
+# enabled: false
# # Guava cache concurrency
-concurrency: 4
+# concurrency: 2
# # How long entries in the cache should live from the time of their last access.
-ttl: 20 minutes
+# ttl: 20 minutes
# # Maximum number of entries in the cache (i.e. the number of workflow source + imports => namespace entries).
-size: 100000
-}
-}
+# size: 1000
+# }
+# }
}
# draft-3 is the same as 1.0 so files should be able to be submitted to Cromwell as 1.0
# "draft-3" {
@@ -301,7 +353,7 @@ backend {
config {
# Optional limits on the number of concurrent jobs
-#concurrent-job-limit = 5
+concurrent-job-limit = 50
# If true submits scripts to the bash background using "&". Only useful for dispatchers that do NOT submit
# the job and then immediately return a scheduled job id.
@ -321,7 +373,7 @@ backend {
# `script-epilogue` configures a shell command to run after the execution of every command block. # `script-epilogue` configures a shell command to run after the execution of every command block.
# #
# If this value is not set explicitly, the default value is `sync`, equivalent to: # If this value is not set explicitly, the default value is `sync`, equivalent to:
# script-epilogue = "" # script-epilogue = "sync"
# #
# To turn off the default `sync` behavior set this value to an empty string: # To turn off the default `sync` behavior set this value to an empty string:
# script-epilogue = "" # script-epilogue = ""
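Hand-editing HOCON like the hunks above is easy to get wrong. As a sanity check before restarting Cromwell, the changed keys can be run through a HOCON parser; a minimal sketch, assuming the third-party pyhocon package (not something this repo ships), with the changed values inlined for brevity:

from pyhocon import ConfigFactory  # third-party HOCON parser, assumed installed

# Inline the keys this commit changes and confirm they parse as intended.
conf = ConfigFactory.parse_string("""
system {
  max-concurrent-workflows = 5000
  max-workflow-launch-count = 50
  abort {
    scan-frequency: 2 seconds
  }
}
call-caching {
  invalidate-bad-cache-results = false
}
""")

print(conf.get_int('system.max-concurrent-workflows'))              # -> 5000
print(conf.get('system.abort.scan-frequency'))                      # -> 2 seconds
print(conf.get_bool('call-caching.invalidate-bad-cache-results'))   # -> False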

View File

@@ -45,7 +45,7 @@ if __name__ == '__main__':
logname = datetime.now().strftime("%m%d%H%M")
-cmd = f'nohup python ' \
+cmd = f'nohup python3 ' \
f'{run_wdl_path} -n {args.barcode} -s {args.normal} ' \
f'{"-u " if args.umi else ""} -i {args.input_dir} ' \
f'-node {args.start_node} ' \

View File

@@ -1,3 +1,5 @@
+#! /usr/bin/env python3
+
import argparse
import json
import os
@@ -87,17 +89,11 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl
jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))
# run pipeline
-# cmd1 = 'export PATH=/home/zhangchao/project/pipeline/workflow/script:$PATH'
-# cmd2 = 'export PUBLIC=/home/zhangchao/project/pipeline/workflow/script/public'
cmd3 = f'cd {output_dir}'
-# f'{"-Dcall-caching.enabled=false " if uncache else ""}'
-# f'-Dconfig.file=/home/zhangchao/project/pipeline/workflow/script/cromwell.examples.conf ' \
cmd4 = f'/usr/bin/java -DLOG_MODE=standard ' \
f'-Dconfig.file=$WORKFLOW/codes/cromwell.examples.conf ' \
f'-jar $WORKFLOW/software/cromwell-51.jar run {wdl} --inputs {jsfile_path} '
-# cmd = f'{cmd1}; {cmd2}; {cmd3}; {cmd4}'
cmd = f'{cmd3}; {cmd4}'
# record the start time
@@ -107,7 +103,9 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl
pidnum = ret.pid
with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
pidfile.write(str(pidnum))
-ret.wait()
+# ret.wait()
+# wait for the process to finish and collect its output and errors
+output, error = ret.communicate()
# record the end time
end_time = time.time()
# compute the elapsed time

@@ -115,7 +113,7 @@ def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl
print("\n运行时间:{:.2f}".format(elapsed_time))
-print(ret.stdout.read(), ret.stderr.read())
+print(output, error)
print('#' * 50)
print('读取日志')
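The ret.wait() to ret.communicate() change above is the heart of this fix. The old code read ret.stdout, so the child must have been started with stdout=subprocess.PIPE and stderr=subprocess.PIPE; once a long Cromwell run writes more output than the OS pipe buffer holds, the child blocks writing to the full pipe while the parent blocks in wait(), and the run never terminates. communicate() drains both pipes while waiting for exit, so the deadlock cannot occur. A minimal standalone sketch (the noisy command is illustrative):

import subprocess

# A child that writes far more than a pipe buffer (~64 KiB on Linux) can hold.
proc = subprocess.Popen(
    ['python3', '-c', 'print("x" * 10_000_000)'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

# proc.wait() here would hang forever: the child blocks writing to the full
# stdout pipe while the parent blocks waiting for the child to exit.
# communicate() reads both streams to EOF while waiting, so it always returns.
output, error = proc.communicate()
print(len(output), error.decode())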

View File

@@ -212,6 +212,6 @@ workflow pipeline {
}
output {
-String result = "${output_dir}/report/${tumor}.merged_file.xlsx"
+String result = call_postprocess.merged
}
}

View File

@@ -7,16 +7,17 @@ task run_chemo {
String vcf
String cancer
String project
-command <<<
+command {
if [ ! -d ${output_dir}/chemo ];then
mkdir ${output_dir}/chemo
fi
chemo.py -d $DATABASE/chemo_database.xlsx -probe ${probe} -n ${name} -v ${vcf} -o ${output_dir}/chemo -c ${cancer} -p ${project}
->>>
+}
+output {
+String chemo_res = "${output_dir}/chemo/${name}.drug.res.txt"
+}
}
workflow call_chemo {

View File

@@ -5,7 +5,7 @@ task run_hereditary {
String output_dir
String project
String filter_txt
-command <<<
+command {
if [ ! -d ${output_dir}/hereditary ];then
mkdir ${output_dir}/hereditary

@@ -13,7 +13,11 @@ task run_hereditary {
hereditary.py -d $DATABASE/hereditary_database.xlsx -p ${project} -n ${name} -f ${filter_txt} -o ${output_dir}/hereditary
->>>
+}
+output {
+String hereditary_txt = "${output_dir}/hereditary/${name}.hereditary.txt"
+}
}

View File

@@ -151,6 +151,10 @@ task run_neoantigen {
>>>
+output {
+String neoantigen_txt = "${output_dir}neoantigen/MHC_Class_I/neoantigen.txt"
+}
}
workflow call_neoantigen {

View File

@@ -7,7 +7,7 @@ task run_pollution {
String vcf
String? vcf2
-command <<<
+command {
if [ ! -d ${output_dir}/pollution ];then
mkdir ${output_dir}/pollution

@@ -20,7 +20,7 @@ task run_pollution {
-b $PUBLIC/pollution/${probe}_contaminate_ref.bed \
-c $PUBLIC/pollution/${probe}_contaminate_cnvkit.bed
->>>
+}
}
workflow call_pollution {

View File

@@ -16,17 +16,19 @@ task run_post {
String cancer
String project
-command <<<
+command {
if [ ! -d ${output_dir}/report ];then
mkdir ${output_dir}/report
fi
indication.pl ${output_dir} ${cancer} ${project}
sample_post.py -s ${name} -o ${output_dir}
postprocess.py -n ${name} -s ${normal} -c ${output_dir} -o ${output_dir}/report/${name}.merged_file.xlsx
-sleep 2m
->>>
+}
+output {
+String merged = "${output_dir}/report/${name}.merged_file.xlsx"
+}
}
workflow call_postprocess {
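The removed sleep 2m was a fixed-length guess at when ${name}.merged_file.xlsx would be ready; declaring the file in an output block lets Cromwell verify it when the task exits, and gives the pipeline workflow output (changed earlier in this commit to call_postprocess.merged) a real dependency on it. Had a script-level wait still been needed, a bounded poll would be less fragile than a fixed sleep; a hypothetical Python helper, not part of this repo:

import os
import time

def wait_for_file(path, timeout=120, poll=5):
    """Poll until `path` exists or `timeout` seconds elapse; True if it appeared."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if os.path.exists(path):
            return True
        time.sleep(poll)
    return False

# e.g. wait_for_file('/data/out/report/sample1.merged_file.xlsx')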

View File

@@ -8,7 +8,7 @@ task run_statistics {
String ref
String bed
-command <<<
+command {
if [ ! -d ${output_dir}/qc/${name}_bamdst ];then
mkdir -p ${output_dir}/qc/${name}_bamdst

@@ -23,7 +23,7 @@ task run_statistics {
# ${output_dir}/qc/${name}_InsertAndDepthStat \
# ${output_dir}/qc/${name}_bamdst/insertsize.plot \
# ${output_dir}/qc/${name}_bamdst/depth_distribution.plot
->>>
+}
}
workflow statistics {

View File

@@ -7,8 +7,7 @@ task run_tmb {
String sample_type
String output_dir
-command <<<
+command {
if [ ! -d ${output_dir}/tmb ];then
mkdir ${output_dir}/tmb
fi

@@ -19,8 +18,12 @@ task run_tmb {
${project} \
${sample_type} \
tmb
->>>
+}
+output {
+String tmb_txt = "${output_dir}/tmb/${name}.tmb.txt"
+}
}
workflow call_tmb {

@@ -34,27 +37,14 @@ workflow call_tmb {
Boolean umi
if (run) {
-if (umi) {
-call run_tmb as run_tmb_umi {
+call run_tmb {
input:
name=name,
file=file,
project=project,
-sample_type='c',
+sample_type=if umi then 'c' else 't',
output_dir=output_dir
}
-}
-if (!umi) {
-call run_tmb as run_tmb_tissue {
-input:
-name=name,
-file=file,
-project=project,
-sample_type='t',
-output_dir=output_dir
-}
-}
}