pipeline/codes/run_wdl.py

142 lines
5.4 KiB
Python
Raw Normal View History

2023-11-01 10:09:29 +08:00
import argparse
import json
import os
import subprocess
import time
2023-11-29 15:13:30 +08:00
import pandas as pd
2023-11-01 10:09:29 +08:00
2023-11-29 15:13:30 +08:00
def get_branch_nodes(graph, start_node):
    """Collect every node reachable from one or more start nodes.

    Args:
        graph: Mapping of node name -> list of downstream node names.
        start_node: A node name, or several names separated by commas
            (e.g. ``'addTarget,addFusion'``). Whitespace around each name
            is stripped and empty segments (such as the one produced by a
            trailing comma) are ignored.

    Returns:
        A set holding the start nodes themselves plus all their descendants.

    Raises:
        KeyError: If a visited node is not a key of ``graph``.
    """
    branch_nodes = set()
    # Explicit stack instead of recursion so long chains cannot hit the
    # interpreter's recursion limit.
    pending = [name.strip() for name in start_node.split(',') if name.strip()]
    while pending:
        node = pending.pop()
        if node in branch_nodes:
            continue
        if node not in graph:
            raise KeyError(f'unknown node {node!r}; known nodes: {list(graph)}')
        branch_nodes.add(node)
        pending.extend(graph[node])
    return branch_nodes
def get_catecode(project, start_node='addQc'):
    """Build the on/off switch table for every pipeline step.

    A step is switched on only when it is both reachable from
    ``start_node`` in the step DAG and listed for ``project`` in
    ``$DATABASE/info.csv`` (or is one of the steps that are always listed:
    addQc, addAlignment, addAutoReport, addPollution).

    Args:
        project: Value looked up in the ``project`` column of ``info.csv``.
        start_node: Step name, or comma-separated step names, to start from.

    Returns:
        Dict mapping every known step name to ``True`` or ``False``.

    Raises:
        RuntimeError: If the ``DATABASE`` environment variable is not set.
        KeyError: If ``start_node`` names a step that is not in the DAG.
    """
    needcode = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', 'addHcs',
                'addAutoReport', 'addTmb', 'addPollution', 'addNeoantigen']
    dag = {
        'addQc': ['addAlignment'],
        'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
        'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
        'addFusion': ['addAutoReport'],
        'addCnv': ['addAutoReport'],
        'addMsi': ['addAutoReport'],
        'addChemo': ['addAutoReport'],
        'addHcs': ['addAutoReport'],
        'addTmb': ['addAutoReport'],
        'addPollution': ['addAutoReport'],
        'addNeoantigen': ['addAutoReport'],
        'addAutoReport': []
    }
    runcode = get_branch_nodes(dag, start_node)

    public = os.environ.get('DATABASE')
    if public is None:
        # Fail with an actionable message instead of the TypeError that
        # os.path.join(None, ...) would raise.
        raise RuntimeError("environment variable 'DATABASE' is not set; "
                           "it must point to the directory containing info.csv")
    info = pd.read_csv(os.path.join(public, 'info.csv'))

    matched = info.loc[info['project'] == project, 'CateCode']
    raw_codes = matched.values[0] if not matched.empty else None
    # An empty CSV cell is read as NaN (a float), which has no .split().
    if isinstance(raw_codes, str):
        catecode = {code.strip() for code in raw_codes.split(';')}
    else:
        catecode = set()
    # These steps are treated as listed for every project.
    catecode.update(['addQc', 'addAlignment', 'addAutoReport', 'addPollution'])

    return {code: (code in runcode and code in catecode) for code in needcode}
def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl, start_node):
    """Write the Cromwell inputs JSON for one sample and run the workflow.

    The inputs file is written to ``<output_dir>/<barcode>.json``, the PID of
    the launched shell process to ``<output_dir>/pid``, and the command's
    captured stdout/stderr are printed once it finishes.

    Args:
        barcode: Tumor sample barcode; also names the inputs JSON file.
        normal: Normal sample; omitted from the inputs when empty or None.
        umi: Whether this is a UMI sample; omitted from the inputs when False.
        input_dir: Sample input directory (resolved to a real path).
        output_dir: Output directory (resolved to a real path; created if
            missing). The workflow command is run from this directory.
        project: Project name, also used to look up the enabled steps.
        cancer: Cancer value passed through to the workflow.
        probe: Probe value passed through to the workflow.
        wdl: Path to the WDL file (resolved to a real path).
        start_node: Step name(s) to start from, comma-separated.

    Returns:
        The exit status of the launched shell command.
    """
    import shlex  # local import: only needed to quote paths for the shell

    input_dir = os.path.realpath(input_dir)
    output_dir = os.path.realpath(output_dir)
    wdl = os.path.realpath(wdl)
    catecodes = get_catecode(project, start_node=start_node)
    arg = {
        "pipeline.tumor": barcode,
        "pipeline.normal": normal,
        "pipeline.umi": umi,
        "pipeline.input_dir": input_dir,
        "pipeline.output_dir": output_dir,
        "pipeline.project": project,
        "pipeline.cancer": cancer,
        "pipeline.probe": probe,
        "pipeline.catecode": catecodes
    }
    # Drop unset optional inputs so the workflow's own defaults apply.
    arg = {key: value for key, value in arg.items() if value not in (None, '', False)}

    # Generate the inputs JSON. ensure_ascii=False may emit non-ASCII text,
    # so the file encoding is pinned rather than left to the locale.
    os.makedirs(output_dir, exist_ok=True)
    jsfile_path = os.path.join(output_dir, f'{barcode}.json')
    with open(jsfile_path, 'w', encoding='utf-8') as jsfile:
        jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))

    # Run the pipeline. Paths are shell-quoted so spaces or shell
    # metacharacters cannot break (or inject into) the command; $WORKFLOW is
    # deliberately left for the shell to expand.
    # Optional Cromwell JVM flags previously sketched here:
    #   -Dcall-caching.enabled=false, -Dconfig.file=<cromwell conf>
    cmd3 = f'cd {shlex.quote(output_dir)}'
    cmd4 = (f'/usr/bin/java -jar $WORKFLOW/software/cromwell-51.jar run '
            f'--inputs {shlex.quote(jsfile_path)} {shlex.quote(wdl)}')
    cmd = f'{cmd3}; {cmd4}'

    # Record the start time.
    start_time = time.time()
    print(cmd)
    ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
    with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
        pidfile.write(str(ret.pid))
    # communicate() drains both pipes while waiting; wait() with PIPE
    # handles can deadlock once the child fills an OS pipe buffer.
    stdout_text, stderr_text = ret.communicate()
    # Record the end time and compute the elapsed run time.
    elapsed_time = time.time() - start_time
    print("\n运行时间:{:.2f}".format(elapsed_time))
    print(stdout_text, stderr_text)
    print('#' * 50)
    print('读取日志')
    return ret.returncode
if __name__ == '__main__':
    # Command-line entry point: parse the sample/run options and hand them
    # to run().
    parser = argparse.ArgumentParser(description="JM to run pipeline")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    parser.add_argument('-u', '--umi', action='store_true', help="is umi sample", default=False)
    parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
    parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='./')
    parser.add_argument('-p', '--project', help="project", required=True)
    parser.add_argument('-c', '--cancer', help="cancer", required=True)
    parser.add_argument('-b', '--probe', help="probe, 682, 624, 160, 17 for now ", required=True)
    parser.add_argument('-w', '--wdl', help="wdl", default='/home/zhangchao/project/pipeline/workflow/pipeline.wdl')
    parser.add_argument('-node', '--start_node',
                        help="node to begin running from; one of 'addQc', 'addAlignment', "
                             "'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', "
                             "'addHcs', 'addTmb', 'addNeoantigen', 'addPollution', 'addAutoReport'; "
                             "more than one node may be given, separated by commas, "
                             "like this 'addTarget,addFusion'",
                        default='addQc')
    args = parser.parse_args()
    # Exit with whatever status run() reports; a None result means exit
    # code 0.
    raise SystemExit(run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
                         args.project, args.cancer, args.probe, args.wdl, args.start_node))