#!/usr/bin/python3 # -*- coding: UTF-8 -*- import pandas as pd from pandas import DataFrame import numpy as np import logging import re import sys import os import json import glob import openpyxl from openpyxl import Workbook, load_workbook from openpyxl.drawing.image import Image if len(sys.argv) != 3: print(" ".join(['usage:python', sys.argv[0], 'output_dir', 'name'])) sys.exit() def snv_fusion_cnv(output_dir, name): out_xlsx = "".join([output_dir, '/report/', name, '.check_new.xlsx']) # genefunction genefunction = {} gf = open("/dataseq/jmdna/codes/reportbase/gene_function.txt", 'r', encoding='utf-8').readlines() for line in gf[1:]: gene = line.strip().split("\t")[0] func = line.strip().split("\t")[1] genefunction[gene.upper()] = func genefunction['.'] = '.' ##drug_mechanism drug_mechanism = {} drug_fh = open("/dataseq/jmdna/codes/reportbase/target_drug.txt", 'r', encoding='utf-8').readlines() for line in drug_fh[1:]: disease = line.split("\t")[8] mechanism = line.split("\t")[11] drugs = line.split("\t")[0].split('|') + line.split("\t")[1].split('|') if disease or mechanism: for drug in drugs: drug_mechanism[drug.upper()] = "\\\\".join([disease, mechanism]).strip() ''' snvindel_sheet ''' ##input filter_file = "".join([output_dir, '/report/', name, '.snp.indel.Somatic.annoall.hg19_multianno_filtered.txt']) pos_file = "".join([output_dir, '/mutation/', name, '.snvindel.pos.dedup.txt']) vus_file = "".join([output_dir, '/mutation/', name, '.snvindel.vus.txt']) neg_file = "".join([output_dir, '/mutation/', name, '.snvindel.neg.txt']) ##filter_file if os.path.getsize(filter_file) > 0: snv = pd.read_table(filter_file, sep="\t") cols = [index for index, row in snv[snv['可信'] == 0].iterrows()] snv.drop(cols, inplace=True) snv.insert(loc=24, column='ACMG_level', value=0) snv.insert(loc=25, column='Deleterious', value=0) snv.insert(loc=26, column='freq_high', value=0) for index, row in snv.iterrows(): if re.search("Likely_pathogenic|drug", (row['CLNSIG']), re.I): snv.loc[index, 'ACMG_level'] = '2' elif re.search("pathogenic", (row['CLNSIG']), re.I) and not re.search("Conflicting", (row['CLNSIG']), re.I): snv.loc[index, 'ACMG_level'] = '1' else: snv.loc[index, 'ACMG_level'] = '3' snv.loc[index, "Deleterious"] = ( snv.loc[index, ['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']].tolist().count("D")) snv.loc[index, "freq_high"] = ((snv.loc[ index, ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS', 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']]).replace('.', '0')).max() snv_1 = snv.iloc[:, list(range(14)) + [15, 17, 18, 20, 23, 24, 25, 26, 111, 112, 113]] else: snv_1 = pd.DataFrame(columns=[]) ##pos_file if os.path.getsize(pos_file) > 0: pos = pd.read_table(pos_file, sep="\t") pos = pos.iloc[:, [7, 10, 18, 23, 24, 25, 29, 30, 31, 32]] pos_1 = pd.DataFrame( columns=['AAChange.refGene', 'OKBSIG', 'AMP_evidence_level', 'AMP_mut_level', 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'EfficacyEvidence', 'Drug_Detail', 'Gene_function', 'Drug_Category']) pos = list(pos.groupby(['Gene.refGene', 'AAChange.refGene', 'fun_change'])) for i in pos: for index, row in i[1].iterrows(): drugs = row['药物中文名'].replace(" + ", ",") drugs = list(set(drugs.split(","))) drug_mm = '' for drug in drugs: if drug.upper() in drug_mechanism.keys(): drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()] i[1].loc[index, ['Drug_Detail']] = drug_mm if row['标签'] == '非适应症': row['证据等级'] = 'C' if (re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A'): i[1].loc[index, ['Drug_Category']] = 'a' elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C': i[1].loc[index, ['Drug_Category']] = 'b' elif re.search("耐药", row['Response_Type_C']): i[1].loc[index, ['Drug_Category']] = 'd' else: i[1].loc[index, ['Drug_Category']] = 'c' i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II']) pos_1.loc[len(pos_1)] = [i[0][1], i[0][2], '|'.join(list(i[1]['证据等级'])), '|'.join(list(i[1]['AMP_mut_level'])), '|'.join(list(i[1]['疾病中文名'])), '|'.join(list(i[1]['药物中文名'])), \ '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])), '|'.join(list(i[1]['EfficacyEvidence'])), '|'.join(list(i[1]['Drug_Detail'])), genefunction[i[0][0].upper()], '|'.join(list(i[1]['Drug_Category']))] else: pos_1 = pd.DataFrame(columns=[]) ##vus_file if os.path.getsize(vus_file) > 0: vus = pd.read_table(vus_file, sep="\t") vus_1 = vus.iloc[:, [9, 17]] vus_1.insert(loc=2, column='AMP_mut_level', value='III') vus_1 = vus_1.rename(columns={'fun_change': 'OKBSIG'}) else: vus_1 = pd.DataFrame(columns=[]) ##neg_file if os.path.getsize(neg_file) > 0: neg = pd.read_table(neg_file, sep="\t") neg_1 = neg.iloc[:, [9, 17]] neg_1.insert(loc=2, column='AMP_mut_level', value='IIII') neg_1 = neg_1.rename(columns={'fun_change': 'OKBSIG'}) else: neg_1 = pd.DataFrame(columns=[]) snvindel_sheet = pd.DataFrame( columns=['可信', 'Chr', 'Start', 'End', 'Ref', 'Alt', 'AAChange.refGene', 'mutant_frequency', 'total_reads', 'mutant_reads', 'strand_bias', 'Otherinfo10', 'Func.refGene', 'Gene.refGene', 'ExonicFunc.refGene', 'avsnp150', 'cosmic91', 'CLNDN', 'CLNSIG', 'ACMG_level', 'Deleterious', 'freq_high', 'OKBSIG', 'AMP_evidence_level', 'AMP_mut_level', 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'EfficacyEvidence', 'Drug_Detail', 'Gene_function', 'Drug_Category', 'Otherinfo11', 'Otherinfo12', 'Otherinfo13']) pos_vus_neg = pd.concat([pos_1, vus_1, neg_1]) snv_pos_vus_neg = snv_1.merge(pos_vus_neg, how='left', on='AAChange.refGene') snvindel_sheet = pd.concat([snvindel_sheet, snv_pos_vus_neg]) snvindel_sheet.rename(columns={"可信": "Validated"}) snvindel_sheet = snvindel_sheet.replace(np.nan, '.') snvindel_sheet.rename(columns={"可信": "Validated"}, inplace=True) ''' fusion_sheet ''' fusion_pos_file = "".join([output_dir, '/fusion/', name, '.fusion.pos.dedup.txt']) fusion_vus_file = "".join([output_dir, '/fusion/', name, '.fusion.vus.txt']) if os.path.getsize(fusion_pos_file) > 0: fusion_pos = pd.read_table(fusion_pos_file, sep="\t") else: fusion_pos = pd.DataFrame(columns=[]) if os.path.getsize(fusion_vus_file) > 0: fusion_vus = pd.read_table(fusion_vus_file, sep="\t") fusion_vus.insert(loc=0, column='可信', value=1) else: fusion_vus = pd.DataFrame(columns=[]) fusion_pos_vus = pd.concat([fusion_pos, fusion_vus]) fusion_sheet = pd.DataFrame( columns=['Validated', 'CHROM1', 'POS1', 'CHROM2', 'POS2', 'GENE1', 'GENE2', 'FUSION', 'Support_reads(PE:SR)', 'Depth', 'FREQ1', 'FREQ2', 'OKBSIG', 'AMP_evidence_level', \ 'AMP_mut_level', 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'Efficacy_Evidence', 'Drug_Detail', 'Gene_function', 'Drug_Category', 'INFO', 'FORMAT', 'Sample']) if not fusion_pos_vus.empty: fusion_pos_vus = fusion_pos_vus.replace(np.nan, '.') fusion = list(fusion_pos_vus.groupby( ['可信', '#CHROM', 'POS', 'CHROM2', 'POS2', 'GENE1', 'GENE2', 'FUSION', 'FREQ1', 'FREQ2', 'fun_change', 'INFO', 'FORMAT', name, 'Gene_Symbol'])) for i in fusion: for index, row in i[1].iterrows(): drugs = row['药物中文名'].replace(" + ", ",") drugs = list(set(drugs.split(","))) drug_mm = '' for drug in drugs: if drug.upper() in drug_mechanism.keys(): drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()] i[1].loc[index, ['Drug_Detail']] = drug_mm if row['标签'] == '非适应症': row['证据等级'] = 'C' if (re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A'): i[1].loc[index, ['Drug_Category']] = 'a' elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C': i[1].loc[index, ['Drug_Category']] = 'b' elif re.search("耐药", row['Response_Type_C']): i[1].loc[index, ['Drug_Category']] = 'd' elif row['Response_Type_C'] == '.': i[1].loc[index, ['Drug_Category']] = '.' else: i[1].loc[index, ['Drug_Category']] = 'c' i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II']) fusion_sheet.loc[len(fusion_sheet)] = list(i[0][0:8]) + [i[0][13].split(":")[1], i[0][13].split(":")[7]] + list(i[0][8:11]) + [ '|'.join(list(i[1]['证据等级'])), '|'.join(list(i[1]['AMP_mut_level'])), \ '|'.join(list(i[1]['疾病中文名'])), '|'.join(list(i[1]['药物中文名'])), '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])), '|'.join(list(i[1]['EfficacyEvidence'])), \ '|'.join(list(i[1]['Drug_Detail'])), genefunction[i[0][14].upper()], '|'.join(list(i[1]['Drug_Category']))] + list(i[0][11:14]) fusion_sheet = fusion_sheet.replace(np.nan, '.') ''' cnv_sheet ''' cnv_pos_file = "/home/jm001/test/reference_standard/lung85gene/Tissue/BKDL202603539-1a/cnvkit/BKDL202603539-1a.cnv.pos.dedup.txt" cnv_sheet = pd.DataFrame( columns=['Validated', 'Chromosome', 'Start', 'End', 'Gene', 'Depth', 'Probes', 'Copy_number', 'OKBSIG', 'Gene_Symbol', 'AMP_evidence_level', 'AMP_mut_level', \ 'Indication', 'Drug', 'Response_Type', 'Evidence_Source', 'Efficacy_Evidence', 'Drug_Detail', 'Gene_Function', 'Drug_Category']) if os.path.getsize(cnv_pos_file) > 0: cnv_pos = pd.read_table(cnv_pos_file, sep="\t") cnv = list(cnv_pos.groupby( ['可信', 'chromosome', 'start', 'end', 'gene', 'depth', 'probes', 'cn', 'fun_change', 'Gene_Symbol'])) for i in cnv: for index, row in i[1].iterrows(): drugs = row['药物中文名'].replace(" + ", ",") drugs = list(set(drugs.split(","))) drug_mm = '' for drug in drugs: if drug.upper() in drug_mechanism.keys(): drug_mm += '[[' + drug + ']]' + drug_mechanism[drug.upper()] i[1].loc[index, ['Drug_Detail']] = drug_mm if row['标签'] == '非适应症': row['证据等级'] = 'C' if (re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'A'): i[1].loc[index, ['Drug_Category']] = 'a' elif re.search("敏感", row['Response_Type_C']) and row['证据等级'] == 'C': i[1].loc[index, ['Drug_Category']] = 'b' elif re.search("耐药", row['Response_Type_C']): i[1].loc[index, ['Drug_Category']] = 'd' elif row['Response_Type_C'] == '.': i[1].loc[index, ['Drug_Category']] = '.' else: i[1].loc[index, ['Drug_Category']] = 'c' i[1]['AMP_mut_level'] = i[1]['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II']) cnv_sheet.loc[len(cnv_sheet)] = list(i[0][0:10]) + ['|'.join(list(i[1]['证据等级'])), '|'.join(list(i[1]['AMP_mut_level'])), \ '|'.join(list(i[1]['疾病中文名'])), '|'.join(list(i[1]['药物中文名'])), '|'.join(list(i[1]['Response_Type_C'])), '|'.join(list(i[1]['Evidence_Source_C'])), '|'.join(list(i[1]['EfficacyEvidence'])), \ '|'.join(list(i[1]['Drug_Detail'])), genefunction[i[0][9].upper()], '|'.join(list(i[1]['Drug_Category']))] else: cnv_pos = pd.DataFrame(columns=[]) with pd.ExcelWriter(out_xlsx) as writer: snvindel_sheet.to_excel(writer, sheet_name="snvindel", index=False) fusion_sheet.to_excel(writer, sheet_name="fusion", index=False) cnv_sheet.to_excel(writer, sheet_name="cnv", index=False) ##加入cnvkit/*.cnv.png wb = openpyxl.load_workbook(filename=out_xlsx) ws = wb['cnv'] mr = ws.max_row cell = 'C' + str(mr + 4) cnv_pic = "".join([output_dir, '/cnvkit/', name, '.cnv.png']) image = Image(cnv_pic) ws.add_image(image, cell) wb.save(out_xlsx) class PostProcess: """ excel处理 """ def __init__(self, path, outpath): self.path = path self.outpath = outpath self.neeecol = self.need_col() def need_col(self): """ 读取所需列 """ path = os.path.join(os.path.dirname(__file__), 'columns.csv') cols = pd.read_csv(path) cols = cols.fillna('') cols_record = cols.to_dict('list') for sheet in cols_record: cols_record[sheet] = [x for x in cols_record[sheet] if x] return cols_record def msi(self): """ Process msi result files """ msi_files = glob.glob(os.path.join(self.path, 'MSI', '*.msi')) msi_res = dict() if msi_files: df = pd.read_csv(msi_files[0], sep='\t') res = df.to_dict('records')[0] msi_res['msi_count'] = res['Total_Number_of_Sites'] msi_res['msi_value'] = res['%'] if msi_res['msi_value'] >= 0.3: msi_res['msi_result'] = 'MSI-H' msi_res['msi_predict'] = '对免疫检查点抑制剂可能敏感' else: msi_res['msi_result'] = 'MSS' msi_res['msi_predict'] = '对免疫检查点抑制剂可能不敏感' return [msi_res] def chemo(self): """ 化疗 """ chemo_files = glob.glob(os.path.join(self.path, 'chemo', '*chemo.res.txt')) chemo_res = [] if chemo_files: df = pd.read_csv(chemo_files[0], sep='\t') df = df.fillna('.') chemo_res = df.to_dict('records') return chemo_res def heredity(self): """ 遗传 """ heredi_files = glob.glob(os.path.join(self.path, 'mutation', '*Germline*filtered.txt')) heredires = [] if heredi_files: df = pd.read_csv(heredi_files[0], sep='\t') df = df.fillna('.') tmdf1 = df[ ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS', 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).applymap(lambda x: eval(str(x))) df['freq_high'] = tmdf1.max(axis=1) tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']] df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1) df_need = df[self.neeecol.get('HCS', [])] try: heredires = df_need.to_dict('records') except KeyError as e: raise UserWarning('表头设置和配置文件不对应', e) return heredires def MMR(self): """ MMR """ mmr_files = glob.glob(os.path.join(self.path, 'MMR', '*mmr.pre.txt')) mmr = [] if mmr_files: df = pd.read_csv(mmr_files[0], sep='\t') df = df.fillna('.') tmdf1 = df[ ['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all', 'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS', 'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace('.', 0).applymap(lambda x: eval(str(x))) df['freq_high'] = tmdf1.max(axis=1) tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']] df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1) df_need = df[self.neeecol.get('HCS', [])] try: mmr = df_need.to_dict('records') except KeyError as e: raise UserWarning('表头设置和配置文件不对应', e) return mmr def hotspot(self): hotspot_files = glob.glob( os.path.join(self.path, 'mutation', 'hotspot', '*hotspot.snp.indel.filter.anno.hg19_multianno.txt')) if hotspot_files: return self.txt_2_excel(hotspot_files[0]) def splicing(self): splicing_files = glob.glob( os.path.join(self.path, 'mutation', '*.target.splicing.txt')) if splicing_files: return self.txt_2_excel(splicing_files[0]) def indication(self): indication_files = glob.glob( os.path.join(self.path, 'mutation', '*indication.txt')) if indication_files: return self.txt_2_excel(indication_files[0]) def longindel(self): longindel_files = glob.glob( os.path.join(self.path, 'fusion', '*.longindel.pos.txt')) if longindel_files: return self.txt_2_excel(longindel_files[0]) def cms(self): """ 样本信息 """ cms_files = glob.glob(os.path.join(self.path, 'qc', '*_post.json')) cms_info_need = [] if cms_files: file_read = open(cms_files[0], 'r') cms_info = json.load(file_read)['data'] file_read.close() df = pd.DataFrame(cms_info) df_need = df[self.neeecol.get('sample_info', [])] try: cms_info_need = df_need.to_dict('records') except KeyError as e: raise UserWarning('表头设置和配置文件不对应', e) return cms_info_need def qc(self): qc_files = glob.glob(os.path.join(self.path, 'qc', '*_post.json')) qc_res = [] if qc_files: df = pd.read_csv(qc_files[0], sep='\t', header=None) df = df.set_index(0).T qc_res = df.to_dict('records') return qc_res # # def snv(self): # # filter file # filter_files = glob.glob(os.path.join(self.path, 'report', '*snp.indel.Somatic.annoall.hg19_multianno_filtered.txt')) # if filter_files: # snv = pd.read_csv(filter_files[0], sep="\t") # def sign_drug_Category(x): # if '敏感' in x['Response_Type_C'] and x['证据等级'] == 'A': # return 'a' # elif '敏感' in x['Response_Type_C'] and x['证据等级'] == 'C': # return 'b' # elif '耐药' in x['Response_Type_C']: # return 'd' # else: # return 'c' # # pos_file 处理 # pos_files = glob.glob(os.path.join(self.path, 'mutation', '*snvindel.pos.txt')) # if pos_files: # pos = pd.read_csv(pos_files[0], sep='\t') # pos['证据等级'] = pos.apply(lambda x: 'C' if x['标签'] == '非适应症' else x['证据等级'], axis=1) # pos['Drug_Category'] = pos.apply(sign_drug_Category, axis=1) # pos['AMP_mut_level'] = pos['证据等级'].replace(['A', 'B', 'C', 'D'], ['I', 'I', 'II', 'II']) # agg_list = ['证据等级', 'AMP_mut_level', '疾病中文名', '药物中文名', '证据等级', 'Response_Type_C', 'Evidence_Source_C', # 'EfficacyEvidence', 'Drug_Category'] # agg_dict = {column: ','.join for column in agg_list} # pos_group =pos.groupby(['Gene.refGene','AAChange.refGene','fun_change']).agg(agg_dict, axis=1) def txt_2_excel(self, path): try: df = pd.read_csv(path, sep='\t') except pd.errors.EmptyDataError: return [] return df.to_dict('records') def collect(self): writer = pd.ExcelWriter(self.outpath, mode='a', engine='openpyxl') sheet = { 'MSI': self.msi(), 'chemo': self.chemo(), 'HCS': self.heredity(), 'sample_info': self.cms(), 'MMR': self.MMR(), 'hotspot': self.hotspot(), 'MET': self.splicing(), 'indication': self.indication(), 'longindel': self.longindel(), 'qc': self.qc() } # 遍历CSV文件列表 for sheet_name in sheet: # 读取CSV文件为DataFrame df = pd.DataFrame(sheet[sheet_name]) df.to_excel(writer, sheet_name=sheet_name, index=False) # 保存并关闭Excel写入器 writer.close() if __name__ == '__main__': snv_fusion_cnv(sys.argv[1], sys.argv[2]) # 未加日志,未添加路径 out_xlsx = "".join([sys.argv[1], '/report/', sys.argv[2], '.check_new.xlsx']) postprocess = PostProcess(sys.argv[1], out_xlsx) postprocess.collect()