#! /usr/bin/env python3
"""Post-process NGS pipeline result files for one sample into a single Excel
workbook (one sheet per result type), optionally embedding the CNV plot."""
import argparse
import glob
import json
import os
import re
from collections import defaultdict

import openpyxl
import pandas as pd
from openpyxl.drawing.image import Image


def database():
    """Load annotation databases from the DATABASE directory.

    Returns a dict with two lookup tables:
      * ``gene_function``: {UPPER-CASE gene symbol: translated description}
      * ``drug_mechanism``: {drug name (EN or CN): "indication||mechanism"}

    Raises:
        UserWarning: if the DATABASE environment variable is not set.
    """
    path = os.environ.get('DATABASE')
    if not path:
        raise UserWarning('未设置DATABASE环境变量')
    # Gene function descriptions: one row per gene, keyed by upper-cased symbol.
    gene_function_path = os.path.join(path, 'gene_function.txt')
    gene_function_df = pd.read_csv(gene_function_path, sep='\t')
    gene_function_df = gene_function_df.fillna('.')
    gene_function_df['genename'] = gene_function_df.apply(lambda x: x['基因'].upper(), axis=1)
    gene_function = gene_function_df.set_index('genename')['基因功能翻译'].to_dict()
    # Drug mechanism descriptions; only the first 18 columns are relevant.
    drug_mechanism_path = os.path.join(path, 'target_drug.txt')
    drug_mechanism_df = pd.read_csv(drug_mechanism_path, usecols=list(range(18)), sep='\t')
    drug_mechanism_df = drug_mechanism_df.fillna('.')
    # '|'-joined English/Chinese names ('.' means missing); NOTE(review): names
    # are upper-cased here but lookups in drug_parse use the raw name — only
    # affects Latin-script names, confirm intended.
    drug_mechanism_df['drugs'] = drug_mechanism_df.apply(lambda x: '|'.join(
        [tm.upper() for tm in [x['药物名称-英文'], x['药物名称-中文']] if tm != '.']), axis=1)
    drug_mechanism_df['dec'] = drug_mechanism_df.apply(lambda x: "||".join(
        [tm for tm in [x['适应症(中文)'], x['作用机制-中文']] if tm != '.']), axis=1)
    # Map every alias (English and Chinese) of a drug to the same description.
    drug_mechanism = {}
    for _, row in drug_mechanism_df.iterrows():
        aliases = row['drugs'].split('|')
        drug_mechanism.update(dict(zip(aliases, [row['dec']] * len(aliases))))
    return dict(gene_function=gene_function, drug_mechanism=drug_mechanism)


def check_file_exist_and_empty(file_path):
    """Return an error message if *file_path* is missing or empty, else None.

    Callers treat a falsy return as "file is usable".
    """
    if not os.path.exists(file_path):
        return f"文件 '{file_path}' 不存在"
    if os.path.getsize(file_path) == 0:
        return f"文件 '{file_path}' 是空文件"
    return None  # fix: make the success value explicit


def tree():
    """Infinitely-nestable dict (autovivifying defaultdict)."""
    return defaultdict(tree)


class PostProcess:
    """Collect per-sample pipeline outputs into ``self.sheet`` and write Excel."""

    def __init__(self, sample_name, normal_name, path, outpath):
        self.path = path                  # workflow run directory
        self.outpath = outpath            # output .xlsx path
        self.sample_name = sample_name
        self.normal_name = normal_name
        self.database = database()        # gene/drug lookup tables
        self.drug_record = list()         # accumulated drug rows (see drug_parse)
        self.sheet = tree()               # sheet_name -> list of row dicts

    @staticmethod
    def txt_2_excel(path):
        """Read a TSV file into a list of row dicts; [] if missing/empty."""
        try:
            df = pd.read_csv(path, sep='\t')
        except (pd.errors.EmptyDataError, FileNotFoundError):
            return []
        return df.to_dict('records')

    @staticmethod
    def _add_columns(df: pd.DataFrame):
        """Derive evidence-level / drug-category / ACMG columns in place.

        Ideally this would happen at annotation time; kept here for now.
        """
        # Evidence level: off-label ('非适应症') entries are demoted to 'C'.
        if 'Label' in df.columns and '证据等级' in df.columns:
            df['证据等级'] = df.apply(
                lambda x: 'C' if x['Label'] == '非适应症' else x['证据等级'], axis=1)
        # AMP mutation level from evidence level: A/B -> I, C/D -> II.
        if '证据等级' in df.columns:
            df['AMP_mut_level'] = df['证据等级'].replace(['A', 'B', 'C', 'D'],
                                                     ['I', 'I', 'II', 'II'])
        # Drug category: a = sensitive + level A, b = sensitive off-label,
        # d = resistant, c = everything else.  Later .loc writes win on overlap.
        if 'Response_Type_C' in df.columns and '证据等级' in df.columns:
            df['Drug_Category'] = 'c'
            df.loc[df['Response_Type_C'].str.contains('敏感')
                   & (df['证据等级'] == 'A'), 'Drug_Category'] = 'a'
            df.loc[df['Response_Type_C'].str.contains('敏感')
                   & (df['Label'] == '非适应症'), 'Drug_Category'] = 'b'
            df.loc[df['Response_Type_C'].str.contains('耐药'), 'Drug_Category'] = 'd'
        # Germline classification from CLNSIG: pathogenic=1, likely=2, else 3.
        # 'pathogenic' check runs after 'likely_pathogenic' so plain pathogenic
        # wins; note startswith('pathogenic') does NOT match 'likely_...'.
        if 'CLNSIG' in df.columns:
            df['ACMG_level'] = '3'
            df.loc[df['CLNSIG'].str.lower().str.startswith('likely_pathogenic'),
                   'ACMG_level'] = '2'
            df.loc[df['CLNSIG'].str.lower().str.startswith('pathogenic'),
                   'ACMG_level'] = '1'
        return df

    @staticmethod
    def _add_cal_col(df: pd.DataFrame):
        """Add 'Deleterious' (count of 'D' predictions) and 'freq_high'
        (max population frequency) columns; silently skip on missing columns."""
        try:
            tmdf2 = df[['MutationTaster_pred', 'FATHMM_pred', 'MetaLR_pred']]
            df['Deleterious'] = tmdf2.apply(lambda x: x.tolist().count('D'), axis=1)
            # fix: float() replaces eval(str(x)) — same scientific-notation
            # parsing without executing file contents as code.
            tmdf1 = df[['1000g2015aug_all', '1000g2015aug_eas', 'esp6500siv2_all',
                        'ExAC_nontcga_ALL', 'ExAC_nontcga_EAS',
                        'gnomAD_genome_ALL', 'gnomAD_genome_EAS']].replace(
                '.', 0).applymap(lambda x: float(x))
            df['freq_high'] = tmdf1.max(axis=1)
        except KeyError as e:
            print(e)
        return df

    def _add_gene_function(self, df: pd.DataFrame, colname='Gene_refGene'):
        """Attach the translated gene-function description for *colname*."""
        if colname in df.columns:
            df['Gene_function'] = df.apply(
                lambda x: self.database['gene_function'].get(x[colname], ''), axis=1)
        return df

    def drug_parse(self, drugs: list):
        """Record each individual drug (splitting combos on ' + ' or ',')
        together with its mechanism description."""
        for drug in drugs:
            for spdrug in re.split(r" \+ |,", drug):
                self.drug_record.append(
                    {'drug_name': spdrug,
                     'drug_detail': self.database['drug_mechanism'].get(spdrug, '')})

    def cms(self):
        """Load sample info from the qc post json into sheet 'cms'.

        Raises:
            UserWarning: when the json cannot be parsed or 'data' is empty.
        """
        cms_info_need = []
        cms_file = os.path.join(self.path, 'qc', f'{self.sample_name}_post.json')
        check_file = check_file_exist_and_empty(cms_file)
        if check_file:
            return cms_info_need
        # fix: 'with' guarantees the handle is closed even when parsing raises
        # (the original closed it only after a successful load).
        with open(cms_file, 'r') as file_read:
            try:
                cms_info = json.load(file_read)['data']
            except Exception as e:
                raise UserWarning('cms 文件加载有误!', e)
        if not cms_info:
            raise UserWarning('cms 内容为空!')
        df = pd.DataFrame(cms_info)
        df['normal'] = self.normal_name
        cms_info_need = df.to_dict('records')
        self.sheet['cms'] = cms_info_need

    def snv(self):
        """Build sheets 'target_mut' (all somatic SNV/indel) and
        'target_drug' (positive/druggable subset)."""
        filter_sum = os.path.join(
            self.path, 'mutation',
            f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.txt')
        filter_pos = os.path.join(
            self.path, 'mutation',
            f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.pos.txt')
        filter_vus = os.path.join(
            self.path, 'mutation',
            f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.vus.txt')
        filter_neg = os.path.join(
            self.path, 'mutation',
            f'{self.sample_name}.snp_indel.somatic.hg19_multianno.filter.sum.neg.txt')
        tmb_file = os.path.join(self.path, 'tmb', f'{self.sample_name}.tmb.txt')
        filter_sum_pos_res = list()
        # Positive file: drug info + AMP levels per (Chr, Start, End).
        pos_check = check_file_exist_and_empty(filter_pos)
        pos_dict = dict()
        if not pos_check:
            pos = pd.read_csv(filter_pos, sep='\t')
            pos = self._add_columns(pos)
            pos = self._add_gene_function(pos)
            self.drug_parse(pos['DrugCn'].to_list())
            pos = pos.fillna('.')
            # If any row of an AAChange group is level I, promote the group.
            grouped_df = pos.groupby('AAChange_refGene')
            for group_name, group_data in grouped_df:
                if any(group_data['AMP_mut_level'] == 'I'):
                    pos.loc[pos['AAChange_refGene'] == group_name,
                            'AMP_mut_level'] = 'I'
            pos_dict = pos.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
            filter_sum_pos_res = pos.to_dict('records')
        # VUS file: everything classified level 'III'.
        vus_dict = dict()
        vus_check = check_file_exist_and_empty(filter_vus)
        if not vus_check:
            vus = pd.read_csv(filter_vus, sep='\t')
            vus['AMP_mut_level'] = 'III'
            vus_dict = vus.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
        # Negative file: everything classified level 'IIII'.
        neg_dict = dict()
        neg_check = check_file_exist_and_empty(filter_neg)
        if not neg_check:
            neg = pd.read_csv(filter_neg, sep='\t')
            neg['AMP_mut_level'] = 'IIII'
            neg_dict = neg.set_index(['Chr', 'Start', 'End'])['AMP_mut_level'].to_dict()
        filter_sum_df = pd.DataFrame()
        filter_sum_check = check_file_exist_and_empty(filter_sum)
        if not filter_sum_check:
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            # Merge the level lookups; pos < vus < neg is irrelevant because
            # the key spaces come from disjoint files.
            level_dict = dict()
            level_dict.update(pos_dict)
            level_dict.update(vus_dict)
            level_dict.update(neg_dict)
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
                ['Chr', 'Start', 'End']).index.map(level_dict)
            cols = list(filter_sum_df.columns)
            tmb_file_check = check_file_exist_and_empty(tmb_file)
            if not tmb_file_check:
                tmb_df = pd.read_csv(tmb_file, sep='\t')
                key_cols = ['Chr', 'Start', 'End']
                filter_sum_df = filter_sum_df.set_index(key_cols)
                tmb_df = tmb_df.set_index(key_cols)
                # Tag rows also present in the TMB set (unless level I/II)
                # by appending ';tmb' to their 'process' column.
                filter_sum_df['process'] = filter_sum_df.index.map(
                    lambda x: filter_sum_df.at[x, 'process'] + ';tmb'
                    if x in tmb_df.index and filter_sum_df.at[x, 'AMP_mut_level']
                    not in ['I', 'II']
                    else filter_sum_df.at[x, 'process'])
                # Append TMB-only rows, then restore original column order.
                new_rows = tmb_df[~tmb_df.index.isin(filter_sum_df.index)]
                filter_sum_df = pd.concat([filter_sum_df, new_rows])
                filter_sum_df = filter_sum_df.reset_index()
                filter_sum_df = filter_sum_df[cols]
            filter_sum_df = filter_sum_df.fillna('.')
        filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['target_mut'] = filter_sum_res
        self.sheet['target_drug'] = filter_sum_pos_res

    def fusion(self):
        """Build sheets 'fusion_mut' and 'fusion_drug' from fusion results."""
        filter_sum_pos = os.path.join(
            self.path, 'fusion',
            f'{self.sample_name}.fusion.hg19_multianno.filter.pos.txt')
        filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
        filter_sum_pos_res = list()
        pos_dict = dict()
        if not filter_sum_pos_check:
            filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
            filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
            filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df,
                                                        colname='ref_gene')
            self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
            filter_sum_pos_df['Validated'] = 1
            filter_sum_pos_df = filter_sum_pos_df.fillna('.')
            # Promote whole (POS, REF, ALT) group to level I if any member is.
            grouped_df = filter_sum_pos_df.groupby(['POS', 'REF', 'ALT'])
            for group_name, group_data in grouped_df:
                pos, ref, alt = group_name
                if any(group_data['AMP_mut_level'] == 'I'):
                    filter_condition = (filter_sum_pos_df['POS'] == pos) & \
                                       (filter_sum_pos_df['REF'] == ref) & \
                                       (filter_sum_pos_df['ALT'] == alt)
                    filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
            pos_dict = filter_sum_pos_df.set_index(
                ['POS', 'REF', 'ALT'])['AMP_mut_level'].to_dict()
            filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
        filter_sum = os.path.join(
            self.path, 'fusion', f'{self.sample_name}.fusion.hg19_multianno.filter.txt')
        filter_sum_check = check_file_exist_and_empty(filter_sum)
        filter_sum_res = list()
        if not filter_sum_check:
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            filter_sum_df['Validated'] = 1
            level_dict = dict()
            level_dict.update(pos_dict)
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
                ['POS', 'REF', 'ALT']).index.map(level_dict)
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['fusion_mut'] = filter_sum_res
        self.sheet['fusion_drug'] = filter_sum_pos_res

    def cnv(self):
        """Build sheets 'cnv_mut' and 'cnv_drug' from CNV results."""
        filter_sum_pos = os.path.join(self.path, 'cnv',
                                      f'{self.sample_name}.rmdup.cns.filter.pos.txt')
        filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
        filter_sum_pos_res = list()
        pos_dict = dict()
        if not filter_sum_pos_check:
            filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
            filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
            filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df,
                                                        colname='ref_gene')
            self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
            filter_sum_pos_df = filter_sum_pos_df.fillna('.')
            filter_sum_pos_df['Validated'] = 1
            # Promote whole (chrom, start, end, gene) group to level I.
            grouped_df = filter_sum_pos_df.groupby(
                ['chromosome', 'start', 'end', 'ref_gene'])
            for group_name, group_data in grouped_df:
                chromosome, start, end, ref_gene = group_name
                if any(group_data['AMP_mut_level'] == 'I'):
                    filter_condition = (filter_sum_pos_df['chromosome'] == chromosome) & \
                                       (filter_sum_pos_df['start'] == start) & \
                                       (filter_sum_pos_df['end'] == end) & \
                                       (filter_sum_pos_df['ref_gene'] == ref_gene)
                    filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
            filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
            pos_dict = filter_sum_pos_df.set_index(
                ['chromosome', 'start', 'end', 'ref_gene'])['AMP_mut_level'].to_dict()
        filter_sum = os.path.join(self.path, 'cnv',
                                  f'{self.sample_name}.rmdup.cns.filter.txt')
        filter_sum_check = check_file_exist_and_empty(filter_sum)
        filter_sum_res = list()
        if not filter_sum_check:
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            filter_sum_df['Validated'] = 1
            level_dict = dict()
            level_dict.update(pos_dict)
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
                ['chromosome', 'start', 'end', 'ref_gene']).index.map(level_dict)
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['cnv_mut'] = filter_sum_res
        self.sheet['cnv_drug'] = filter_sum_pos_res

    def msi(self):
        """Summarize MSI result into sheet 'msi' (MSI-H when '%' >= 30)."""
        msi_file = os.path.join(self.path, 'msi', f'{self.sample_name}.msi.txt')
        msi_check = check_file_exist_and_empty(msi_file)
        msi_res = dict()
        if not msi_check:
            df = pd.read_csv(msi_file, sep='\t')
            res = df.to_dict('records')[0]
            msi_res['msi_count'] = res['Total_Number_of_Sites']
            msi_res['msi_value'] = res['%']
            if msi_res['msi_value'] >= 30:
                msi_res['msi_result'] = 'MSI-H'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能敏感'
            else:
                msi_res['msi_result'] = 'MSS'
                msi_res['msi_predict'] = '对免疫检查点抑制剂可能不敏感'
        self.sheet['msi'] = [msi_res]

    def chemo(self):
        """Load the three chemotherapy result files into their sheets."""
        res_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.res.txt')
        info_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.drug.infos.txt')
        comb_file = os.path.join(self.path, 'chemo', f'{self.sample_name}.chemo.comb.txt')
        check_list = [check_file_exist_and_empty(res_file),
                      check_file_exist_and_empty(info_file),
                      check_file_exist_and_empty(comb_file)]
        file_list = [res_file, info_file, comb_file]
        # fix: dropped the never-used chemo_res accumulator dict.
        for index, name in enumerate(['chemo_res', 'chemo_info', 'chemo_comb']):
            if not check_list[index]:
                df = pd.read_csv(file_list[index], sep='\t')
                df = df.fillna('.')
                self.sheet[name] = df.to_dict('records')
            else:
                print(check_list[index])

    def germline(self):
        """Load filtered germline SNV/indel calls into sheet 'germline'."""
        germline_file = os.path.join(
            self.path, 'mutation',
            f'{self.sample_name}.snp_indel.germline.hg19_multianno.filter.txt')
        germlineres = []
        germline_check = check_file_exist_and_empty(germline_file)
        if not germline_check:
            df = pd.read_csv(germline_file, sep='\t')
            df = df.fillna('.')
            germlineres = df.to_dict('records')
        self.sheet['germline'] = germlineres

    def heredity(self):
        """Load the hereditary pre-report into sheet 'hereditary'.

        NOTE(review): heredity_res() later overwrites sheet['hereditary'] —
        confirm both writes are intended.
        """
        heredi_file = os.path.join(self.path, 'hereditary',
                                   f'{self.sample_name}.hereditary.pre.txt')
        heredires = []
        heredi_check = check_file_exist_and_empty(heredi_file)
        if not heredi_check:
            df = pd.read_csv(heredi_file, sep='\t')
            df = df.fillna('.')
            heredires = df.to_dict('records')
        self.sheet['hereditary'] = heredires

    def heredity_res(self):
        """Load hereditary result + risk files into their sheets."""
        hereditary_file = os.path.join(self.path, 'hereditary',
                                       f'{self.sample_name}.hereditary.txt')
        heredi_risk_file = os.path.join(self.path, 'hereditary',
                                        f'{self.sample_name}.risk.txt')
        for name, file in zip(['hereditary', 'hereditary_risk'],
                              [hereditary_file, heredi_risk_file]):
            file_check = check_file_exist_and_empty(file)
            if not file_check:
                df = pd.read_csv(file, sep='\t')
                df = df.fillna('.')
                self.sheet[name] = df.to_dict('records')
            else:
                print(file_check)
                self.sheet[name] = []

    def indication(self):
        """Load the indication table into sheet 'indication' (if present)."""
        indication_file = os.path.join(self.path, 'mutation', 'indication.txt')
        file_check = check_file_exist_and_empty(indication_file)
        if not file_check:
            df = pd.read_csv(indication_file, sep='\t')
            df = df.fillna('.')
            self.sheet['indication'] = df.to_dict('records')
        else:
            print(file_check)

    def longindel(self):
        """Build sheets 'longindel_mut' and 'longindel_drug'."""
        filter_sum_pos = os.path.join(
            self.path, 'fusion',
            f'{self.sample_name}.longindel.hg19_multianno.filter.pos.txt')
        filter_sum_pos_check = check_file_exist_and_empty(filter_sum_pos)
        filter_sum_pos_res = list()
        pos_dict = dict()
        if not filter_sum_pos_check:
            filter_sum_pos_df = pd.read_csv(filter_sum_pos, sep='\t')
            filter_sum_pos_df = self._add_columns(filter_sum_pos_df)
            filter_sum_pos_df = self._add_gene_function(filter_sum_pos_df,
                                                        colname='ref_gene')
            self.drug_parse(filter_sum_pos_df['DrugCn'].to_list())
            filter_sum_pos_df['Validated'] = 1
            filter_sum_pos_df = filter_sum_pos_df.fillna('.')
            # Promote whole (#CHROM, POS, REF, ALT) group to level I.
            grouped_df = filter_sum_pos_df.groupby(['#CHROM', 'POS', 'REF', 'ALT'])
            for group_name, group_data in grouped_df:
                chrom, pos, ref, alt = group_name  # fix: no 'chr' builtin shadow
                if any(group_data['AMP_mut_level'] == 'I'):
                    filter_condition = (filter_sum_pos_df['#CHROM'] == chrom) & \
                                       (filter_sum_pos_df['POS'] == pos) & \
                                       (filter_sum_pos_df['REF'] == ref) & \
                                       (filter_sum_pos_df['ALT'] == alt)
                    filter_sum_pos_df.loc[filter_condition, 'AMP_mut_level'] = 'I'
            pos_dict = filter_sum_pos_df.set_index(
                ['#CHROM', 'POS', 'REF', 'ALT'])['AMP_mut_level'].to_dict()
            filter_sum_pos_res = filter_sum_pos_df.to_dict('records')
        filter_sum = os.path.join(
            self.path, 'fusion',
            f'{self.sample_name}.longindel.hg19_multianno.filter.txt')
        filter_sum_check = check_file_exist_and_empty(filter_sum)
        filter_sum_res = list()
        if not filter_sum_check:
            filter_sum_df = pd.read_csv(filter_sum, sep='\t')
            filter_sum_df['Validated'] = 1
            level_dict = dict()
            level_dict.update(pos_dict)
            filter_sum_df['AMP_mut_level'] = filter_sum_df.set_index(
                ['#CHROM', 'POS', 'REF', 'ALT']).index.map(level_dict)
            filter_sum_df = filter_sum_df.fillna('.')
            filter_sum_res = filter_sum_df.to_dict('records')
        self.sheet['longindel_mut'] = filter_sum_res
        self.sheet['longindel_drug'] = filter_sum_pos_res

    def neoantigen(self):
        """Load neoantigen and HLA typing results into their sheets."""
        neoantigen = os.path.join(self.path, 'neoantigen', 'MHC_Class_I',
                                  'neoantigen.txt')
        hla = os.path.join(self.path, 'neoantigen', 'hla',
                           f'{self.normal_name}_result.tsv')
        self.sheet['neoantigen'] = self.txt_2_excel(neoantigen)
        self.sheet['hla'] = self.txt_2_excel(hla)

    def qc(self):
        """Load key/value QC metrics into sheet 'qc'."""
        qc_files = glob.glob(os.path.join(self.path, 'qc',
                                          f'{self.sample_name}_qc.txt'))
        qc_res = []
        if qc_files and os.path.getsize(qc_files[0]) > 0:
            qc = pd.read_csv(qc_files[0], sep='\t', header=None)
            qc_res = [qc.set_index(0)[1].to_dict()]
        self.sheet['qc'] = qc_res

    def drugs(self):
        """Deduplicate collected drug records into sheet 'drugs'."""
        drug_record = pd.DataFrame(self.drug_record)
        drug_record = drug_record.drop_duplicates()
        self.sheet['drugs'] = drug_record.to_dict('records')

    def collect(self):
        """Run all collectors, write the workbook, then embed the CNV plot."""
        writer = pd.ExcelWriter(self.outpath)
        self.cms()
        self.qc()
        self.snv()
        self.fusion()
        self.longindel()
        self.cnv()
        self.msi()
        self.germline()
        self.heredity()
        self.heredity_res()
        self.chemo()
        self.indication()
        self.neoantigen()
        self.drugs()
        # One worksheet per collected result set.
        for sheet_name in self.sheet:
            df = pd.DataFrame(self.sheet[sheet_name])
            df.to_excel(writer, sheet_name=sheet_name, index=False)
        writer.close()
        # Re-open with openpyxl to anchor the CNV image a few rows below
        # the last data row of the 'cnv_mut' sheet.
        wb = openpyxl.load_workbook(filename=self.outpath)
        ws = wb['cnv_mut']
        mr = ws.max_row
        cell = 'C' + str(mr + 4)
        cnv_pic_paths = glob.glob(os.path.join(self.path, 'cnv', '*cnv.png'))
        if cnv_pic_paths:
            image = Image(cnv_pic_paths[0])
            ws.add_image(image, cell)
        wb.save(self.outpath)
        wb.close()


if __name__ == '__main__':
    # TODO: add logging and path validation.
    parser = argparse.ArgumentParser(description="post Process Script")
    parser.add_argument('-n', '--barcode', help="sample's barcode", required=True)
    parser.add_argument('-s', '--normal', help="sample's normal", default='',
                        required=False, nargs='?')
    parser.add_argument('-c', '--path', help="workflow run path", required=True)
    parser.add_argument('-o', '--output', help="Output", required=True)
    args = parser.parse_args()
    postprocess = PostProcess(args.barcode, args.normal, args.path, args.output)
    postprocess.collect()