# pipeline/codes/run_wdl.py
#! /usr/bin/env python3
import argparse
import json
import os
import re
import subprocess
import time
from datetime import datetime
import pandas as pd
import requests
# Global HTTP session reused for all DingTalk webhook posts (connection pooling).
session = requests.Session()
def get_branch_nodes(graph, start_node):
    """Return every node reachable downstream from the given start node(s).

    Args:
        graph: adjacency mapping ``{node: [successor, ...]}`` describing a DAG.
        start_node: comma-separated node name(s), e.g. ``'addTarget,addFusion'``.

    Returns:
        Set of node names reachable from any start node, including the start
        nodes themselves.
    """
    reachable = set()

    def dfs(node):
        if node in reachable:
            return
        reachable.add(node)
        # graph.get guards against a start node that is not in the DAG;
        # the original indexed graph[node] and raised KeyError in that case.
        for neighbor in graph.get(node, ()):
            dfs(neighbor)

    for name in start_node.split(','):
        dfs(name.strip())
    return reachable
def get_catecode(project, start_node='addQc'):
    """Decide which pipeline steps should run for a project.

    A step is enabled only when it is both reachable from ``start_node`` in
    the step DAG and allowed for the project (per the ``CateCode`` column of
    ``$DATABASE/info.csv``; a few mandatory steps are always allowed).

    Args:
        project: project name matched against the ``project`` column.
        start_node: comma-separated step name(s) to start from.

    Returns:
        Dict mapping every known step name to True/False.
    """
    needcode = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', 'addHcs',
                'addAutoReport', 'addTmb', 'addPollution', 'addNeoantigen']
    # Static step DAG: QC -> alignment -> analysis branches -> report.
    dag = {
        'addQc': ['addAlignment'],
        'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
        'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
        'addFusion': ['addAutoReport'],
        'addCnv': ['addAutoReport'],
        'addMsi': ['addAutoReport'],
        'addChemo': ['addAutoReport'],
        'addHcs': ['addAutoReport'],
        'addTmb': ['addAutoReport'],
        'addPollution': ['addAutoReport'],
        'addNeoantigen': ['addAutoReport'],
        'addAutoReport': []
    }
    runcode = get_branch_nodes(dag, start_node)
    # NOTE(review): assumes the DATABASE env var is set; os.path.join will
    # raise a TypeError if it is missing — confirm deployment guarantees this.
    database_dir = os.environ.get('DATABASE')
    info = pd.read_csv(os.path.join(database_dir, 'info.csv'))
    # Renamed from the original, which shadowed the `project` parameter
    # with the matched pandas Series.
    matched = info[info['project'] == project]['CateCode']
    catecode = matched.values[0].split(';') if not matched.empty else []
    # These steps are mandatory regardless of the project configuration.
    catecode.extend(['addQc', 'addAlignment', 'addAutoReport', 'addPollution'])
    return {step: (step in runcode and step in catecode) for step in needcode}
def send_ding(msg, isat=True):
    """Post a text notification to the DingTalk group robot.

    Args:
        msg: message body; delivered as ``pipeline:<msg>``.
        isat: when True the robot @-mentions everyone in the group.
    """
    # SECURITY NOTE(review): the webhook access token is hardcoded here;
    # consider moving it into configuration or an environment variable.
    url = 'https://oapi.dingtalk.com/robot/send?access_token=0c4b2dc1b6a1b459826512cc27adbd14e4f6aa2e661b7a7c284669065bbccfc5'
    payload = {
        "msgtype": "text",
        "text": {"content": "pipeline:" + str(msg)},
        "at": {"isAtAll": isat},
    }
    encoded = json.dumps(payload).encode('utf-8')
    session.post(
        url=url,
        data=encoded,
        headers={'Content-Type': 'application/json;charset=UTF-8'},
    )
    # Echo locally as well so the message appears in the run log.
    print(msg)
def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl,
        start_node, debug):
    """Run the cromwell WDL pipeline for one sample and archive its logs.

    Writes a cromwell inputs JSON into ``output_dir``, launches cromwell via
    a shell command, records the child PID, then copies every task-level
    stdout/stderr file from the workflow execution directory into
    ``<barcode>_<timestamp>_stdout.log`` / ``_stderr.log``.  DingTalk
    start/finish notifications are sent unless ``debug`` is set.

    Args:
        barcode: tumor sample barcode.
        normal: matched normal sample barcode ('' when absent).
        umi: whether the sample uses UMIs.
        input_dir: sample input / working directory.
        output_dir: directory receiving all run outputs.
        project: project name (drives step selection via get_catecode).
        cancer: cancer type passed through to the pipeline.
        probe: probe identifier passed through to the pipeline.
        wdl: path to the WDL file; may contain env vars such as $WORKFLOW.
        start_node: comma-separated DAG node(s) to start from.
        debug: suppress DingTalk notifications when True.
    """
    input_dir = os.path.realpath(input_dir)
    output_dir = os.path.realpath(output_dir)
    # Expand env vars first: the default wdl value is '$WORKFLOW/pipeline.wdl',
    # which realpath alone would treat as a literal relative path (bug fix).
    wdl = os.path.realpath(os.path.expandvars(wdl))
    catecodes = get_catecode(project, start_node=start_node)
    arg = {
        "pipeline.tumor": barcode,
        "pipeline.normal": normal,
        "pipeline.umi": umi,
        "pipeline.input_dir": input_dir,
        "pipeline.output_dir": output_dir,
        "pipeline.project": project,
        "pipeline.cancer": cancer,
        "pipeline.probe": probe,
        "pipeline.catecode": catecodes
    }
    # Drop unset/disabled values so the inputs JSON stays minimal.
    arg = {key: value for key, value in arg.items() if value not in (None, '', False)}
    # Generate the cromwell inputs JSON next to the run outputs.
    logname = datetime.now().strftime("%m%d%H%M")
    jsfile_path = os.path.join(output_dir, f'{barcode}_{logname}.json')
    with open(jsfile_path, 'w') as jsfile:
        jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))
    # Run cromwell from inside output_dir so cromwell-executions lands there.
    cmd = (
        f'cd {output_dir}; '
        f'/usr/bin/java -DLOG_MODE=standard '
        f'-Dconfig.file=$WORKFLOW/codes/cromwell.examples.conf '
        f'-jar $WORKFLOW/software/cromwell-51.jar run {wdl} --inputs {jsfile_path} '
    )
    start_time = time.time()
    if not debug:
        send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 开始分析', isat=False)
    ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE, encoding="utf-8")
    # Persist the child PID so a stuck run can be killed externally.
    with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
        pidfile.write(str(ret.pid))
    # Block until cromwell exits and capture its console output.
    output, error = ret.communicate()
    print(output, error)
    elapsed_time = time.time() - start_time
    # Bug fix: the original used the global `args.barcode` here, which breaks
    # when run() is imported and called directly; use the parameter instead.
    # Context managers replace the original unclosed open() handles.
    with open(os.path.join(output_dir, f'{barcode}_{logname}_stdout.log'), 'w') as stdout, \
            open(os.path.join(output_dir, f'{barcode}_{logname}_stderr.log'), 'w') as stderr:
        # Cromwell prints the workflow UUID on its console; extract it to
        # locate the execution directory for this run.
        match = re.search(r'UUID\(([^)]+)\)', output, re.MULTILINE)
        workflow_id = match.group(1) if match else ''
        if workflow_id:
            stdout.write(f"任务 ID: {workflow_id}\n\n")
        else:
            stderr.write("未提取到任务 ID\n\n")
        stdout.write("任务耗时: {:.2f}\n\n".format(elapsed_time))
        # NOTE(review): when no UUID was found, workflow_id is '' and this
        # walks the whole 'cromwell-executions/pipeline' directory, collecting
        # logs from every historical run — preserved original behavior.
        workflow_path = os.path.join(output_dir, 'cromwell-executions', 'pipeline', workflow_id)
        stdout.write(f'workflow 地址: {workflow_path}\n\n')
        stdout_files = []
        stderr_files = []
        for root, dirs, files in os.walk(workflow_path):
            for file in files:
                file_path = os.path.join(root, file)
                if str(file).endswith('stdout'):
                    stdout_files.append(str(file_path))
                if str(file).endswith('stderr'):
                    stderr_files.append(str(file_path))
        # Newest tasks first so the most recent (likely failing) step is on top.
        for file_path in sorted(stdout_files, key=os.path.getmtime, reverse=True):
            with open(file_path, 'r') as f:
                stdout.write('#' * 50 + '\n')
                stdout.write(file_path + '\n')
                stdout.write(f.read())
                stdout.write('\n\n')
        for file_path in sorted(stderr_files, key=os.path.getmtime, reverse=True):
            with open(file_path, 'r') as f:
                stderr.write('#' * 50 + '\n')
                stderr.write(file_path + '\n')
                stderr.write(f.read())
                stderr.write('\n\n')
        # QC summary: a missing report file means the pipeline died early.
        qcfail_f = os.path.join(output_dir, 'report', f'{barcode}_qcfail.txt')
        if os.path.exists(qcfail_f):
            with open(qcfail_f, "r") as qcfail_f_out:
                qcfail = qcfail_f_out.read()
        else:
            qcfail = '流程失败,请核查!!!'
        if not qcfail:
            qcfail = '合格'
        if not debug:
            send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 分析完成,请查看!\n\n qcfail: \n {qcfail}\n')
if __name__ == '__main__':
    # Command-line entry point: parse sample/run options and launch the pipeline.
    parser = argparse.ArgumentParser(description="JM to run pipeline")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    parser.add_argument('-u', '--umi', action='store_true', help="is umi sample", default=False)
    parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
    parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='./')
    parser.add_argument('-p', '--project', help="project", required=True)
    parser.add_argument('-c', '--cancer', help="cancer", required=True)
    parser.add_argument('-b', '--probe', help="probe, 682, 624, 160, 17 for now ", required=True)
    parser.add_argument('-w', '--wdl', help="wdl", default='$WORKFLOW/pipeline.wdl')
    # Help text fixed: "begain" typo and the unbalanced quote around 'addHcs'.
    parser.add_argument('-node', '--start_node',
                        help="node to begin running from; one of 'addQc', 'addAlignment', "
                             "'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', "
                             "'addHcs', 'addTmb', 'addNeoantigen', 'addPollution', 'addAutoReport'; "
                             "more than one node may be given comma-separated, "
                             "like this 'addTarget,addFusion'",
                        default='addQc')
    parser.add_argument('-d', '--debug', action='store_true', help="debug", default=False)
    args = parser.parse_args()
    run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
        args.project, args.cancer, args.probe, args.wdl, args.start_node, args.debug)
    # Release pooled HTTP connections held by the global DingTalk session.
    session.close()