pipeline/codes/postprocess.py

579 lines
24 KiB
Python
Raw Normal View History

2023-11-29 15:13:30 +08:00
#! /usr/bin/env python3
import argparse
import glob
import json
import os
import re
from collections import defaultdict
import openpyxl
import pandas as pd
from openpyxl.drawing.image import Image
def database():
    """Load the matching/reference tables from the DATABASE directory.

    Returns:
        dict with two entries:
            gene_function: {GENE_NAME_UPPERCASED: Chinese functional description}
            drug_mechanism: {DRUG_NAME_UPPERCASED: "indication||mechanism" text}

    Raises:
        UserWarning: when the DATABASE environment variable is not set.
    """
    path = os.environ.get('DATABASE')
    if not path:
        raise UserWarning('未设置DATABASE环境变量')
    # Gene functional descriptions.
    gene_function_df = pd.read_csv(os.path.join(path, 'gene_function.txt'), sep='\t').fillna('.')
    gene_function_df['genename'] = gene_function_df.apply(lambda x: x['基因'].upper(), axis=1)
    # {gene name: Chinese translation of the gene function}
    gene_function = gene_function_df.set_index('genename')['基因功能翻译'].to_dict()
    # Drug descriptions (only the first 18 columns are used).
    drug_mechanism_df = pd.read_csv(os.path.join(path, 'target_drug.txt'),
                                    usecols=list(range(18)), sep='\t').fillna('.')
    # '|'-joined upper-cased aliases (English / Chinese names, '.' means absent).
    drug_mechanism_df['drugs'] = drug_mechanism_df.apply(
        lambda x: '|'.join(tm.upper() for tm in (x['药物名称-英文'], x['药物名称-中文']) if tm != '.'),
        axis=1)
    drug_mechanism_df['dec'] = drug_mechanism_df.apply(
        lambda x: "||".join(tm for tm in (x['适应症(中文)'], x['作用机制-中文']) if tm != '.'),
        axis=1)
    # Every alias of a drug maps to the same description text.
    drug_mechanism = {}
    for _, row in drug_mechanism_df.iterrows():
        for alias in row['drugs'].split('|'):
            drug_mechanism[alias] = row['dec']
    return dict(gene_function=gene_function, drug_mechanism=drug_mechanism)
def check_file_exist_and_empty(file_path):
    """Return an error message when *file_path* is missing or empty, else None."""
    if not os.path.exists(file_path):
        # File missing entirely.
        return f"文件 '{file_path}' 不存在"
    if os.path.getsize(file_path) == 0:
        # Zero-byte file.
        return f"文件 '{file_path}' 是空文件"
    return None
def tree():
    """Infinitely nestable dict: t[a][b][c] auto-creates intermediate levels."""
    return defaultdict(tree)
class PostProcess:
    """Collect per-module pipeline result files for one sample into one Excel workbook."""

    def __init__(self, sample_name, normal_name, path, outpath):
        """
        Args:
            sample_name: tumour sample barcode used in result file names.
            normal_name: paired normal sample name.
            path: pipeline run directory containing per-module sub-directories.
            outpath: destination .xlsx path.
        """
        self.path = path
        self.outpath = outpath
        self.sample_name = sample_name
        self.normal_name = normal_name
        self.database = database()   # reference tables: gene functions + drug mechanisms
        self.drug_record = list()    # accumulated {drug_name, drug_detail} rows
        self.sheet = tree()          # sheet_name -> list of row dicts
@staticmethod
def txt_2_excel(path):
try:
df = pd.read_csv(path, sep='\t')
2023-12-25 14:06:30 +08:00
except (pd.errors.EmptyDataError, FileNotFoundError):
2023-11-29 15:13:30 +08:00
return []
return df.to_dict('records')
@staticmethod
def _add_columns(df: pd.DataFrame):
"""
添加必须列经过转换得到新列这个应该后期最好在注释的时候进行
"""
# 证据等级转换
if 'Label' in df.columns and '证据等级' in df.columns:
df['证据等级'] = df.apply(lambda x: 'C' if x['Label'] == '非适应症' else x['证据等级'], axis=1)
# 突变的等级分类
if '证据等级' in df.columns:
df['AMP_mut_level'] = df['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
# 药物等级划分
if 'Response_Type_C' in df.columns and '证据等级' in df.columns:
df['Drug_Category'] = 'c'
df.loc[df['Response_Type_C'].str.contains('敏感') & (df['证据等级'] == 'A'), 'Drug_Category'] = 'a'
# df.loc[df['Response_Type_C'].str.startswith('敏感') & (df['证据等级'] == 'C'), 'Drug_Category'] = 'b'
df.loc[df['Response_Type_C'].str.contains('敏感') & (df['Label'] == '非适应症'), 'Drug_Category'] = 'b'
df.loc[df['Response_Type_C'].str.contains('耐药'), 'Drug_Category'] = 'd'
# 胚系突变分类 通过 CLNSIG列区分突变等级
if 'CLNSIG' in df.columns:
df['ACMG_level'] = '3'
df.loc[df['CLNSIG'].str.lower().str.startswith('likely_pathogenic'), 'ACMG_level'] = '2'
df.loc[(df['CLNSIG'].str.lower().str.startswith('pathogenic')), 'ACMG_level'] = '1'
return df
@staticmethod
def _add_cal_col(df: pd.DataFrame):
try:
# 统计 'MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred' 的D的数目
tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']]
df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1)
# 计算 这几列中最大的值
# eval(str(x)) 科学计数法转换
tmdf1 = df[
['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS',
'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).applymap(lambda x: eval(str(x)))
df['freq_high'] = tmdf1.max(axis=1)
except KeyError as e:
print(e)
return df
def _add_gene_function(self, df: pd.DataFrame, colname='Gene_refGene'):
if colname in df.columns:
df['Gene_function'] = df.apply(lambda x: self.database['gene_function'].get(x[colname], ''),
axis=1)
return df
def drug_parse(self, drugs: list):
"""
药物处理
"""
# 药物单独处理 sheetname
# for drug in drugs['药物中文名'].to_list():
for drug in drugs:
for spdrug in re.split(" \+ |,", drug):
self.drug_record.append(
{'drug_name': spdrug, 'drug_detail': self.database['drug_mechanism'].get(spdrug, '')})
def cms(self):
"""
样本信息
"""
# cms_files = glob.glob(os.path.join(self.path, 'qc', '*_post.json'))
cms_info_need = []
cms_file = os.path.join(self.path, 'qc', f'{self.sample_name}_post.json')
check_file = check_file_exist_and_empty(cms_file)
if check_file:
return cms_info_need
file_read = open(cms_file, 'r')
try:
cms_info = json.load(file_read)['data']
except Exception as e:
raise UserWarning('cms 文件加载有误!', e)
if not cms_info:
raise UserWarning('cms 内容为空!')
file_read.close()
df = pd.DataFrame(cms_info)
df['normal'] = self.normal_name
cms_info_need = df.to_dict('records')
self.sheet['cms'] = cms_info_need
def snv(self):
filter_sum = os.path.join(self.path, 'mutation',
f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.txt')
filter_pos = os.path.join(self.path, 'mutation',
f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.pos.txt')
filter_vus = os.path.join(self.path, 'mutation',
f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.vus.txt')
filter_neg = os.path.join(self.path, 'mutation',
f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.neg.txt')
2023-12-25 14:06:30 +08:00
tmb_file = os.path.join(self.path, 'tmb', f'{self.sample_name}.tmb.txt')
2023-11-29 15:13:30 +08:00
filter_sum_pos_res = list()
# 从pos_files中获取药物信息
pos_check = check_file_exist_and_empty(filter_pos)
pos_dict = dict()
if not pos_check:
pos = pd.read_csv(filter_pos, sep='\t')
# 添加常规列
pos = self._add_columns(pos)
# 添加基因功能
pos = self._add_gene_function(pos)
# 药物处理
self.drug_parse(pos['DrugCn'].to_list())
pos = pos.fillna('.')
grouped_df = pos.groupby('AAChange_refGene')
# 对每个分组进行操作
for group_name, group_data in grouped_df:
if any(group_data['AMP_mut_level'] == 'I'):
pos.loc[pos['AAChange_refGene'] == group_name, 'AMP_mut_level'] = 'I'
pos_dict = pos.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
filter_sum_pos_res = pos.to_dict('records')
# 从vus_file中获取vus等级分类 'III' 级
vus_dict = dict()
vus_check = check_file_exist_and_empty(filter_vus)
if not vus_check:
vus = pd.read_csv(filter_vus, sep='\t')
vus['AMP_mut_level'] = 'III'
vus_dict = vus.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
# 从neg_file中获取neg等级分类 'IIII' 级
neg_dict = dict()
neg_check = check_file_exist_and_empty(filter_neg)
if not neg_check:
neg = pd.read_csv(filter_neg, sep='\t')
neg['AMP_mut_level'] = 'IIII'
neg_dict = neg.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
2023-12-25 14:06:30 +08:00
filter_sum_df = pd.DataFrame()
2023-11-29 15:13:30 +08:00
filter_sum_check = check_file_exist_and_empty(filter_sum)
if not filter_sum_check:
filter_sum_df = pd.read_csv(filter_sum, sep='\t')
level_dict = dict()
level_dict.update(pos_dict)
level_dict.update(vus_dict)
level_dict.update(neg_dict)
filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(['Chr', 'Start', 'End']).index.map(level_dict)
2023-12-25 14:06:30 +08:00
cols = list(filter_sum_df.columns)
tmb_file_check = check_file_exist_and_empty(tmb_file)
if not tmb_file_check:
tmb_df = pd.read_csv(tmb_file, sep='\t')
key_cols = ['Chr', 'Start', 'End']
filter_sum_df = filter_sum_df.set_index(key_cols)
tmb_df = tmb_df.set_index(key_cols)
# 在filter_sum_df中的process列中追加字符串";tmb"对应tmb_df中的行 并且 是非 12类突变
filter_sum_df['process'] = filter_sum_df.index.map(
lambda x: filter_sum_df.at[x, 'process'] + ';tmb' if x in tmb_df.index and filter_sum_df.at[
x, 'AMP_mut_level'] not in ['I', 'II'] else filter_sum_df.at[x, 'process'])
# 找到tmb_df中不在filter_sum_df中的行并将这些新的行添加到filter_sum_df中
new_rows = tmb_df[~tmb_df.index.isin(filter_sum_df.index)]
filter_sum_df = pd.concat([filter_sum_df, new_rows])
# 重置索引
filter_sum_df = filter_sum_df.reset_index()
# 按之前列排
filter_sum_df = filter_sum_df[cols]
filter_sum_df = filter_sum_df.fillna('.')
filter_sum_res = filter_sum_df.to_dict('records')
2023-11-29 15:13:30 +08:00
self.sheet['target_mut'] = filter_sum_res
self.sheet['target_drug'] = filter_sum_pos_res
def fusion(self):
filter_sum_pos = os.path.join(self.path, 'fusion',
f'{self.sample_name}.fusion.hg19_multianno.filter.pos.txt')
filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
filter_sum_pos_res = list()
pos_dict = dict()
if not filter_sum_pos_check:
filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
# 添加常规列
filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
# 添加基因功能
filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df, colname='ref_gene')
# 药物处理
self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
filter_sum_pos_df['Validated'] = 1
filter_sum_pos_df = filter_sum_pos_df.fillna('.')
2023-12-12 10:59:08 +08:00
grouped_df = filter_sum_pos_df.groupby(['POS', 'REF', 'ALT'])
# 对每个分组进行操作
for group_name, group_data in grouped_df:
pos, ref, alt = group_name
if any(group_data['AMP_mut_level'] == 'I'):
filter_condition = (filter_sum_pos_df['POS'] == pos) & \
(filter_sum_pos_df['REF'] == ref) & \
(filter_sum_pos_df['ALT'] == alt)
filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
2023-11-29 15:13:30 +08:00
pos_dict = filter_sum_pos_df.set_index(['POS', 'REF', 'ALT'])['AMP_mut_level'].to_dict()
2023-12-12 10:59:08 +08:00
filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
2023-11-29 15:13:30 +08:00
filter_sum = os.path.join(self.path, 'fusion',
f'{self.sample_name}.fusion.hg19_multianno.filter.txt')
filter_sum_check = check_file_exist_and_empty(filter_sum)
filter_sum_res = list()
if not filter_sum_check:
filter_sum_df = pd.read_csv(filter_sum, sep='\t')
filter_sum_df['Validated'] = 1
level_dict = dict()
level_dict.update(pos_dict)
filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(['POS', 'REF', 'ALT']).index.map(level_dict)
filter_sum_df = filter_sum_df.fillna('.')
filter_sum_res = filter_sum_df.to_dict('records')
self.sheet['fusion_mut'] = filter_sum_res
self.sheet['fusion_drug'] = filter_sum_pos_res
def cnv(self):
filter_sum_pos = os.path.join(self.path, 'cnv',
f'{self.sample_name}.rmdup.cns.filter.pos.txt')
filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
filter_sum_pos_res = list()
pos_dict = dict()
if not filter_sum_pos_check:
filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
# 添加常规列
filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
# 添加基因功能
filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df, colname='ref_gene')
# 药物处理
self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
filter_sum_pos_df = filter_sum_pos_df.fillna('.')
filter_sum_pos_df['Validated'] = 1
2023-12-12 10:59:08 +08:00
grouped_df = filter_sum_pos_df.groupby(['chromosome', 'start', 'end', 'ref_gene'])
# 对每个分组进行操作
for group_name, group_data in grouped_df:
chromosome, start, end, ref_gene = group_name
if any(group_data['AMP_mut_level'] == 'I'):
filter_condition = (filter_sum_pos_df['chromosome'] == chromosome) & \
(filter_sum_pos_df['start'] == start) & \
(filter_sum_pos_df['end'] == end) & \
(filter_sum_pos_df['ref_gene'] == ref_gene)
filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
2023-11-29 15:13:30 +08:00
filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
pos_dict = filter_sum_pos_df.set_index(['chromosome', 'start', 'end', 'ref_gene'])[
'AMP_mut_level'].to_dict()
filter_sum = os.path.join(self.path, 'cnv',
f'{self.sample_name}.rmdup.cns.filter.txt')
filter_sum_check = check_file_exist_and_empty(filter_sum)
filter_sum_res = list()
if not filter_sum_check:
filter_sum_df = pd.read_csv(filter_sum, sep='\t')
filter_sum_df['Validated'] = 1
level_dict = dict()
level_dict.update(pos_dict)
filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
['chromosome', 'start', 'end', 'ref_gene']).index.map(level_dict)
filter_sum_df = filter_sum_df.fillna('.')
filter_sum_res = filter_sum_df.to_dict('records')
self.sheet['cnv_mut'] = filter_sum_res
self.sheet['cnv_drug'] = filter_sum_pos_res
def msi(self):
"""
Process msi result files
"""
msi_file = os.path.join(self.path, 'msi', f'{self.sample_name}.msi.txt')
msi_check = check_file_exist_and_empty(msi_file)
msi_res = dict()
if not msi_check:
df = pd.read_csv(msi_file, sep='\t')
res = df.to_dict('records')[0]
msi_res['msi_count'] = res['Total_Number_of_Sites']
msi_res['msi_value'] = res['%']
if msi_res['msi_value'] >= 0.3:
msi_res['msi_result'] = 'MSI-H'
msi_res['msi_predict'] = '对免疫检查点抑制剂可能敏感'
else:
msi_res['msi_result'] = 'MSS'
msi_res['msi_predict'] = '对免疫检查点抑制剂可能不敏感'
self.sheet['msi'] = [msi_res]
def chemo(self):
"""
化疗
"""
# 化疗文件 .chemo.comb.txt .drug.infos.txt .drug.res.txt
res_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.res.txt')
info_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.infos.txt')
comb_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.chemo.comb.txt')
res_check = check_file_exist_and_empty(res_file)
info_check = check_file_exist_and_empty(info_file)
comb_check = check_file_exist_and_empty(comb_file)
chemo_res = dict()
check_list = [res_check, info_check, comb_check]
file_list = [res_file, info_file, comb_file]
for index, name in enumerate(['chemo_res', 'chemo_info', 'chemo_comb']):
if not check_list[index]:
df = pd.read_csv(file_list[index], sep='\t')
df = df.fillna('.')
self.sheet[name] = df.to_dict('records')
chemo_res[name] = df.to_dict('records')
else:
print(check_list[index])
# raise UserWarning('%s 目录下缺%s 文件' % (self.path, name))
def germline(self):
germline_file = os.path.join(self.path, 'mutation',
f'{self.sample_name}.snp_indel.germline.hg19_multianno.filter.txt')
germlineres = []
germline_check = check_file_exist_and_empty(germline_file)
if not germline_check:
df = pd.read_csv(germline_file, sep='\t')
df = df.fillna('.')
germlineres = df.to_dict('records')
self.sheet['germline'] = germlineres
def heredity(self):
"""
遗传
"""
heredi_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.hereditary.pre.txt')
heredires = []
heredi_check = check_file_exist_and_empty(heredi_file)
if not heredi_check:
df = pd.read_csv(heredi_file, sep='\t')
df = df.fillna('.')
heredires = df.to_dict('records')
self.sheet['hereditary'] = heredires
def heredity_res(self):
"""
遗传结果文件
"""
hereditary_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.hereditary.txt')
heredi_risk_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.risk.txt')
for name, file in zip(['hereditary', 'hereditary_risk', ], [hereditary_file, heredi_risk_file]):
file_check = check_file_exist_and_empty(file)
if not file_check:
df = pd.read_csv(file, sep='\t')
df = df.fillna('.')
self.sheet[name] = df.to_dict('records')
else:
# raise UserWarning('%s 目录下缺%s 文件' % (self.path, name))
print(file_check)
2023-12-08 17:57:03 +08:00
self.sheet[name] = []
2023-11-29 15:13:30 +08:00
def indication(self):
indication_file = os.path.join(self.path, 'mutation', 'indication.txt')
file_check = check_file_exist_and_empty(indication_file)
if not file_check:
df = pd.read_csv(indication_file, sep='\t')
df = df.fillna('.')
self.sheet['indication'] = df.to_dict('records')
else:
# raise UserWarning('%s 目录下缺%s 文件' % (self.path, name))
print(file_check)
def longindel(self):
2023-12-25 14:06:30 +08:00
filter_sum_pos = os.path.join(self.path, 'fusion',
f'{self.sample_name}.longindel.hg19_multianno.filter.pos.txt')
filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
filter_sum_pos_res = list()
pos_dict = dict()
if not filter_sum_pos_check:
filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
# 添加常规列
filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
# 添加基因功能
filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df, colname='ref_gene')
# 药物处理
self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
filter_sum_pos_df['Validated'] = 1
filter_sum_pos_df = filter_sum_pos_df.fillna('.')
grouped_df = filter_sum_pos_df.groupby(['#CHROM', 'POS', 'REF', 'ALT'])
# 对每个分组进行操作
for group_name, group_data in grouped_df:
chr, pos, ref, alt = group_name
if any(group_data['AMP_mut_level'] == 'I'):
filter_condition = (filter_sum_pos_df['#CHROM'] == chr) & \
(filter_sum_pos_df['POS'] == pos) & \
(filter_sum_pos_df['REF'] == ref) & \
(filter_sum_pos_df['ALT'] == alt)
filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
pos_dict = filter_sum_pos_df.set_index(['#CHROM', 'POS', 'REF', 'ALT'])['AMP_mut_level'].to_dict()
filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
filter_sum = os.path.join(self.path, 'fusion', f'{self.sample_name}.longindel.hg19_multianno.filter.txt')
filter_sum_check = check_file_exist_and_empty(filter_sum)
filter_sum_res = list()
if not filter_sum_check:
filter_sum_df = pd.read_csv(filter_sum, sep='\t')
filter_sum_df['Validated'] = 1
level_dict = dict()
level_dict.update(pos_dict)
filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(['#CHROM', 'POS', 'REF', 'ALT']).index.map(
level_dict)
filter_sum_df = filter_sum_df.fillna('.')
filter_sum_res = filter_sum_df.to_dict('records')
self.sheet['longindel_mut'] = filter_sum_res
self.sheet['longindel_drug'] = filter_sum_pos_res
def neoantigen(self):
neoantigen = os.path.join(self.path, 'neoantigen', f'MHC_Class_I', 'neoantigen.txt')
hla = os.path.join(self.path, 'neoantigen', f'hla', f'{self.normal_name}_result.tsv')
self.sheet['neoantigen'] = self.txt_2_excel(neoantigen)
self.sheet['hla'] = self.txt_2_excel(hla)
2023-11-29 15:13:30 +08:00
def qc(self):
qc_files = glob.glob(os.path.join(self.path, 'qc', '*_qc.txt'))
qc_res = []
if qc_files:
if os.path.getsize(qc_files[0]) > 0:
qc = pd.read_csv(qc_files[0], sep='\t', header=None)
qc_res = [qc.set_index(0)[1].to_dict()]
self.sheet['qc'] = qc_res
def drugs(self):
drug_record = pd.DataFrame(self.drug_record)
drug_record = drug_record.drop_duplicates()
self.sheet['drugs'] = drug_record.to_dict('records')
def collect(self):
writer = pd.ExcelWriter(self.outpath)
self.cms()
2023-12-25 14:06:30 +08:00
self.qc()
2023-11-29 15:13:30 +08:00
self.snv()
self.fusion()
2023-12-25 14:06:30 +08:00
self.longindel()
2023-11-29 15:13:30 +08:00
self.cnv()
self.msi()
self.germline()
self.heredity()
self.heredity_res()
self.chemo()
self.indication()
2023-12-25 14:06:30 +08:00
self.neoantigen()
2023-11-29 15:13:30 +08:00
self.drugs()
# 遍历CSV文件列表
for sheet_name in self.sheet:
# 读取CSV文件为DataFrame
df = pd.DataFrame(self.sheet[sheet_name])
df.to_excel(writer, sheet_name=sheet_name, index=False)
# 保存并关闭Excel写入器
writer.close()
# 加入cnv.png ???
wb = openpyxl.load_workbook(filename=self.outpath)
ws = wb['cnv_mut']
mr = ws.max_row
cell = 'C' + str(mr + 4)
cnv_pic_paths = glob.glob(os.path.join(self.path, 'cnv', '*cnv.png'))
if cnv_pic_paths:
image = Image(cnv_pic_paths[0])
ws.add_image(image, cell)
wb.save(self.outpath)
wb.close()
if __name__ == '__main__':
    # TODO: add logging and path validation.
    parser = argparse.ArgumentParser(description="post Process Script")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    parser.add_argument('-c', '--path', help="workflow run path", required=True)
    parser.add_argument('-o', '--output', help="Output", required=True)
    args = parser.parse_args()
    PostProcess(args.barcode, args.normal, args.path, args.output).collect()