# pipeline/script/postprocess.py
#! /usr/bin/env python3
import argparse
import glob
import json
import os
import re
from collections import defaultdict
import openpyxl
import pandas as pd
from openpyxl.drawing.image import Image
def database():
    """
    Load the matching/annotation lookup tables from the directory named by
    the ``DATABASE`` environment variable.

    Returns a dict with two mappings:
      - ``gene_function``:  {GENE_NAME_UPPER: Chinese gene-function text}
      - ``drug_mechanism``: {DRUG_NAME_UPPER: "indication||mechanism" text}

    Raises ``UserWarning`` when DATABASE is not set.
    """
    path = os.environ.get('DATABASE')
    if not path:
        raise UserWarning('未设置DATABASE环境变量')
    # Gene function descriptions, keyed by upper-cased gene symbol
    gene_df = pd.read_csv(os.path.join(path, 'gene_function.txt'), sep='\t').fillna('.')
    gene_df['genename'] = gene_df.apply(lambda row: row['基因'].upper(), axis=1)
    gene_function = gene_df.set_index('genename')['基因功能翻译'].to_dict()
    # Targeted-drug descriptions; every available (English/Chinese) name of a
    # drug maps to the same joined indication/mechanism string
    drug_df = pd.read_csv(os.path.join(path, 'target_drug.txt'), usecols=list(range(18)), sep='\t').fillna('.')
    drug_df['drugs'] = drug_df.apply(
        lambda row: '|'.join(name.upper() for name in [row['药物名称-英文'], row['药物名称-中文']] if name != '.'),
        axis=1)
    drug_df['dec'] = drug_df.apply(
        lambda row: "||".join(txt for txt in [row['适应症(中文)'], row['作用机制-中文']] if txt != '.'),
        axis=1)
    drug_mechanism = {}
    for _, row in drug_df.iterrows():
        for name in row['drugs'].split('|'):
            drug_mechanism[name] = row['dec']
    return dict(gene_function=gene_function, drug_mechanism=drug_mechanism)
def check_file_exist_and_empty(file_path):
    """
    Validate that *file_path* exists and is non-empty.

    Returns a (Chinese) error-message string describing the problem, or
    ``None`` when the file exists and has content.
    """
    if not os.path.exists(file_path):
        return f"文件 '{file_path}' 不存在"
    if os.path.getsize(file_path) == 0:
        return f"文件 '{file_path}' 是空文件"
    return None
def tree():
    """Return an autovivifying mapping: missing keys create nested dicts."""
    return defaultdict(tree)
class PostProcess:
    """
    Collect per-sample pipeline result files and assemble them into one
    multi-sheet Excel workbook.

    Parameters
    ----------
    sample_name : str
        Tumor sample barcode used in result file names.
    normal_name : str
        Matched-normal sample name (may be empty).
    path : str
        Root directory of the workflow run.
    outpath : str
        Destination path of the Excel workbook.
    """

    def __init__(self, sample_name, normal_name, path, outpath):
        self.path = path
        self.outpath = outpath
        self.sample_name = sample_name
        self.normal_name = normal_name
        # Lookup tables loaded from the DATABASE directory
        self.database = database()
        # Accumulated {'drug_name', 'drug_detail'} records across sections
        self.drug_record = list()
        # sheet name -> list of row dicts, written out in collect()
        self.sheet = tree()

    @staticmethod
    def txt_2_excel(path):
        """Read a tab-separated file into a list of row dicts ([] if empty)."""
        try:
            df = pd.read_csv(path, sep='\t')
        except pd.errors.EmptyDataError:
            return []
        return df.to_dict('records')

    @staticmethod
    def _add_columns(df: pd.DataFrame):
        """
        Add derived classification columns (evidence level, AMP mutation
        tier, drug category, ACMG germline level).  Ideally this would be
        done at annotation time rather than here.
        """
        # Evidence level: off-label ("非适应症") entries are downgraded to C
        if 'Label' in df.columns and '证据等级' in df.columns:
            df['证据等级'] = df.apply(lambda x: 'C' if x['Label'] == '非适应症' else x['证据等级'], axis=1)
        # AMP mutation tier from evidence level: A/B -> I, C/D -> II
        if '证据等级' in df.columns:
            df['AMP_mut_level'] = df['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
        # Drug category: a = sensitive + level A, b = sensitive + off-label,
        # d = resistant, c = everything else.  Later assignments win.
        if 'Response_Type_C' in df.columns and '证据等级' in df.columns:
            df['Drug_Category'] = 'c'
            df.loc[df['Response_Type_C'].str.contains('敏感') & (df['证据等级'] == 'A'), 'Drug_Category'] = 'a'
            df.loc[df['Response_Type_C'].str.contains('敏感') & (df['Label'] == '非适应症'), 'Drug_Category'] = 'b'
            df.loc[df['Response_Type_C'].str.contains('耐药'), 'Drug_Category'] = 'd'
        # Germline classification from CLNSIG: pathogenic=1,
        # likely_pathogenic=2, anything else=3.  Order matters: the
        # 'pathogenic' prefix test runs last so it wins for plain pathogenic.
        if 'CLNSIG' in df.columns:
            df['ACMG_level'] = '3'
            df.loc[df['CLNSIG'].str.lower().str.startswith('likely_pathogenic'), 'ACMG_level'] = '2'
            df.loc[df['CLNSIG'].str.lower().str.startswith('pathogenic'), 'ACMG_level'] = '1'
        return df

    @staticmethod
    def _add_cal_col(df: pd.DataFrame):
        """
        Add computed columns: count of deleterious predictions and the
        maximum population allele frequency across databases.
        Missing prediction/frequency columns are tolerated (logged only).
        """
        try:
            # Number of 'D' (deleterious) calls across the three predictors
            pred_df = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']]
            df['Deleterious'] = pred_df.apply(lambda x: x.tolist().count('D'), axis=1)
            # Highest population frequency; '.' placeholders become 0 and
            # astype(float) parses scientific notation (replaces unsafe eval)
            freq_df = df[
                ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS',
                 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).astype(float)
            df['freq_high'] = freq_df.max(axis=1)
        except KeyError as e:
            print(e)
        return df

    def _add_gene_function(self, df: pd.DataFrame, colname='Gene_refGene'):
        """Attach the Chinese gene-function description looked up by *colname*."""
        if colname in df.columns:
            df['Gene_function'] = df.apply(lambda x: self.database['gene_function'].get(x[colname], ''),
                                           axis=1)
        return df

    def drug_parse(self, drugs: list):
        """
        Split combination drug strings (separated by ' + ' or ',') and record
        each individual drug with its mechanism/indication description.
        """
        for drug in drugs:
            for spdrug in re.split(r" \+ |,", drug):
                # database() upper-cases every drug_mechanism key, so fall
                # back to an upper-cased lookup when the exact key misses
                # (Chinese names are unaffected by upper()).
                detail = self.database['drug_mechanism'].get(spdrug)
                if detail is None:
                    detail = self.database['drug_mechanism'].get(spdrug.upper(), '')
                self.drug_record.append({'drug_name': spdrug, 'drug_detail': detail})

    def cms(self):
        """Load the sample-information JSON into the 'cms' sheet."""
        cms_info_need = []
        cms_file = os.path.join(self.path, 'qc', f'{self.sample_name}_post.json')
        check_file = check_file_exist_and_empty(cms_file)
        if check_file:
            return cms_info_need
        # Context manager guarantees the handle is closed even when the
        # JSON is malformed and we re-raise.
        with open(cms_file, 'r') as file_read:
            try:
                cms_info = json.load(file_read)['data']
            except Exception as e:
                raise UserWarning('cms 文件加载有误!', e)
        if not cms_info:
            raise UserWarning('cms 内容为空!')
        df = pd.DataFrame(cms_info)
        df['normal'] = self.normal_name
        cms_info_need = df.to_dict('records')
        self.sheet['cms'] = cms_info_need

    def snv(self):
        """Collect somatic SNV/indel results into 'target_mut'/'target_drug'."""
        mut_dir = os.path.join(self.path, 'mutation')
        filter_sum = os.path.join(mut_dir, f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.txt')
        filter_pos = os.path.join(mut_dir, f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.pos.txt')
        filter_vus = os.path.join(mut_dir, f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.vus.txt')
        filter_neg = os.path.join(mut_dir, f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.neg.txt')
        filter_sum_pos_res = list()
        # Positive file: drug information and tier-I promotion
        pos_dict = dict()
        if not check_file_exist_and_empty(filter_pos):
            pos = pd.read_csv(filter_pos, sep='\t')
            pos = self._add_columns(pos)
            pos = self._add_gene_function(pos)
            self.drug_parse(pos['DrugCn'].to_list())
            pos = pos.fillna('.')
            # If any row of an amino-acid-change group is tier I, promote
            # every row of that group to tier I.
            for aachange, group in pos.groupby('AAChange_refGene'):
                if any(group['AMP_mut_level'] == 'I'):
                    pos.loc[pos['AAChange_refGene'] == aachange, 'AMP_mut_level'] = 'I'
            pos_dict = pos.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
            filter_sum_pos_res = pos.to_dict('records')
        # VUS variants are tier III
        vus_dict = dict()
        if not check_file_exist_and_empty(filter_vus):
            vus = pd.read_csv(filter_vus, sep='\t')
            vus['AMP_mut_level'] = 'III'
            vus_dict = vus.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
        # Negative variants are tier IIII
        neg_dict = dict()
        if not check_file_exist_and_empty(filter_neg):
            neg = pd.read_csv(filter_neg, sep='\t')
            neg['AMP_mut_level'] = 'IIII'
            neg_dict = neg.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
        # Summary file: map each variant to the tier found above
        filter_sum_res = list()
        if not check_file_exist_and_empty(filter_sum):
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            level_dict = {**pos_dict, **vus_dict, **neg_dict}
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(['Chr', 'Start', 'End']).index.map(level_dict)
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['target_mut'] = filter_sum_res
        self.sheet['target_drug'] = filter_sum_pos_res

    def fusion(self):
        """Collect fusion results into 'fusion_mut'/'fusion_drug' sheets."""
        filter_sum_pos = os.path.join(self.path, 'fusion',
                                      f'{self.sample_name}.fusion.hg19_multianno.filter.pos.txt')
        filter_sum_pos_res = list()
        pos_dict = dict()
        if not check_file_exist_and_empty(filter_sum_pos):
            filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
            filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
            filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df, colname='ref_gene')
            self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
            filter_sum_pos_df['Validated'] = 1
            filter_sum_pos_df = filter_sum_pos_df.fillna('.')
            filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
            pos_dict = filter_sum_pos_df.set_index(['POS', 'REF', 'ALT'])['AMP_mut_level'].to_dict()
        filter_sum = os.path.join(self.path, 'fusion',
                                  f'{self.sample_name}.fusion.hg19_multianno.filter.txt')
        filter_sum_res = list()
        if not check_file_exist_and_empty(filter_sum):
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            filter_sum_df['Validated'] = 1
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(['POS', 'REF', 'ALT']).index.map(dict(pos_dict))
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['fusion_mut'] = filter_sum_res
        self.sheet['fusion_drug'] = filter_sum_pos_res

    def cnv(self):
        """Collect copy-number results into 'cnv_mut'/'cnv_drug' sheets."""
        filter_sum_pos = os.path.join(self.path, 'cnv',
                                      f'{self.sample_name}.rmdup.cns.filter.pos.txt')
        filter_sum_pos_res = list()
        pos_dict = dict()
        if not check_file_exist_and_empty(filter_sum_pos):
            filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
            filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
            filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df, colname='ref_gene')
            self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
            filter_sum_pos_df = filter_sum_pos_df.fillna('.')
            filter_sum_pos_df['Validated'] = 1
            filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
            pos_dict = filter_sum_pos_df.set_index(['chromosome', 'start', 'end', 'ref_gene'])[
                'AMP_mut_level'].to_dict()
        filter_sum = os.path.join(self.path, 'cnv',
                                  f'{self.sample_name}.rmdup.cns.filter.txt')
        filter_sum_res = list()
        if not check_file_exist_and_empty(filter_sum):
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            filter_sum_df['Validated'] = 1
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
                ['chromosome', 'start', 'end', 'ref_gene']).index.map(dict(pos_dict))
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['cnv_mut'] = filter_sum_res
        self.sheet['cnv_drug'] = filter_sum_pos_res

    def msi(self):
        """Process MSI result file into the 'msi' sheet."""
        msi_file = os.path.join(self.path, 'msi', f'{self.sample_name}.msi.txt')
        msi_res = dict()
        if not check_file_exist_and_empty(msi_file):
            df = pd.read_csv(msi_file, sep='\t')
            res = df.to_dict('records')[0]
            msi_res['msi_count'] = res['Total_Number_of_Sites']
            msi_res['msi_value'] = res['%']
            # NOTE(review): threshold 0.3 against the '%' column — confirm
            # the column's scale (fraction vs percent) with the MSI caller.
            if msi_res['msi_value'] >= 0.3:
                msi_res['msi_result'] = 'MSI-H'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能敏感'
            else:
                msi_res['msi_result'] = 'MSS'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能不敏感'
        self.sheet['msi'] = [msi_res]

    def chemo(self):
        """Load chemotherapy result/info/combination tables into sheets."""
        res_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.res.txt')
        info_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.infos.txt')
        comb_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.chemo.comb.txt')
        file_list = [res_file, info_file, comb_file]
        check_list = [check_file_exist_and_empty(f) for f in file_list]
        for index, name in enumerate(['chemo_res', 'chemo_info', 'chemo_comb']):
            if not check_list[index]:
                df = pd.read_csv(file_list[index], sep='\t')
                df = df.fillna('.')
                self.sheet[name] = df.to_dict('records')
            else:
                # Missing chemo files are tolerated: report and continue
                print(check_list[index])

    def germline(self):
        """Load germline SNV/indel results into the 'germline' sheet."""
        germline_file = os.path.join(self.path, 'mutation',
                                     f'{self.sample_name}.snp_indel.germline.hg19_multianno.filter.txt')
        germlineres = []
        if not check_file_exist_and_empty(germline_file):
            df = pd.read_csv(germline_file, sep='\t')
            df = df.fillna('.')
            germlineres = df.to_dict('records')
        self.sheet['germline'] = germlineres

    def heredity(self):
        """Load the preliminary hereditary table into the 'hereditary' sheet."""
        heredi_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.hereditary.pre.txt')
        heredires = []
        if not check_file_exist_and_empty(heredi_file):
            df = pd.read_csv(heredi_file, sep='\t')
            df = df.fillna('.')
            heredires = df.to_dict('records')
        self.sheet['hereditary'] = heredires

    def heredity_res(self):
        """
        Load final hereditary result and risk tables.

        NOTE(review): this re-writes the 'hereditary' sheet that heredity()
        populated — confirm the overwrite is intended.
        """
        hereditary_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.hereditary.txt')
        heredi_risk_file = os.path.join(self.path, 'hereditary', f'{self.sample_name}.risk.txt')
        for name, file in zip(['hereditary', 'hereditary_risk', ], [hereditary_file, heredi_risk_file]):
            file_check = check_file_exist_and_empty(file)
            if not file_check:
                df = pd.read_csv(file, sep='\t')
                df = df.fillna('.')
                self.sheet[name] = df.to_dict('records')
            else:
                print(file_check)

    def indication(self):
        """Load the indication table into the 'indication' sheet."""
        indication_file = os.path.join(self.path, 'mutation', 'indication.txt')
        file_check = check_file_exist_and_empty(indication_file)
        if not file_check:
            df = pd.read_csv(indication_file, sep='\t')
            df = df.fillna('.')
            self.sheet['indication'] = df.to_dict('records')
        else:
            print(file_check)

    def longindel(self):
        """Load long-indel results (first matching file) into 'longindel'."""
        longindel_files = glob.glob(
            os.path.join(self.path, 'fusion', '*.longindel.pos.txt'))
        if longindel_files:
            self.sheet['longindel'] = self.txt_2_excel(longindel_files[0])

    def qc(self):
        """Load the two-column QC metrics file into the 'qc' sheet."""
        qc_files = glob.glob(os.path.join(self.path, 'qc', '*_qc.txt'))
        qc_res = []
        if qc_files:
            if os.path.getsize(qc_files[0]) > 0:
                qc = pd.read_csv(qc_files[0], sep='\t', header=None)
                qc_res = [qc.set_index(0)[1].to_dict()]
        self.sheet['qc'] = qc_res

    def drugs(self):
        """Deduplicate accumulated drug records into the 'drugs' sheet."""
        drug_record = pd.DataFrame(self.drug_record)
        drug_record = drug_record.drop_duplicates()
        self.sheet['drugs'] = drug_record.to_dict('records')

    def collect(self):
        """
        Run every section collector, write all sheets to the Excel workbook,
        then embed the CNV plot (if present) below the 'cnv_mut' sheet.
        """
        writer = pd.ExcelWriter(self.outpath)
        self.cms()
        self.snv()
        self.fusion()
        self.cnv()
        self.msi()
        self.germline()
        self.heredity()
        self.heredity_res()
        self.longindel()
        self.chemo()
        self.indication()
        self.qc()
        self.drugs()
        # One worksheet per collected section
        for sheet_name in self.sheet:
            df = pd.DataFrame(self.sheet[sheet_name])
            df.to_excel(writer, sheet_name=sheet_name, index=False)
        writer.close()
        # Re-open the saved workbook to append cnv.png a few rows below the
        # last data row of the cnv_mut sheet.
        wb = openpyxl.load_workbook(filename=self.outpath)
        ws = wb['cnv_mut']
        mr = ws.max_row
        cell = 'C' + str(mr + 4)
        cnv_pic_paths = glob.glob(os.path.join(self.path, 'cnv', '*cnv.png'))
        if cnv_pic_paths:
            image = Image(cnv_pic_paths[0])
            ws.add_image(image, cell)
        wb.save(self.outpath)
        wb.close()
if __name__ == '__main__':
    # TODO: no logging configured; paths are not validated up front
    arg_parser = argparse.ArgumentParser(description="post Process Script")
    arg_parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    arg_parser.add_argument('-s', '--normal', help="sample's normal", default='', required=False, nargs='?')
    arg_parser.add_argument('-c', '--path', help="workflow run path", required=True)
    arg_parser.add_argument('-o', '--output', help="Output", required=True)
    cli = arg_parser.parse_args()
    PostProcess(cli.barcode, cli.normal, cli.path, cli.output).collect()