222 lines
8.4 KiB
Python
Executable File
222 lines
8.4 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import pandas as pd
|
|
import requests
|
|
|
|
# Module-wide HTTP session shared by every DingTalk notification, so connections are pooled.
session: requests.Session = requests.Session()
|
|
|
|
|
|
def get_branch_nodes(graph, start_node):
    """Return every node reachable from the given start node(s), start nodes included.

    Args:
        graph: adjacency mapping ``{node: [successor, ...]}``. A node that appears only
            as a successor (has no key of its own) is treated as having no successors.
        start_node: comma-separated node names, e.g. ``'addTarget,addFusion'``.
            Surrounding whitespace and empty entries (from stray commas) are ignored.
            An iterable of node names is also accepted.

    Returns:
        set of reachable node names.

    Raises:
        KeyError: if a start node is not a key of *graph* (e.g. a typo on the CLI).
        ValueError: if no start node is given at all.
    """
    names = start_node.split(',') if isinstance(start_node, str) else start_node
    starts = [name.strip() for name in names if name.strip()]
    if not starts:
        raise ValueError(f'no start node given: {start_node!r}')
    for name in starts:
        if name not in graph:
            raise KeyError(f'unknown start node {name!r}; expected one of {sorted(graph)}')

    reachable = set()
    # Iterative DFS with an explicit stack, so deep graphs cannot hit Python's recursion limit.
    stack = list(starts)
    while stack:
        node = stack.pop()
        if node in reachable:
            continue
        reachable.add(node)
        stack.extend(graph.get(node, ()))
    return reachable
|
|
|
|
|
|
def get_catecode(project, start_node='addQc'):
    """Decide which pipeline modules should run for *project*.

    A module is enabled only if it is both
      * reachable in the module DAG from *start_node*, and
      * listed for the project in ``$DATABASE/info.csv`` (``CateCode`` column,
        ``;``-separated) or one of the always-on modules.

    Args:
        project: project name, matched against the ``project`` column of info.csv.
        start_node: comma-separated module name(s) to start from (see get_branch_nodes).

    Returns:
        dict mapping each known module name to True (run) or False (skip).

    Raises:
        RuntimeError: if the DATABASE environment variable is not set.
    """
    needcode = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', 'addHcs',
                'addAutoReport', 'addTmb', 'addPollution', 'addNeoantigen']
    # Module dependency graph: each module lists the modules that run after it.
    dag = {
        'addQc': ['addAlignment'],
        'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
        'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
        'addFusion': ['addAutoReport'],
        'addCnv': ['addAutoReport'],
        'addMsi': ['addAutoReport'],
        'addChemo': ['addAutoReport'],
        'addHcs': ['addAutoReport'],
        'addTmb': ['addAutoReport'],
        'addPollution': ['addAutoReport'],
        'addNeoantigen': ['addAutoReport'],
        'addAutoReport': [],
    }

    runcode = get_branch_nodes(dag, start_node)
    enabled = _project_catecodes(project)
    # Modules enabled for every project regardless of info.csv.
    enabled.update(('addQc', 'addAlignment', 'addAutoReport', 'addPollution'))
    return {code: code in runcode and code in enabled for code in needcode}


def _project_catecodes(project):
    """Return the set of module codes listed for *project* in $DATABASE/info.csv (empty if none)."""
    database = os.environ.get('DATABASE')
    if not database:
        raise RuntimeError('environment variable DATABASE is not set; cannot locate info.csv')
    # dtype=str so numeric-looking project IDs still compare equal to the CLI string.
    info = pd.read_csv(os.path.join(database, 'info.csv'), dtype=str)
    rows = info.loc[info['project'] == str(project), 'CateCode']
    # An unknown project or an empty CateCode cell (read as NaN) enables nothing extra.
    if rows.empty or pd.isna(rows.iloc[0]):
        return set()
    return {code.strip() for code in rows.iloc[0].split(';') if code.strip()}
|
|
|
|
|
|
def send_ding(msg, isat=True, timeout=10):
    """Send a DingTalk robot message and echo it to stdout.

    The notification is best-effort: a network or HTTP failure is printed as a warning
    instead of aborting the pipeline run that triggered it.

    Args:
        msg: message body; sent as text prefixed with ``pipeline:``.
        isat: if True, set ``isAtAll`` so everyone in the group is mentioned.
        timeout: seconds to wait for the webhook before giving up.
    """
    # NOTE(review): this access token is a credential committed to source. Prefer supplying the
    # full webhook URL through the DINGTALK_WEBHOOK environment variable and rotating this token.
    url = os.environ.get(
        'DINGTALK_WEBHOOK',
        'https://oapi.dingtalk.com/robot/send?access_token=0c4b2dc1b6a1b459826512cc27adbd14e4f6aa2e661b7a7c284669065bbccfc5',
    )
    data = {"msgtype": "text", "text": {"content": "pipeline:" + str(msg)}, "at": {"isAtAll": isat}}
    headers = {'Content-Type': 'application/json;charset=UTF-8'}
    send_data = json.dumps(data).encode('utf-8')
    try:
        resp = session.post(url=url, data=send_data, headers=headers, timeout=timeout)
        resp.raise_for_status()
    except requests.RequestException as err:
        print(f'WARNING: DingTalk notification failed: {err}')
    print(msg)
|
|
|
|
|
|
def _collect_task_logs(workflow_path, suffix):
    """Return files under *workflow_path* whose name ends with *suffix*, newest first."""
    found = [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(workflow_path)
        for name in files
        if name.endswith(suffix)
    ]
    return sorted(found, key=os.path.getmtime, reverse=True)


def _append_files(log, paths):
    """Append each file in *paths* to the open *log*, preceded by a separator line and its path."""
    for file_path in paths:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
            log.write('#' * 50 + '\n')
            log.write(file_path + '\n')
            log.write(f.read())
            log.write('\n\n')


def _read_qc_status(output_dir, barcode):
    """Return the QC message for the DingTalk summary.

    A missing ``report/<barcode>_qcfail.txt`` means the pipeline failed. A present but
    blank file means QC passed. Otherwise the file's contents are returned.
    """
    qcfail_f = os.path.join(output_dir, 'report', f'{barcode}_qcfail.txt')
    if not os.path.exists(qcfail_f):
        return '流程失败,请核查!!!'
    with open(qcfail_f, 'r', encoding='utf-8', errors='replace') as qcfail_f_out:
        qcfail = qcfail_f_out.read()
    return qcfail if qcfail.strip() else '合格'


def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl, start_node, debug):
    """Write the Cromwell inputs JSON, run the WDL workflow, and collect its logs.

    Files written into *output_dir*:
      * ``<barcode>_<MMDDHHMM>.json``: Cromwell inputs.
      * ``pid``: PID of the Cromwell java process.
      * ``<barcode>_<MMDDHHMM>_stdout.log`` / ``_stderr.log``: workflow ID, elapsed time,
        and the concatenated per-task stdout/stderr files, newest first.

    Args:
        barcode: tumor sample barcode (``pipeline.tumor``; also used in file names).
        normal: normal sample (``pipeline.normal``); omitted from the JSON when empty.
        umi: UMI flag (``pipeline.umi``); omitted from the JSON when False.
        input_dir, output_dir: resolved to absolute paths. Cromwell runs inside *output_dir*.
        project: project name (``pipeline.project``); also selects modules via get_catecode.
        cancer, probe: passed through as ``pipeline.cancer`` / ``pipeline.probe``.
        wdl: workflow file; environment variables such as ``$WORKFLOW`` are expanded.
        start_node: module(s) to start from (see get_catecode).
        debug: if True, no DingTalk notifications are sent.
    """
    input_dir = os.path.realpath(input_dir)
    output_dir = os.path.realpath(output_dir)
    # Expand $WORKFLOW before realpath. Otherwise the CLI default '$WORKFLOW/pipeline.wdl'
    # is treated as a relative path and resolved against the current directory.
    wdl = os.path.realpath(os.path.expandvars(wdl))
    catecodes = get_catecode(project, start_node=start_node)

    arg = {
        "pipeline.tumor": barcode,
        "pipeline.normal": normal,
        "pipeline.umi": umi,
        "pipeline.input_dir": input_dir,
        "pipeline.output_dir": output_dir,
        "pipeline.project": project,
        "pipeline.cancer": cancer,
        "pipeline.probe": probe,
        "pipeline.catecode": catecodes,
    }
    # Leave unset optional inputs (no normal sample, umi=False, ...) out of the inputs JSON.
    arg = {key: value for key, value in arg.items() if value not in (None, '', False)}

    logname = datetime.now().strftime("%m%d%H%M")
    jsfile_path = os.path.join(output_dir, f'{barcode}_{logname}.json')
    # ensure_ascii=False may emit non-ASCII text, so pin the file encoding explicitly.
    with open(jsfile_path, 'w', encoding='utf-8') as jsfile:
        json.dump(arg, jsfile, indent=4, ensure_ascii=False)

    # Argument list rather than a shell string, so paths containing spaces or shell
    # metacharacters are safe, and the recorded PID is java's own rather than a shell's.
    # An unset $WORKFLOW expands to '' here, matching what the shell would have produced.
    workflow_home = os.environ.get('WORKFLOW', '')
    cmd = [
        '/usr/bin/java', '-DLOG_MODE=standard',
        f'-Dconfig.file={workflow_home}/codes/cromwell.examples.conf',
        '-jar', f'{workflow_home}/software/cromwell-51.jar',
        'run', wdl, '--inputs', jsfile_path,
    ]

    start_time = time.time()
    if not debug:
        send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 开始分析', isat=False)
    proc = subprocess.Popen(cmd, cwd=output_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            encoding="utf-8", errors="replace")
    with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
        pidfile.write(str(proc.pid))
    # Wait for the workflow to finish and capture its output.
    output, error = proc.communicate()
    print(output, error)
    elapsed_time = time.time() - start_time

    # Cromwell prints the workflow ID as "UUID(<id>)".
    match = re.search(r'UUID\(([^)]+)\)', output)
    workflow_id = match.group(1) if match else ''

    stdout_path = os.path.join(output_dir, f'{barcode}_{logname}_stdout.log')
    stderr_path = os.path.join(output_dir, f'{barcode}_{logname}_stderr.log')
    with open(stdout_path, 'w', encoding='utf-8') as stdout_log, \
            open(stderr_path, 'w', encoding='utf-8') as stderr_log:
        if workflow_id:
            stdout_log.write(f"任务 ID: {workflow_id}\n\n")
        else:
            stderr_log.write("未提取到任务 ID\n\n")
        if proc.returncode != 0:
            stderr_log.write(f'cromwell exit code: {proc.returncode}\n')
            stderr_log.write(f'{error}\n\n')

        stdout_log.write("任务耗时: {:.2f} 秒\n\n".format(elapsed_time))
        workflow_path = os.path.join(output_dir, 'cromwell-executions', 'pipeline', workflow_id)
        stdout_log.write(f'workflow 地址: {workflow_path}\n\n')

        # Without an ID, workflow_path is the parent of every past run; don't collect their logs.
        if workflow_id:
            _append_files(stdout_log, _collect_task_logs(workflow_path, 'stdout'))
            _append_files(stderr_log, _collect_task_logs(workflow_path, 'stderr'))

    qcfail = _read_qc_status(output_dir, barcode)
    if not debug:
        send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 分析完成,请查看!\n\n qcfail: \n {qcfail}\n')
|
|
|
|
|
|
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="JM to run pipeline")

    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    parser.add_argument('-u', '--umi', action='store_true', help="is umi sample", default=False)
    parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
    parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='./')
    parser.add_argument('-p', '--project', help="project", required=True)
    parser.add_argument('-c', '--cancer', help="cancer", required=True)
    parser.add_argument('-b', '--probe', help="probe, 682, 624, 160, 17 for now ", required=True)
    parser.add_argument('-w', '--wdl', help="wdl", default='$WORKFLOW/pipeline.wdl')
    parser.add_argument('-node', '--start_node',
                        help="node to begin running from: 'addQc', 'addAlignment', "
                             "'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', "
                             "'addHcs', 'addTmb', 'addNeoantigen', 'addPollution', 'addAutoReport'; "
                             "several nodes may be given comma-separated, "
                             "e.g. 'addTarget,addFusion'",
                        default='addQc')
    parser.add_argument('-d', '--debug', action='store_true', help="debug", default=False)
    args = parser.parse_args()

    try:
        run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
            args.project, args.cancer, args.probe, args.wdl, args.start_node, args.debug)
    finally:
        # Release the pooled HTTP connections even if the run raises.
        session.close()
|