#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Extract hereditary findings from a project's germline variant table.

Created on: 2021-10-09
@author: cjs

Purpose: process hereditary-syndrome information.

Change log:
    0.0.2 2021-11-09  Output file 1: when there is no positive result the
                      file is 0 kb and the header is not written either.
    0.0.3 2022-01-28  The 160- and 650-gene panels use separate files.
    0.0.3 2022-01-28  The argument list is written to the log joined by
                      spaces.
    0.0.4 2022-12-06  Added pro.txt: raw rows of the detected genes,
                      clinical significance 1, 2 and 3.
    0.0.4 2022-12-06  1.txt renamed to hereditary.txt (clinical
                      significance 1, 2 and 3); 2.txt renamed to
                      hereditary.risk.txt.
    0.0.4 2022-12-06  Risk label changed from "high risk" to "elevated"
                      (the emitted label is '偏高').
    0.0.5 2023-03-06  Added the 624 project.

Last edited: 2022-01-28
"""
import datetime
import os
import sys
import traceback
from collections import defaultdict
from glob import glob

from cjs_test.cjs_logger import Logger

# Global parameters, filled in by Get_Argvs().
Output_dir = r''
P_Name = r''
Project = r''
Panle_Txt = r''
# Runtime state of the script.
Exe_Path = r''
GLog = None
StartTime = r''
# Configuration files, keyed by the project type given with -p.
Gene_Panle = {'160gene': '160_panel.hereditary.txt',
              '624gene': '624_panel.hereditary.txt',
              '650gene': '650_panel.hereditary.txt'}
Gene_TD = {'160gene': 'gene_dd.160.txt',
           '624gene': 'gene_dd.624.txt',
           '650gene': 'gene_dd.650.txt'}
Min_Rate = 0.10  # default minimum variant frequency for germline variants
# Common (preferred) transcript of each gene.
Ref_Txt = 'oncokbgene.txt'
Gene_NM = {}


def usage(df_exe):
    """Print the command-line synopsis for *df_exe* and exit.

    The process exits with status 0, including when this is called because
    of an argument error.
    """
    print("Usage:")
    print("%s -o Output_dir -p Project [--n P_Name ]" % df_exe)
    sys.exit(0)


def Get_Argvs():
    """Parse ``sys.argv``, locate the input table and open the log.

    Recognised options (matched case-insensitively): ``-o`` project
    directory, ``-p`` project type (a key of ``Gene_Panle``) and the
    optional ``--n`` sample name used to narrow the file search.

    Sets the module globals ``Output_dir``, ``P_Name``, ``Project``,
    ``Panle_Txt``, ``Exe_Path``, ``GLog`` and ``StartTime``.  Any invalid
    input prints a message and terminates through ``usage``.

    Returns:
        Path of the ``*Germline*filtered.txt`` file found under
        ``<Output_dir>/mutation``.
    """
    global Output_dir
    global P_Name
    global Project
    global Panle_Txt
    global Exe_Path
    global GLog
    global StartTime

    argvs = sys.argv
    file_real = os.path.realpath(argvs[0])
    Exe_Path = os.path.dirname(file_real)
    bin_name = os.path.basename(file_real)
    StartTime = datetime.datetime.now()
    ymd = StartTime.strftime('%Y%m%d_%H%M%S')

    if len(argvs) < 5:
        print('参数列表数量不对, %s' % argvs)
        usage(bin_name)

    # Argument parsing: every token is inspected; the value of an option is
    # the token that follows it.
    opt_normal = ''
    for value_index, argv in enumerate(argvs[1:], start=2):
        flag = argv.upper()
        if flag not in ('-O', '-P', '--N'):
            continue
        if value_index >= len(argvs):
            # An option given as the very last token has no value.
            print('参数%s缺少对应的值' % argv)
            usage(bin_name)
        value = argvs[value_index]
        if flag == '-O':
            # Required: project directory, which must already exist.
            Output_dir = os.path.realpath(value)
            if not os.path.exists(Output_dir):
                print('项目路径不存在: %s' % Output_dir)
                usage(bin_name)
        elif flag == '-P':
            # Required: project type, mapped to its panel gene list.
            Project = value
            Panle_Txt = Gene_Panle.get(Project, '')
        else:
            # Optional: sample name.
            opt_normal = value

    # Check the parsing result.
    if Output_dir == '':
        print('未能解析出项目路径')
        usage(bin_name)
    elif Panle_Txt == '':
        print('-p:%s, 指定的项目类型不正确' % Project)
        usage(bin_name)

    # Locate the filtered germline table.
    mut_Path = os.path.join(Output_dir, 'mutation')
    if opt_normal == '':
        ft_txts = glob('%s/*Germline*filtered.txt' % (mut_Path))
    else:
        ft_txts = glob('%s/*%s*Germline*filtered.txt' % (
            mut_Path, opt_normal))
    if len(ft_txts) == 0:
        if opt_normal == '':
            print('%s,项目路径未发现filtered.txt文件' % Output_dir)
        else:
            print('项目路径未发现名称为:%s的相关filtered.txt文件' % opt_normal)
        usage(bin_name)

    # In theory only one file matches; only the first one is used.
    ff_txt = ft_txts[0]
    txt_base = os.path.basename(ff_txt)
    print('使用的txt文件:\n%s' % txt_base)
    if opt_normal == '':
        P_Name = txt_base.split('.')[0]
    else:
        P_Name = opt_normal

    # Log file, created inside <Output_dir>/hereditary.
    hd_path = os.path.join(Output_dir, 'hereditary')
    if not os.path.exists(hd_path):
        os.makedirs(hd_path)
    log_base = '%s_%s.log' % (P_Name, ymd)
    log_full = os.path.join(hd_path, log_base)
    GLog = Logger(log_full, mode='w')
    GLog.info('argvs: %s' % ' '.join(argvs))
    GLog.info('start')
    GLog.info('project-txt:%s' % txt_base)
    GLog.info('project-name:%s' % P_Name)
    return ff_txt


def Preocess_NM():
    """Load the common transcript of each gene into ``Gene_NM``.

    Reads ``<Exe_Path>/ref/<Ref_Txt>`` as a tab-separated file.  For each
    row with more than two columns whose first column does not start with
    ``#``, column 0 is the gene and column 2 the transcript, stored without
    its ``.version`` suffix.  A missing file is reported on stdout and in
    the log, and ``Gene_NM`` is left untouched.
    """
    global Gene_NM
    ref_txt = os.path.join(Exe_Path, 'ref', Ref_Txt)
    if not os.path.isfile(ref_txt):
        eline = '不存在ref文件:%s' % ref_txt
        print(eline)
        GLog.info(eline)
        return
    with open(ref_txt, 'r', encoding='utf8') as ff:
        for line in ff:
            lns = line.strip().split('\t')
            if len(lns) > 2 and not lns[0].startswith('#'):
                # Drop the version number of the transcript.
                Gene_NM[lns[0]] = lns[2].split('.')[0]


def _load_panel_genes(panel_path):
    """Return the non-blank, non-``#`` lines of *panel_path* in file order."""
    genes = []
    with open(panel_path, 'r', encoding='utf-8') as ff:
        for line in ff:
            gene = line.strip()
            if gene and not line.startswith('#'):
                genes.append(gene)
    return genes


def _load_gene_disease(dd_path):
    """Return ``{gene: parts}`` read from the tab-separated *dd_path*.

    Column 0 is the gene; column 1 is split on ``|``.  The callers read
    ``parts[0]`` (written to the syndrome column) and ``parts[1]`` (a
    ``;``-separated list of tumour types), so shorter rows are padded with
    empty strings to at least two entries.
    """
    gene_dd = {}
    with open(dd_path, 'r', encoding='utf-8') as ff:
        for line in ff:
            line = line.strip()
            if not line or line.startswith('#'):
                continue
            lns = line.split('\t')
            parts = lns[1].split('|') if len(lns) > 1 else []
            parts.extend([''] * (2 - len(parts)))
            gene_dd[lns[0]] = parts
    return gene_dd


def _parse_genotype(normal_info):
    """Return ``(zygosity label, variant frequency)`` from a genotype field.

    *normal_info* is a ``:``-separated string.  Its first part selects the
    label (``0/1`` -> '杂合', ``1/1`` -> '纯合', anything else -> '').  The
    tissue version carries the frequency with a ``%`` sign in part 5; the
    blood version carries it without ``%`` in part 4.  A field that cannot
    be parsed is printed with its traceback and yields frequency 0.
    """
    gene_type = ''
    mut_rate = 0
    try:
        mut_infos = normal_info.split(':')
        mut_type = mut_infos[0]
        if mut_type == '0/1':
            gene_type = '杂合'
        elif mut_type == '1/1':
            gene_type = '纯合'
        if '%' in normal_info:
            # Strip whitespace first so that a trailing newline (field is
            # the last column of the row) does not hide the '%'.
            mut_rate = float(mut_infos[5].strip().strip('%')) / 100
        else:
            mut_rate = float(mut_infos[4].strip())
    except (IndexError, ValueError):
        print(normal_info)
        print(traceback.format_exc())
    return gene_type, mut_rate


def _pick_protein_change(aa_change, preferred_nm):
    """Return ``(protein change, found)`` for the preferred transcript.

    *aa_change* is expected to hold comma-separated per-transcript records
    whose ``:``-separated parts contain the transcript id and end with the
    change to report (presumably ANNOVAR's ``gene:NM:exon:c.:p.`` layout --
    TODO confirm).  The last part of the record that contains
    *preferred_nm* is returned.

    When *preferred_nm* is empty the result is ``('', False)``.  When it is
    set but not listed, the last ``:``-separated part of the whole field is
    returned with ``found`` False.
    NOTE(review): the source's indentation was lost; this fallback was
    reconstructed as the ``else`` of the search loop -- please confirm.
    """
    if preferred_nm == '':
        return '', False
    if preferred_nm.startswith('NM_'):
        for record in aa_change.split(','):
            parts = record.split(':')
            if preferred_nm in parts:
                return parts[-1], True
    # Common transcript not found.
    return aa_change.split(':')[-1], False


def _write_hereditary(txt_full, muts_dd, gene_dd):
    """Write one row per mutated gene; the header is always written."""
    split_pos = ';'  # separator between several variants of one gene
    with open(txt_full, 'w', encoding='utf-8') as ff:
        ff.write('基因\t遗传性肿瘤综合征\t遗传方式\t杂合/纯合\t检测结果\n')
        for gene, gene_values in muts_dd.items():
            gene_info = gene_dd.get(gene, ['', ''])[0]
            gene_type = split_pos.join(value[0] for value in gene_values)
            gene_p = split_pos.join(value[1] for value in gene_values)
            # The inheritance-mode column is the fixed value 'AD'.
            ff.write('%s\t%s\t%s\t%s\t%s\n' % (
                gene, gene_info, 'AD', gene_type, gene_p))


def _write_risk(txt_full, pro_genes, muts_dd, gene_dd):
    """Write each tumour type once, in panel order, with its risk label.

    A tumour type is '偏高' when any mutated panel gene lists it and
    '同一般人群' otherwise.  Blank tumour names are skipped.
    """
    cancers_high = set()
    for gene in pro_genes:
        if gene in muts_dd:
            cancers_high.update(gene_dd.get(gene, ['', ''])[1].split(';'))

    cancers = set()  # tumour types already written
    with open(txt_full, 'w', encoding='utf-8') as ff:
        ff.write('肿瘤类型\t风险值\n')
        for gene in pro_genes:
            for cancer in gene_dd.get(gene, ['', ''])[1].split(';'):
                if not cancer or cancer in cancers:
                    continue
                cancers.add(cancer)
                if cancer in cancers_high:
                    cancer_risk = '偏高'
                else:
                    cancer_risk = '同一般人群'
                ff.write('%s\t%s\n' % (cancer, cancer_risk))


def Process_Txt(df_txt):
    """Filter the variant table *df_txt* and write the three result files.

    A data row is kept when it has at least 100 columns, its gene
    (column 7) is in the project panel and its variant frequency, parsed
    from the ``Otherinfo13`` column, is at least ``Min_Rate``.  Kept rows go
    to ``<P_Name>.hereditary.pre.txt``; those whose column 0 is not ``'3'``
    also feed ``<P_Name>.hereditary.txt`` and raise the risk label in
    ``<P_Name>.risk.txt``.  All files are written to
    ``<Output_dir>/hereditary``.

    Relies on the module globals set by ``Get_Argvs``.
    """
    Preocess_NM()

    # Gene list of the current project; the set is for membership tests,
    # the list keeps the file order used by the risk output.
    pro_genes = _load_panel_genes(
        os.path.join(Exe_Path, 'config', Panle_Txt))
    panel_set = set(pro_genes)
    # Relation between gene, syndrome and hereditary disease.
    gene_dd = _load_gene_disease(
        os.path.join(Exe_Path, 'config', Gene_TD.get(Project)))

    with open(df_txt, 'r', encoding='utf-8') as ff:
        inlines = ff.readlines()
    # An empty file is treated as a table without header.
    head = inlines[0] if inlines else ''
    mut_index = -1
    try:
        mut_index = head.strip().split('\t').index("Otherinfo13")
    except ValueError:
        print('未发现Otherinfo13列信息')

    muts_dd = defaultdict(list)
    res3 = [head]
    if mut_index > -1:
        for line in inlines[1:]:
            lns = line.split('\t')
            if len(lns) < 100 or len(lns) <= mut_index:
                continue
            gene = lns[7]
            gene_type, mut_rate = _parse_genotype(lns[mut_index])
            if gene not in panel_set or mut_rate < Min_Rate:
                continue
            # Every retained row of a panel gene is collected for pre.txt.
            res3.append(line)
            gene_p, found = _pick_protein_change(
                lns[10], Gene_NM.get(gene, ''))
            if not found:
                print('%s, Gene未发现常见转录本' % gene)
                GLog.info('%s, Gene未发现常见转录本' % gene)
                GLog.info('NM:%s' % lns[10])
            if lns[0] != '3':
                muts_dd[gene].append([gene_type, gene_p])

    # Write the final result files.
    hd_path = os.path.join(Output_dir, 'hereditary')
    # 1
    _write_hereditary(
        os.path.join(hd_path, '%s.hereditary.txt' % P_Name),
        muts_dd, gene_dd)
    # 3: left empty when no data row was retained.
    txt3_full = os.path.join(hd_path, '%s.hereditary.pre.txt' % P_Name)
    with open(txt3_full, 'w', encoding='utf-8') as ff:
        if len(res3) > 1:
            ff.writelines(res3)
    # 2
    _write_risk(
        os.path.join(hd_path, '%s.risk.txt' % P_Name),
        pro_genes, muts_dd, gene_dd)


def main():
    """Run the script: parse arguments, process the table, close the log."""
    ff_txt = Get_Argvs()
    try:
        Process_Txt(ff_txt)
    except BaseException:
        # Best effort: record the failure and still finish the log below.
        GLog.error(traceback.format_exc())
        print(traceback.format_exc())
    endtime = datetime.datetime.now()
    GLog.info('end')
    # total_seconds() also counts whole days, unlike timedelta.seconds.
    GLog.info('run time:%s seconds' % int(
        (endtime - StartTime).total_seconds()))
    GLog.close()


if __name__ == '__main__':
    main()