"""Generate a Cromwell inputs JSON for one sample and run the WDL pipeline.

Which pipeline stages are enabled is decided by two inputs: the project's
``CateCode`` entry in ``$DATABASE/info.csv`` and the set of stages reachable
from ``--start_node`` in the stage dependency graph below.
"""
import argparse
import json
import os
import shlex
import subprocess
import time

import pandas as pd

# Every stage flag written to the inputs JSON, in output order.
NEED_CODE = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi',
             'addChemo', 'addHcs', 'addAutoReport', 'addTmb', 'addPollution',
             'addNeoantigen']

# Stage dependency graph: each stage maps to the stages that follow it.
STAGE_DAG = {
    'addQc': ['addAlignment'],
    'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
    'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
    'addFusion': ['addAutoReport'],
    'addCnv': ['addAutoReport'],
    'addMsi': ['addAutoReport'],
    'addChemo': ['addAutoReport'],
    'addHcs': ['addAutoReport'],
    'addTmb': ['addAutoReport'],
    'addPollution': ['addAutoReport'],
    'addNeoantigen': ['addAutoReport'],
    'addAutoReport': [],
}

# Stages added to every project's CateCode list regardless of info.csv.
ALWAYS_ON = ['addQc', 'addAlignment', 'addAutoReport', 'addPollution']


def get_branch_nodes(graph, start_node):
    """Return the start node(s) plus every node reachable from them.

    Args:
        graph: Mapping of node name -> list of successor node names.
        start_node: One node name, or several separated by commas.
            Surrounding whitespace and empty entries (e.g. from a trailing
            comma) are ignored.

    Returns:
        Set of reachable node names, including the start node(s).

    Raises:
        ValueError: If ``start_node`` names no node at all.
        KeyError: If a start node, or a successor it reaches, is not a key
            of ``graph``.
    """
    stack = [name.strip() for name in start_node.split(',') if name.strip()]
    if not stack:
        raise ValueError(f'no start node given in {start_node!r}')
    reached = set()
    while stack:
        node = stack.pop()
        if node in reached:
            continue
        if node not in graph:
            raise KeyError(f'unknown pipeline node {node!r}; expected one of {sorted(graph)}')
        reached.add(node)
        stack.extend(graph[node])
    return reached


def get_catecode(project, start_node='addQc'):
    """Build the per-stage enable flags for ``project``.

    A stage is enabled when it is reachable from ``start_node`` in
    ``STAGE_DAG`` AND it is listed in the project's ``CateCode`` column of
    ``$DATABASE/info.csv`` (semicolon-separated) or in ``ALWAYS_ON``.
    An unknown project or an empty ``CateCode`` cell contributes no extra
    stages.

    Args:
        project: Project identifier, matched as a string against the
            ``project`` column.
        start_node: Comma-separated stage name(s) to start from.

    Returns:
        Dict mapping each name in ``NEED_CODE`` to True/False.

    Raises:
        RuntimeError: If the ``DATABASE`` environment variable is unset.
    """
    runcode = get_branch_nodes(STAGE_DAG, start_node)

    database = os.environ.get('DATABASE')
    if not database:
        raise RuntimeError('environment variable DATABASE is not set; '
                           'it must point to the directory containing info.csv')
    # dtype=str so numeric-looking project codes still match the string argument.
    info = pd.read_csv(os.path.join(database, 'info.csv'), dtype=str)
    matches = info.loc[info['project'] == str(project), 'CateCode']

    catecode = []
    # An empty CateCode cell is read as NaN, which has no .split().
    if not matches.empty and not pd.isna(matches.iloc[0]):
        catecode = [code.strip() for code in matches.iloc[0].split(';')]
    enabled = set(catecode) | set(ALWAYS_ON)

    return {code: code in runcode and code in enabled for code in NEED_CODE}


def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl, start_node):
    """Write ``<output_dir>/<barcode>.json`` and run Cromwell on ``wdl`` with it.

    Cromwell is started from ``output_dir``. Its PID is written to
    ``<output_dir>/pid``, and its captured stdout/stderr are printed once it
    exits.

    Returns:
        Cromwell's process exit status (0 on success).

    Raises:
        RuntimeError: If the ``WORKFLOW`` or ``DATABASE`` environment variable
            is unset.
    """
    input_dir = os.path.realpath(input_dir)
    output_dir = os.path.realpath(output_dir)
    wdl = os.path.realpath(wdl)
    catecodes = get_catecode(project, start_node=start_node)

    arg = {
        "pipeline.tumor": barcode,
        "pipeline.normal": normal,
        "pipeline.umi": umi,
        "pipeline.input_dir": input_dir,
        "pipeline.output_dir": output_dir,
        "pipeline.project": project,
        "pipeline.cancer": cancer,
        "pipeline.probe": probe,
        "pipeline.catecode": catecodes,
    }
    # Leave unset options (no normal sample, umi=False, ...) out of the inputs JSON.
    arg = {key: value for key, value in arg.items() if value not in (None, '', False)}

    workflow = os.environ.get('WORKFLOW')
    if not workflow:
        raise RuntimeError('environment variable WORKFLOW is not set; it must point to '
                           'the directory containing software/cromwell-51.jar')

    # Generate the Cromwell inputs JSON.
    os.makedirs(output_dir, exist_ok=True)
    jsfile_path = os.path.join(output_dir, f'{barcode}.json')
    with open(jsfile_path, 'w', encoding='utf-8') as jsfile:
        json.dump(arg, jsfile, indent=4, ensure_ascii=False)

    # An argv list (no shell) keeps paths containing spaces intact.
    # cwd= replaces the old `cd {output_dir};`, which let java run even if cd failed.
    cmd = ['/usr/bin/java', '-jar', os.path.join(workflow, 'software', 'cromwell-51.jar'),
           'run', '--inputs', jsfile_path, wdl]

    # Record the start time.
    start_time = time.monotonic()
    print(' '.join(shlex.quote(part) for part in cmd))
    proc = subprocess.Popen(cmd, cwd=output_dir, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, encoding="utf-8", errors="replace")
    with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
        pidfile.write(str(proc.pid))
    # communicate() drains both pipes while waiting. wait() with PIPE deadlocks
    # once the child's output fills the OS pipe buffer.
    stdout, stderr = proc.communicate()

    # Compute the elapsed run time.
    elapsed_time = time.monotonic() - start_time
    print("\n运行时间:{:.2f} 秒".format(elapsed_time))
    print(stdout, stderr)
    print('#' * 50)
    print('读取日志')
    return proc.returncode


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="JM to run pipeline")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    parser.add_argument('-u', '--umi', action='store_true', help="is umi sample", default=False)
    parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
    parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='./')
    parser.add_argument('-p', '--project', help="project", required=True)
    parser.add_argument('-c', '--cancer', help="cancer", required=True)
    parser.add_argument('-b', '--probe', help="probe, 682, 624, 160, 17 for now ", required=True)
    parser.add_argument('-w', '--wdl', help="wdl",
                        default='/home/zhangchao/project/pipeline/workflow/pipeline.wdl')
    parser.add_argument('-node', '--start_node',
                        help="node(s) to start running from: 'addQc', 'addAlignment', "
                             "'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', "
                             "'addHcs', 'addTmb', 'addNeoantigen', 'addPollution', "
                             "'addAutoReport'; pass several nodes comma-separated, "
                             "e.g. 'addTarget,addFusion'",
                        default='addQc')
    args = parser.parse_args()
    # Propagate Cromwell's exit status so callers can detect a failed run.
    raise SystemExit(run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
                         args.project, args.cancer, args.probe, args.wdl, args.start_node))