report/scripts/check_report_merge_20230713.py

#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import pandas as pd
from pandas import DataFrame
import numpy as np
import logging
import re
import sys
import os
import json
import glob
import openpyxl
from openpyxl import Workbook, load_workbook
from openpyxl.drawing.image import Image
if len(sys.argv) != 3:
    print(" ".join(['usage: python', sys.argv[0], 'output_dir', 'name']))
    sys.exit()
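# Example invocation (paths are illustrative, not taken from a real run):
#   python3 check_report_merge_20230713.py /dataseq/jmdna/analysis/RUN001 SAMPLE01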


def snv_fusion_cnv(output_dir, name):
    out_xlsx = "".join([output_dir, '/report/', name, '.check_new.xlsx'])
    # genefunction
    genefunction = {}
    gf = open("/dataseq/jmdna/codes/reportbase/gene_function.txt", 'r', encoding='utf-8').readlines()
    for line in gf[1:]:
        gene = line.strip().split("\t")[0]
        func = line.strip().split("\t")[1]
        genefunction[gene.upper()] = func
    genefunction['.'] = '.'
    ##drug_mechanism
    drug_mechanism = {}
    drug_fh = open("/dataseq/jmdna/codes/reportbase/target_drug.txt", 'r', encoding='utf-8').readlines()
    for line in drug_fh[1:]:
        disease = line.split("\t")[8]
        mechanism = line.split("\t")[11]
        drugs = line.split("\t")[0].split('|') + line.split("\t")[1].split('|')
        if disease or mechanism:
            for drug in drugs:
                drug_mechanism[drug.upper()] = "\\\\".join([disease, mechanism]).strip()
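    # At this point drug_mechanism maps each upper-cased drug name to its "disease\\mechanism"
    # text; it is consulted when building Drug_Detail for SNVs, fusions and CNVs below.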
    '''
    snvindel_sheet
    '''
    ##input
    filter_file = "".join([output_dir, '/report/', name, '.snp.indel.Somatic.annoall.hg19_multianno_filtered.txt'])
    pos_file = "".join([output_dir, '/mutation/', name, '.snvindel.pos.dedup.txt'])
    vus_file = "".join([output_dir, '/mutation/', name, '.snvindel.vus.txt'])
    neg_file = "".join([output_dir, '/mutation/', name, '.snvindel.neg.txt'])
    ##filter_file
    if os.path.getsize(filter_file) > 0:
        snv = pd.read_table(filter_file, sep="\t")
        cols = [index for index, row in snv[snv['可信'] == 0].iterrows()]
        snv.drop(cols, inplace=True)
        snv.insert(loc=24, column='ACMG_level', value=0)
        snv.insert(loc=25, column='Deleterious', value=0)
        snv.insert(loc=26, column='freq_high', value=0)
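        # Three helper columns per somatic variant:
        #   ACMG_level  - 1/2/3 derived from the ClinVar CLNSIG annotation below
        #   Deleterious - number of "D" (damaging) calls among MutationTaster/FATHMM/MetaLR
        #   freq_high   - highest population allele frequency across the listed databases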
        for index, row in snv.iterrows():
            if re.search("Likely_pathogenic|drug", (row['CLNSIG']), re.I):
                snv.loc[index, 'ACMG_level'] = '2'
            elif re.search("pathogenic", (row['CLNSIG']), re.I) and not re.search("Conflicting", (row['CLNSIG']), re.I):
                snv.loc[index, 'ACMG_level'] = '1'
            else:
                snv.loc[index, 'ACMG_level'] = '3'
            snv.loc[index, "Deleterious"] = (
                snv.loc[index, ['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']].tolist().count("D"))
            snv.loc[index, "freq_high"] = (snv.loc[
                index, ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL',
                        'ExAC_nontcga_EAS', 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']]
                ).replace('.', '0').astype(float).max()
        snv_1 = snv.iloc[:, list(range(14)) + [15, 17, 18, 20, 23, 24, 25, 26, 111, 112, 113]]
    else:
        snv_1 = pd.DataFrame(columns=[])
    ##pos_file
    if os.path.getsize(pos_file) > 0:
        pos = pd.read_table(pos_file, sep="\t")
        pos = pos.iloc[:, [7, 10, 18, 23, 24, 25, 29, 30, 31, 32]]
        pos_1 = pd.DataFrame(
            columns=['AAChange.refGene', 'OKBSIG', 'AMP_evidence_level', 'AMP_mut_level', 'Indication', 'Drug',
                     'Response_Type', 'Evidence_Source', 'EfficacyEvidence', 'Drug_Detail', 'Gene_function',
                     'Drug_Category'])
        pos = list(pos.groupby(['Gene.refGene', 'AAChange.refGene', 'fun_change']))
        for i in pos:
            for index, row in i[1].iterrows():
                drugs = row['药物中文名'].replace(" + ", ",")
                drugs = list(set(drugs.split(",")))
                drug_mm = ''
                for drug in drugs:
                    if drug.upper() in drug_mechanism.keys():
                        drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()]
                i[1].loc[index, ['Drug_Detail']] = drug_mm
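                # Off-label ("非适应症") records are downgraded to evidence level C before scoring.
                # Drug_Category: a = sensitive with level-A evidence, b = sensitive with level-C evidence,
                # d = resistant, c = everything else.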
                if row['标签'] == '非适应症':
                    row['证据等级'] = 'C'
                if re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A':
                    i[1].loc[index, ['Drug_Category']] = 'a'
                elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C':
                    i[1].loc[index, ['Drug_Category']] = 'b'
                elif re.search("耐药", row['Response_Type_C']):
                    i[1].loc[index, ['Drug_Category']] = 'd'
                else:
                    i[1].loc[index, ['Drug_Category']] = 'c'
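            # Evidence levels A/B map to AMP tier I and C/D to tier II; all evidence rows for
            # the same variant are then collapsed into one pipe-joined record per sheet row.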
            i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
            pos_1.loc[len(pos_1)] = [i[0][1], i[0][2], '|'.join(list(i[1]['证据等级'])),
                                     '|'.join(list(i[1]['AMP_mut_level'])), '|'.join(list(i[1]['疾病中文名'])),
                                     '|'.join(list(i[1]['药物中文名'])),
                                     '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])),
                                     '|'.join(list(i[1]['EfficacyEvidence'])), '|'.join(list(i[1]['Drug_Detail'])),
                                     genefunction[i[0][0].upper()], '|'.join(list(i[1]['Drug_Category']))]
    else:
        pos_1 = pd.DataFrame(columns=[])
    ##vus_file
    if os.path.getsize(vus_file) > 0:
        vus = pd.read_table(vus_file, sep="\t")
        vus_1 = vus.iloc[:, [9, 17]]
        vus_1.insert(loc=2, column='AMP_mut_level', value='III')
        vus_1 = vus_1.rename(columns={'fun_change': 'OKBSIG'})
    else:
        vus_1 = pd.DataFrame(columns=[])
    ##neg_file
    if os.path.getsize(neg_file) > 0:
        neg = pd.read_table(neg_file, sep="\t")
        neg_1 = neg.iloc[:, [9, 17]]
        neg_1.insert(loc=2, column='AMP_mut_level', value='IIII')
        neg_1 = neg_1.rename(columns={'fun_change': 'OKBSIG'})
    else:
        neg_1 = pd.DataFrame(columns=[])
    snvindel_sheet = pd.DataFrame(
        columns=['可信', 'Chr', 'Start', 'End', 'Ref', 'Alt', 'AAChange.refGene', 'mutant_frequency', 'total_reads',
                 'mutant_reads', 'strand_bias', 'Otherinfo10', 'Func.refGene', 'Gene.refGene', 'ExonicFunc.refGene',
                 'avsnp150', 'cosmic91', 'CLNDN', 'CLNSIG', 'ACMG_level', 'Deleterious', 'freq_high', 'OKBSIG',
                 'AMP_evidence_level', 'AMP_mut_level', 'Indication', 'Drug', 'Response_Type', 'Evidence_Source',
                 'EfficacyEvidence', 'Drug_Detail', 'Gene_function', 'Drug_Category', 'Otherinfo11', 'Otherinfo12',
                 'Otherinfo13'])
    pos_vus_neg = pd.concat([pos_1, vus_1, neg_1])
    snv_pos_vus_neg = snv_1.merge(pos_vus_neg, how='left', on='AAChange.refGene')
    snvindel_sheet = pd.concat([snvindel_sheet, snv_pos_vus_neg])
    snvindel_sheet = snvindel_sheet.replace(np.nan, '.')
    snvindel_sheet.rename(columns={"可信": "Validated"}, inplace=True)
    '''
    fusion_sheet
    '''
    fusion_pos_file = "".join([output_dir, '/fusion/', name, '.fusion.pos.dedup.txt'])
    fusion_vus_file = "".join([output_dir, '/fusion/', name, '.fusion.vus.txt'])
    if os.path.getsize(fusion_pos_file) > 0:
        fusion_pos = pd.read_table(fusion_pos_file, sep="\t")
    else:
        fusion_pos = pd.DataFrame(columns=[])
    if os.path.getsize(fusion_vus_file) > 0:
        fusion_vus = pd.read_table(fusion_vus_file, sep="\t")
        fusion_vus.insert(loc=0, column='可信', value=1)
    else:
        fusion_vus = pd.DataFrame(columns=[])
    fusion_pos_vus = pd.concat([fusion_pos, fusion_vus])
    fusion_sheet = pd.DataFrame(
        columns=['Validated', 'CHROM1', 'POS1', 'CHROM2', 'POS2', 'GENE1', 'GENE2', 'FUSION', 'Support_reads(PE:SR)',
                 'Depth', 'FREQ1', 'FREQ2', 'OKBSIG', 'AMP_evidence_level',
                 'AMP_mut_level', 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'Efficacy_Evidence',
                 'Drug_Detail', 'Gene_function', 'Drug_Category', 'INFO', 'FORMAT', 'Sample'])
    if not fusion_pos_vus.empty:
        fusion_pos_vus = fusion_pos_vus.replace(np.nan, '.')
        fusion = list(fusion_pos_vus.groupby(
            ['可信', '#CHROM', 'POS', 'CHROM2', 'POS2', 'GENE1', 'GENE2', 'FUSION', 'FREQ1', 'FREQ2', 'fun_change',
             'INFO', 'FORMAT', name, 'Gene_Symbol']))
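        # i[0] holds the 15 group keys in the order above; the per-sample column (index 13)
        # is a colon-separated FORMAT string from which support reads (field 1) and depth
        # (field 7) are pulled when the sheet row is written.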
        for i in fusion:
            for index, row in i[1].iterrows():
                drugs = row['药物中文名'].replace(" + ", ",")
                drugs = list(set(drugs.split(",")))
                drug_mm = ''
                for drug in drugs:
                    if drug.upper() in drug_mechanism.keys():
                        drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()]
                i[1].loc[index, ['Drug_Detail']] = drug_mm
                if row['标签'] == '非适应症':
                    row['证据等级'] = 'C'
                if re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A':
                    i[1].loc[index, ['Drug_Category']] = 'a'
                elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C':
                    i[1].loc[index, ['Drug_Category']] = 'b'
                elif re.search("耐药", row['Response_Type_C']):
                    i[1].loc[index, ['Drug_Category']] = 'd'
                elif row['Response_Type_C'] == '.':
                    i[1].loc[index, ['Drug_Category']] = '.'
                else:
                    i[1].loc[index, ['Drug_Category']] = 'c'
            i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
            fusion_sheet.loc[len(fusion_sheet)] = (
                list(i[0][0:8])
                + [i[0][13].split(":")[1], i[0][13].split(":")[7]]
                + list(i[0][8:11])
                + ['|'.join(list(i[1]['证据等级'])), '|'.join(list(i[1]['AMP_mut_level'])),
                   '|'.join(list(i[1]['疾病中文名'])), '|'.join(list(i[1]['药物中文名'])),
                   '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])),
                   '|'.join(list(i[1]['EfficacyEvidence'])), '|'.join(list(i[1]['Drug_Detail'])),
                   genefunction[i[0][14].upper()], '|'.join(list(i[1]['Drug_Category']))]
                + list(i[0][11:14]))
    fusion_sheet = fusion_sheet.replace(np.nan, '.')
    '''
    cnv_sheet
    '''
    # CNV positives produced by the cnvkit step for this sample
    cnv_pos_file = "".join([output_dir, '/cnvkit/', name, '.cnv.pos.dedup.txt'])
    cnv_sheet = pd.DataFrame(
        columns=['Validated', 'Chromosome', 'Start', 'End', 'Gene', 'Depth', 'Probes', 'Copy_number', 'OKBSIG',
                 'Gene_Symbol', 'AMP_evidence_level', 'AMP_mut_level',
                 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'Efficacy_Evidence', 'Drug_Detail',
                 'Gene_Function', 'Drug_Category'])
    if os.path.getsize(cnv_pos_file) > 0:
        cnv_pos = pd.read_table(cnv_pos_file, sep="\t")
        cnv = list(cnv_pos.groupby(
            ['可信', 'chromosome', 'start', 'end', 'gene', 'depth', 'probes', 'cn', 'fun_change', 'Gene_Symbol']))
        for i in cnv:
            for index, row in i[1].iterrows():
                drugs = row['药物中文名'].replace(" + ", ",")
                drugs = list(set(drugs.split(",")))
                drug_mm = ''
                for drug in drugs:
                    if drug.upper() in drug_mechanism.keys():
                        drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()]
                i[1].loc[index, ['Drug_Detail']] = drug_mm
                if row['标签'] == '非适应症':
                    row['证据等级'] = 'C'
                if re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A':
                    i[1].loc[index, ['Drug_Category']] = 'a'
                elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C':
                    i[1].loc[index, ['Drug_Category']] = 'b'
                elif re.search("耐药", row['Response_Type_C']):
                    i[1].loc[index, ['Drug_Category']] = 'd'
                elif row['Response_Type_C'] == '.':
                    i[1].loc[index, ['Drug_Category']] = '.'
                else:
                    i[1].loc[index, ['Drug_Category']] = 'c'
            i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
            cnv_sheet.loc[len(cnv_sheet)] = (
                list(i[0][0:10])
                + ['|'.join(list(i[1]['证据等级'])), '|'.join(list(i[1]['AMP_mut_level'])),
                   '|'.join(list(i[1]['疾病中文名'])), '|'.join(list(i[1]['药物中文名'])),
                   '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])),
                   '|'.join(list(i[1]['EfficacyEvidence'])), '|'.join(list(i[1]['Drug_Detail'])),
                   genefunction[i[0][9].upper()], '|'.join(list(i[1]['Drug_Category']))])
    else:
        cnv_pos = pd.DataFrame(columns=[])
    with pd.ExcelWriter(out_xlsx) as writer:
        snvindel_sheet.to_excel(writer, sheet_name="snvindel", index=False)
        fusion_sheet.to_excel(writer, sheet_name="fusion", index=False)
        cnv_sheet.to_excel(writer, sheet_name="cnv", index=False)
    ## Embed cnvkit/*.cnv.png a few rows below the last row of the cnv sheet
    wb = openpyxl.load_workbook(filename=out_xlsx)
    ws = wb['cnv']
    mr = ws.max_row
    cell = 'C' + str(mr + 4)
    cnv_pic = "".join([output_dir, '/cnvkit/', name, '.cnv.png'])
    image = Image(cnv_pic)
    ws.add_image(image, cell)
    wb.save(out_xlsx)


class PostProcess:
    """
    Excel post-processing: append the remaining result sheets to the check workbook.
    """

    def __init__(self, path, outpath):
        self.path = path
        self.outpath = outpath
        self.need_cols = self.need_col()

    def need_col(self):
        """
        Read the required columns for each sheet.
        """
        path = os.path.join(os.path.dirname(__file__), 'columns.csv')
        cols = pd.read_csv(path)
        cols = cols.fillna('')
        cols_record = cols.to_dict('list')
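        # cols_record: {sheet_name: [required column header, ...]}; columns.csv is assumed to
        # hold one column per sheet, and the blank cells introduced by fillna are dropped below.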
        for sheet in cols_record:
            cols_record[sheet] = [x for x in cols_record[sheet] if x]
        return cols_record

    def msi(self):
        """
        Process msi result files
        """
        msi_files = glob.glob(os.path.join(self.path, 'MSI', '*.msi'))
        msi_res = dict()
        if msi_files:
            df = pd.read_csv(msi_files[0], sep='\t')
            res = df.to_dict('records')[0]
            msi_res['msi_count'] = res['Total_Number_of_Sites']
            msi_res['msi_value'] = res['%']
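            # msi_value comes from the '%' column of the MSI caller output; a value of 0.3 or
            # above is reported as MSI-H, otherwise as MSS.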
            if msi_res['msi_value'] >= 0.3:
                msi_res['msi_result'] = 'MSI-H'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能敏感'
            else:
                msi_res['msi_result'] = 'MSS'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能不敏感'
        return [msi_res]

    def chemo(self):
        """
        Chemotherapy results
        """
        chemo_files = glob.glob(os.path.join(self.path, 'chemo', '*chemo.res.txt'))
        chemo_res = []
        if chemo_files:
            df = pd.read_csv(chemo_files[0], sep='\t')
            df = df.fillna('.')
            chemo_res = df.to_dict('records')
        return chemo_res

    def heredity(self):
        """
        Germline (hereditary) variants
        """
        heredi_files = glob.glob(os.path.join(self.path, 'mutation', '*Germline*filtered.txt'))
        heredires = []
        if heredi_files:
            df = pd.read_csv(heredi_files[0], sep='\t')
            df = df.fillna('.')
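            # freq_high: highest population allele frequency across the listed databases;
            # Deleterious: number of 'D' (damaging) calls among the three predictors.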
            tmdf1 = df[
                ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS',
                 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).astype(float)
            df['freq_high'] = tmdf1.max(axis=1)
            tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']]
            df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1)
            try:
                df_need = df[self.need_cols.get('HCS', [])]
                heredires = df_need.to_dict('records')
            except KeyError as e:
                raise UserWarning('Sheet headers do not match the columns configured in columns.csv', e)
        return heredires

    def MMR(self):
        """
        MMR
        """
        mmr_files = glob.glob(os.path.join(self.path, 'MMR', '*mmr.pre.txt'))
        mmr = []
        if mmr_files:
            df = pd.read_csv(mmr_files[0], sep='\t')
            df = df.fillna('.')
            tmdf1 = df[
                ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS',
                 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).astype(float)
            df['freq_high'] = tmdf1.max(axis=1)
            tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']]
            df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1)
            try:
                df_need = df[self.need_cols.get('HCS', [])]
                mmr = df_need.to_dict('records')
            except KeyError as e:
                raise UserWarning('Sheet headers do not match the columns configured in columns.csv', e)
        return mmr

    def hotspot(self):
        hotspot_files = glob.glob(
            os.path.join(self.path, 'mutation', 'hotspot', '*hotspot.snp.indel.filter.anno.hg19_multianno.txt'))
        if hotspot_files:
            return self.txt_2_excel(hotspot_files[0])

    def splicing(self):
        splicing_files = glob.glob(
            os.path.join(self.path, 'mutation', '*.target.splicing.txt'))
        if splicing_files:
            return self.txt_2_excel(splicing_files[0])

    def indication(self):
        indication_files = glob.glob(
            os.path.join(self.path, 'mutation', '*indication.txt'))
        if indication_files:
            return self.txt_2_excel(indication_files[0])

    def longindel(self):
        longindel_files = glob.glob(
            os.path.join(self.path, 'fusion', '*.longindel.pos.txt'))
        if longindel_files:
            return self.txt_2_excel(longindel_files[0])

    def cms(self):
        """
        Sample information
        """
        cms_files = glob.glob(os.path.join(self.path, 'qc', '*_post.json'))
        cms_info_need = []
        if cms_files:
            with open(cms_files[0], 'r') as file_read:
                cms_info = json.load(file_read)['data']
            df = pd.DataFrame(cms_info)
            try:
                df_need = df[self.need_cols.get('sample_info', [])]
                cms_info_need = df_need.to_dict('records')
            except KeyError as e:
                raise UserWarning('Sheet headers do not match the columns configured in columns.csv', e)
        return cms_info_need

    def qc(self):
        qc_files = glob.glob(os.path.join(self.path, 'qc', '*_post.json'))
        qc_res = []
        if qc_files:
            df = pd.read_csv(qc_files[0], sep='\t', header=None)
            df = df.set_index(0).T
            qc_res = df.to_dict('records')
        return qc_res
    # def snv(self):
    #     # filter file
    #     filter_files = glob.glob(os.path.join(self.path, 'report', '*snp.indel.Somatic.annoall.hg19_multianno_filtered.txt'))
    #     if filter_files:
    #         snv = pd.read_csv(filter_files[0], sep="\t")
    #
    #     def sign_drug_Category(x):
    #         if '敏感' in x['Response_Type_C'] and x['证据等级'] == 'A':
    #             return 'a'
    #         elif '敏感' in x['Response_Type_C'] and x['证据等级'] == 'C':
    #             return 'b'
    #         elif '耐药' in x['Response_Type_C']:
    #             return 'd'
    #         else:
    #             return 'c'
    #
    #     # pos_file processing
    #     pos_files = glob.glob(os.path.join(self.path, 'mutation', '*snvindel.pos.txt'))
    #     if pos_files:
    #         pos = pd.read_csv(pos_files[0], sep='\t')
    #         pos['证据等级'] = pos.apply(lambda x: 'C' if x['标签'] == '非适应症' else x['证据等级'], axis=1)
    #         pos['Drug_Category'] = pos.apply(sign_drug_Category, axis=1)
    #         pos['AMP_mut_level'] = pos['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II'])
    #         agg_list = ['证据等级', 'AMP_mut_level', '疾病中文名', '药物中文名', 'Response_Type_C', 'Evidence_Source_C',
    #                     'EfficacyEvidence', 'Drug_Category']
    #         agg_dict = {column: ','.join for column in agg_list}
    #         pos_group = pos.groupby(['Gene.refGene', 'AAChange.refGene', 'fun_change']).agg(agg_dict, axis=1)

    def txt_2_excel(self, path):
        try:
            df = pd.read_csv(path, sep='\t')
        except pd.errors.EmptyDataError:
            return []
        return df.to_dict('records')

    def collect(self):
        writer = pd.ExcelWriter(self.outpath, mode='a', engine='openpyxl')
        sheet = {
            'MSI': self.msi(),
            'chemo': self.chemo(),
            'HCS': self.heredity(),
            'sample_info': self.cms(),
            'MMR': self.MMR(),
            'hotspot': self.hotspot(),
            'MET': self.splicing(),
            'indication': self.indication(),
            'longindel': self.longindel(),
            'qc': self.qc()
        }
        # Append one sheet per result type to the existing workbook
        for sheet_name in sheet:
            df = pd.DataFrame(sheet[sheet_name])
            df.to_excel(writer, sheet_name=sheet_name, index=False)
        # Save and close the Excel writer
        writer.close()


if __name__ == '__main__':
    snv_fusion_cnv(sys.argv[1], sys.argv[2])
    # Logging not added yet; paths are taken directly from argv
    out_xlsx = "".join([sys.argv[1], '/report/', sys.argv[2], '.check_new.xlsx'])
    postprocess = PostProcess(sys.argv[1], out_xlsx)
    postprocess.collect()