#!/usr/bin/env python3
"""Job manager: build a Cromwell inputs JSON for one sample and run the WDL pipeline."""
import argparse
import json
import os
import re
import subprocess
import time
from datetime import datetime

import pandas as pd


def get_branch_nodes(graph, start_node):
    """Return the start node(s) plus every node reachable from them in the DAG."""
    def dfs(node):
        if node not in visited:
            visited.add(node)
            branch_nodes.add(node)
            for neighbor in graph[node]:
                dfs(neighbor)

    branch_nodes = set()
    visited = set()
    # start_node may name several nodes, comma-separated
    for split_start_node in start_node.split(','):
        dfs(split_start_node.strip())
    return branch_nodes
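# A quick sanity check, purely illustrative, using the DAG defined in
# get_catecode() below:
#   get_branch_nodes(dag, 'addTarget')
#   -> {'addTarget', 'addChemo', 'addHcs', 'addTmb', 'addPollution', 'addAutoReport'}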
def get_catecode(project, start_node='addQc'):
    """Decide which pipeline modules to enable for this project and start node."""
    needcode = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi',
                'addChemo', 'addHcs', 'addAutoReport', 'addTmb', 'addPollution', 'addNeoantigen']
    dag = {
        'addQc': ['addAlignment'],
        'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
        'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
        'addFusion': ['addAutoReport'],
        'addCnv': ['addAutoReport'],
        'addMsi': ['addAutoReport'],
        'addChemo': ['addAutoReport'],
        'addHcs': ['addAutoReport'],
        'addTmb': ['addAutoReport'],
        'addPollution': ['addAutoReport'],
        'addNeoantigen': ['addAutoReport'],
        'addAutoReport': []
    }
    runcode = get_branch_nodes(dag, start_node)
    public = os.environ['DATABASE']  # fail fast if DATABASE is unset
    path = os.path.join(public, 'info.csv')
    info = pd.read_csv(path)
    rows = info[info['project'] == project]['CateCode']
    catecode = rows.values[0].split(';') if not rows.empty else []
    # these modules always run
    catecode.extend(['addQc', 'addAlignment', 'addAutoReport', 'addPollution'])
    # a module is enabled only if the project allows it AND it is reachable from start_node
    return {ele: ele in runcode and ele in catecode for ele in needcode}


def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl, start_node):
    input_dir = os.path.realpath(input_dir)
    output_dir = os.path.realpath(output_dir)
    # expand $WORKFLOW before resolving the path
    wdl = os.path.realpath(os.path.expandvars(wdl))
    catecodes = get_catecode(project, start_node=start_node)
    arg = {
        "pipeline.tumor": barcode,
        "pipeline.normal": normal,
        "pipeline.umi": umi,
        "pipeline.input_dir": input_dir,
        "pipeline.output_dir": output_dir,
        "pipeline.project": project,
        "pipeline.cancer": cancer,
        "pipeline.probe": probe,
        "pipeline.catecode": catecodes
    }
    # drop empty or disabled inputs so the WDL defaults apply
    arg = {key: value for key, value in arg.items() if value not in (None, '', False)}

    # generate the Cromwell inputs JSON
    logname = datetime.now().strftime("%m%d%H%M")
    jsfile_path = os.path.join(output_dir, f'{barcode}_{logname}.json')
    with open(jsfile_path, 'w') as jsfile:
        jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))

    # run the pipeline
    cmd_cd = f'cd {output_dir}'
    cmd_java = f'/usr/bin/java -DLOG_MODE=standard ' \
               f'-Dconfig.file=$WORKFLOW/codes/cromwell.examples.conf ' \
               f'-jar $WORKFLOW/software/cromwell-51.jar run {wdl} --inputs {jsfile_path}'
    cmd = f'{cmd_cd}; {cmd_java}'

    # record the start time
    start_time = time.time()
    ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                           encoding="utf-8")
    with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
        pidfile.write(str(ret.pid))

    # wait for the process to finish and capture its output and errors
    output, error = ret.communicate()
    print(output, error)

    stdout = open(os.path.join(output_dir, f'{barcode}_{logname}_stdout.log'), 'w')
    stderr = open(os.path.join(output_dir, f'{barcode}_{logname}_stderr.log'), 'w')

    # record the end time and compute the elapsed run time
    end_time = time.time()
    elapsed_time = end_time - start_time

    # extract the workflow UUID from Cromwell's output with a regex
    match = re.search(r'UUID\(([^)]+)\)', output, re.MULTILINE)
    workflow_id = ''
    if match:
        workflow_id = match.group(1)
        stdout.write(f"Workflow ID: {workflow_id}\n\n")
    else:
        stderr.write("No workflow ID could be extracted\n\n")
    stdout.write("Elapsed time: {:.2f} s\n\n".format(elapsed_time))

    workflow_path = os.path.join(output_dir, 'cromwell-executions', 'pipeline', workflow_id)
    stdout.write(f'Workflow path: {workflow_path}\n\n')

    # collect every task's stdout/stderr files under the workflow directory
    stdout_files = list()
    stderr_files = list()
    for root, dirs, files in os.walk(workflow_path):
        for file in files:
            file_path = os.path.join(root, file)
            if str(file).endswith('stdout'):
                stdout_files.append(str(file_path))
            if str(file).endswith('stderr'):
                stderr_files.append(str(file_path))

    # sort by file modification time, newest first
    sorted_stdout_files = sorted(stdout_files, key=os.path.getmtime, reverse=True)
    sorted_stderr_files = sorted(stderr_files, key=os.path.getmtime, reverse=True)

    for file_path in sorted_stdout_files:
        with open(file_path, 'r') as f:
            stdout.write('#' * 50 + '\n')
            stdout.write(file_path + '\n')
            stdout.write(f.read())
            stdout.write('\n\n')
    for file_path in sorted_stderr_files:
        with open(file_path, 'r') as f:
            stderr.write('#' * 50 + '\n')
            stderr.write(file_path + '\n')
            stderr.write(f.read())
            stderr.write('\n\n')
    stdout.close()
    stderr.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="JM to run pipeline")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's matched normal", default='', required=False, nargs='?')
    parser.add_argument('-u', '--umi', action='store_true', help="sample uses UMIs", default=False)
    parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
    parser.add_argument('-o', '--output_dir', help="output directory, default ./", default='./')
    parser.add_argument('-p', '--project', help="project", required=True)
    parser.add_argument('-c', '--cancer', help="cancer", required=True)
    parser.add_argument('-b', '--probe', help="probe: 682, 624, 160, or 17 for now", required=True)
    parser.add_argument('-w', '--wdl', help="wdl", default='$WORKFLOW/pipeline.wdl')
    parser.add_argument('-node', '--start_node',
                        help="node(s) to start running from: 'addQc', 'addAlignment', "
                             "'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', "
                             "'addHcs', 'addTmb', 'addNeoantigen', 'addPollution', 'addAutoReport'; "
                             "several nodes may be given comma-separated, "
                             "e.g. 'addTarget,addFusion'",
                        default='addQc')
    args = parser.parse_args()
    run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
        args.project, args.cancer, args.probe, args.wdl, args.start_node)
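# Example invocation (a sketch; the script name, sample IDs and paths below are
# illustrative, not real data):
#   python3 run_pipeline.py -n T0001 -s N0001 -i /path/to/fastq -o ./results \
#       -p my_project -c lung -b 682 -node addTarget,addFusion
# This writes T0001_<MMDDhhmm>.json into the output directory and hands it to
# Cromwell via `cromwell-51.jar run`.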