pipeline/codes/run_wdl.py

211 lines
7.9 KiB
Python
Raw Normal View History

2024-01-01 14:25:34 +08:00
#! /usr/bin/env python3
2023-11-01 10:09:29 +08:00
import argparse
import json
import os
2024-01-02 02:01:20 +08:00
import re
2023-11-01 10:09:29 +08:00
import subprocess
import time
2023-12-25 14:06:30 +08:00
from datetime import datetime
2023-11-01 10:09:29 +08:00
2023-11-29 15:13:30 +08:00
import pandas as pd
2024-01-03 17:18:03 +08:00
import requests
2023-11-01 10:09:29 +08:00
2024-01-04 13:32:19 +08:00
# 创建全局的 Session 对象
session = requests.Session()
2023-11-29 15:13:30 +08:00
def get_branch_nodes(graph, start_node):
def dfs(node):
if node not in visited:
visited.add(node)
branch_nodes.add(node)
for neighbor in graph[node]:
dfs(neighbor)
branch_nodes = set()
visited = set()
for split_start_node in start_node.split(','):
dfs(split_start_node.strip())
return branch_nodes
def get_catecode(project, start_node='addQc'):
needcode = ['addQc', 'addAlignment', 'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo', 'addHcs',
2023-12-19 13:37:52 +08:00
'addAutoReport', 'addTmb', 'addPollution', 'addNeoantigen']
2023-11-29 15:13:30 +08:00
dag = {
'addQc': ['addAlignment'],
2023-12-19 13:37:52 +08:00
'addAlignment': ['addTarget', 'addFusion', 'addCnv', 'addMsi', 'addNeoantigen'],
'addTarget': ['addChemo', 'addHcs', 'addTmb', 'addPollution'],
2023-11-29 15:13:30 +08:00
'addFusion': ['addAutoReport'],
'addCnv': ['addAutoReport'],
'addMsi': ['addAutoReport'],
'addChemo': ['addAutoReport'],
'addHcs': ['addAutoReport'],
'addTmb': ['addAutoReport'],
2023-12-19 13:37:52 +08:00
'addPollution': ['addAutoReport'],
'addNeoantigen': ['addAutoReport'],
2023-11-29 15:13:30 +08:00
'addAutoReport': []
}
runcode = get_branch_nodes(dag, start_node)
2023-11-30 15:31:35 +08:00
public = os.environ.get('DATABASE')
2023-11-29 15:13:30 +08:00
path = os.path.join(public, 'info.csv')
info = pd.read_csv(path)
project = info[info['project'] == project]['CateCode']
catecode = project.values[0].split(';') if not project.empty else []
2023-12-19 13:37:52 +08:00
catecode.extend(['addQc', 'addAlignment', 'addAutoReport', 'addPollution'])
2023-11-29 15:13:30 +08:00
cate_dict = dict()
for ele in needcode:
if ele in runcode and ele in catecode:
cate_dict[ele] = True
else:
cate_dict[ele] = False
return cate_dict
2024-01-03 17:18:03 +08:00
def send_ding(msg):
'''
发送钉钉消息功能
'''
2024-01-04 13:32:19 +08:00
url = 'https://oapi.dingtalk.com/robot/send?access_token=0c4b2dc1b6a1b459826512cc27adbd14e4f6aa2e661b7a7c284669065bbccfc5'
2024-01-03 17:18:03 +08:00
data = {"msgtype": "text", "text": {"content": "pipeline:" + str(msg)}, "at": {"isAtAll": True}}
headers = {'Content-Type': 'application/json;charset=UTF-8'}
send_data = json.dumps(data).encode('utf-8')
2024-01-04 13:32:19 +08:00
session.post(url=url, data=send_data, headers=headers)
print(msg)
2024-01-03 17:18:03 +08:00
2024-01-04 13:32:19 +08:00
def run(barcode, normal, umi, input_dir, output_dir, project, cancer, probe, wdl, start_node, debug):
2023-11-01 10:09:29 +08:00
input_dir = os.path.realpath(input_dir)
output_dir = os.path.realpath(output_dir)
wdl = os.path.realpath(wdl)
2023-11-29 15:13:30 +08:00
catecodes = get_catecode(project, start_node=start_node)
2023-11-01 10:09:29 +08:00
arg = {
"pipeline.tumor": barcode,
"pipeline.normal": normal,
"pipeline.umi": umi,
"pipeline.input_dir": input_dir,
"pipeline.output_dir": output_dir,
"pipeline.project": project,
"pipeline.cancer": cancer,
2023-11-29 15:13:30 +08:00
"pipeline.probe": probe,
"pipeline.catecode": catecodes
2023-11-01 10:09:29 +08:00
}
arg = {key: value for key, value in arg.items() if value not in (None, '', False)}
# generate json
2023-12-25 14:06:30 +08:00
logname = datetime.now().strftime("%m%d%H%M")
jsfile_path = os.path.join(output_dir, f'{barcode}_{logname}.json')
2023-11-01 10:09:29 +08:00
with open(jsfile_path, 'w') as jsfile:
jsfile.write(json.dumps(arg, indent=4, ensure_ascii=False))
# run pipeline
cmd3 = f'cd {output_dir}'
2023-12-28 09:14:58 +08:00
cmd4 = f'/usr/bin/java -DLOG_MODE=standard ' \
2023-12-26 10:18:15 +08:00
f'-Dconfig.file=$WORKFLOW/codes/cromwell.examples.conf ' \
2023-12-28 09:14:58 +08:00
f'-jar $WORKFLOW/software/cromwell-51.jar run {wdl} --inputs {jsfile_path} '
2023-11-30 15:31:35 +08:00
cmd = f'{cmd3}; {cmd4}'
2023-11-01 10:09:29 +08:00
# 记录开始时间
start_time = time.time()
2024-01-04 13:32:19 +08:00
if not debug:
send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 开始分析')
2023-11-01 10:09:29 +08:00
ret = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
pidnum = ret.pid
with open(os.path.join(output_dir, 'pid'), 'w') as pidfile:
pidfile.write(str(pidnum))
2024-01-01 14:25:34 +08:00
# ret.wait()
# 等待进程完成,并获取输出和错误
output, error = ret.communicate()
2024-01-02 02:01:20 +08:00
print(output, error)
stdout = open(os.path.join(output_dir, f'{args.barcode}_{logname}_stdout.log'), 'w')
stderr = open(os.path.join(output_dir, f'{args.barcode}_{logname}_stderr.log'), 'w')
2023-11-01 10:09:29 +08:00
# 记录结束时间
end_time = time.time()
# 计算运行时间
elapsed_time = end_time - start_time
2024-01-02 02:01:20 +08:00
# 使用正则表达式提取UUID
match = re.search(r'UUID\(([^)]+)\)', output, re.MULTILINE)
workflow_id = ''
if match:
workflow_id = match.group(1)
stdout.write(f"任务 ID: {workflow_id}\n\n")
else:
stderr.write("未提取到任务 ID\n\n")
stdout.write("任务耗时: {:.2f}\n\n".format(elapsed_time))
workflow_path = os.path.join(output_dir, 'cromwell-executions', 'pipeline', workflow_id)
stdout.write(f'workflow 地址: {workflow_path}\n\n')
stdout_files = list()
stderr_files = list()
for root, dirs, files in os.walk(workflow_path):
for file in files:
if str(file).endswith('stdout'):
file_path = os.path.join(root, file)
stdout_files.append(str(file_path))
if str(file).endswith('stderr'):
file_path = os.path.join(root, file)
stderr_files.append(str(file_path))
# 按照文件的修改时间倒序排序
sorted_stdout_files = sorted(stdout_files, key=lambda x: os.path.getmtime(x), reverse=True)
sorted_stderr_files = sorted(stderr_files, key=lambda x: os.path.getmtime(x), reverse=True)
for file_path in sorted_stdout_files:
with open(file_path, 'r') as f:
stdout.write('#' * 50 + '\n')
stdout.write(file_path + '\n')
stdout.write(f.read())
stdout.write('\n\n')
for file_path in sorted_stderr_files:
with open(file_path, 'r') as f:
stderr.write('#' * 50 + '\n')
stderr.write(file_path + '\n')
stderr.write(f.read())
stderr.write('\n\n')
2024-01-04 13:32:19 +08:00
if not debug:
send_ding(msg=f'\n样本: {barcode}\n分析地址: \n{output_dir} \n 分析完成,请查看!')
2024-01-02 02:01:20 +08:00
stdout.close()
stderr.close()
2023-11-01 10:09:29 +08:00
if __name__ == '__main__':
parser = argparse.ArgumentParser(description="JM to run pipeline")
parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
parser.add_argument('-u', '--umi', action='store_true', help="is umi sample", default=False)
parser.add_argument('-i', '--input_dir', help="sample's input_dir/workdir", required=True)
parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='./')
parser.add_argument('-p', '--project', help="project", required=True)
parser.add_argument('-c', '--cancer', help="cancer", required=True)
2023-11-29 15:13:30 +08:00
parser.add_argument('-b', '--probe', help="probe, 682, 624, 160, 17 for now ", required=True)
2023-12-25 14:06:30 +08:00
parser.add_argument('-w', '--wdl', help="wdl", default='$WORKFLOW/pipeline.wdl')
2023-11-29 15:13:30 +08:00
parser.add_argument('-node', '--start_node',
help="node begain to run; 'addQc', 'addAlignment', "
"'addTarget', 'addFusion', 'addCnv', 'addMsi', 'addChemo',"
2023-12-19 13:37:52 +08:00
" 'addHcs, 'addTmb', 'addNeoantigen', 'addPollution', addAutoReport' "
"and also run more than one node ,"
2023-11-30 15:31:35 +08:00
"like this 'addTarget,addFusion'",
2023-11-29 15:13:30 +08:00
default='addQc')
2024-01-04 13:32:19 +08:00
parser.add_argument('-d', '--debug', action='store_true', help="debug", default=False)
2023-11-01 10:09:29 +08:00
args = parser.parse_args()
run(args.barcode, args.normal, args.umi, args.input_dir, args.output_dir,
2024-01-04 13:32:19 +08:00
args.project, args.cancer, args.probe, args.wdl, args.start_node, args.debug)
session.close()