pipeline/script/hereditary/Check_Gene.py

265 lines
8.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Created on: 2021-09-30
@author: cjs
# 用途:处理遗传性并发症信息,生成hereditary.py脚本所需的基因信息文件
# panel.hereditary.txt为panel中有综合征信息的基因
# gene_dd.txt为由综合征解释的基因。有的基因不在panel中只是在综合征的解释中
# 版本0.0.2
# 2022-02-10 0.0.2 项目模板文件取消对别称的输入,所以别称的检查也取消
# Last edited: 2022-02-10
"""
from cjs_test.cjs_logger import Logger
from openpyxl import load_workbook
from collections import defaultdict
import pandas as pd
import datetime
import traceback
import os
import sys
# Global parameters
# Logger instance; assigned in the __main__ block
GLog = None
# Directory that contains this script; assigned in the __main__ block
Exe_Path = ''
# Syndrome workbook, read from <Exe_Path>/ref
XLSX_FILE = '160panel遗传性肿瘤综合征.庞杰.2022.02.10修.xlsx'
# XLSX_FILE = '650panel遗传性肿瘤综合征.庞杰.2022.02.10修.xlsx'
# Panel gene list, read from <Exe_Path>/ref
TXT_IN = '160_panel_genes.txt'
# TXT_IN = '650_panel_genes.txt'
# Output file names, both written under <Exe_Path>/config
DD_OUT = 'gene_dd.160.txt'
TXT_OUT = '160_panel.hereditary.txt'
# Script state
Gene_Dd = defaultdict(list) # gene -> list of [syndrome, cancers] records
# HGNC gene symbols: a name found in this list is treated as a gene,
# any other name is treated as an alias of some other gene
Genes_Ls = []
# Alias / previous symbol -> official HGNC symbol
Gas_Dd = {}
# Build the HGNC symbol list and the alias -> official symbol map
def Creat_Gas():
    """Load the HGNC table and fill ``Genes_Ls`` and ``Gas_Dd``.

    The table is read from
    ``<Exe_Path>/ref/001/HGNC/gene_with_protein_product2.txt``. It is a
    tab-separated file whose header row must contain the ``symbol``,
    ``alias_symbol`` and ``prev_symbol`` columns.

    Side effects:
        * ``Genes_Ls`` receives each value of the ``symbol`` column once,
          in file order.
        * ``Gas_Dd`` maps every alias / previous symbol that is not itself
          an official symbol to the first official symbol listing it. On
          each row, aliases are handled before previous symbols.

    Raises:
        KeyError: if the ``symbol`` column is missing.
        ValueError: if ``alias_symbol`` or ``prev_symbol`` is missing.
    """
    global Gas_Dd
    global Genes_Ls
    hgnc_base = 'gene_with_protein_product2.txt'
    hgnc_full = os.path.join(Exe_Path, 'ref', '001', 'HGNC', hgnc_base)
    pd_txt = pd.read_table(hgnc_full)  # the file carries a header row
    # A set keeps the "is this already an official symbol?" test O(1).
    official = set(pd_txt['symbol'])
    # Use .columns rather than .iloc[0] so an empty table does not crash here.
    head_list = list(pd_txt.columns)
    name_index = head_list.index('symbol')
    alias_index = head_list.index('alias_symbol')
    prev_index = head_list.index('prev_symbol')
    needed = max(name_index, alias_index, prev_index) + 1
    seen = set(Genes_Ls)
    header_pending = True
    with open(hgnc_full, 'r', encoding='utf8') as ff:
        for line in ff:
            line = line.rstrip('\r\n')
            if not line or line.startswith('#'):
                continue
            lns = line.split('\t')
            if len(lns) < needed:
                # Rows whose trailing empty columns were trimmed are padded
                # so that the column indexes stay valid.
                lns.extend([''] * (needed - len(lns)))
            gene_name = lns[name_index]
            if header_pending:
                header_pending = False
                if gene_name == 'symbol':
                    # The header row is not data: without this skip the
                    # column titles would be stored as a gene and aliases.
                    continue
            if gene_name not in seen:
                seen.add(gene_name)
                Genes_Ls.append(gene_name)
            # Aliases first, then previous symbols; the first owner wins.
            for field in (lns[alias_index], lns[prev_index]):
                for gene_symbol in field.split('|'):
                    gene_symbol = gene_symbol.replace('"', '')
                    if gene_symbol and gene_symbol not in official:
                        Gas_Dd.setdefault(gene_symbol, gene_name)
# Collect the gene -> (syndrome, cancers) relations from the workbook
def Get_Gene_Dd(df_xlsx):
    """Read the syndrome workbook and append its records to ``Gene_Dd``.

    Only the first worksheet is used. Its first used row is the header; the
    columns whose header text contains the syndrome name, causative gene and
    related cancer titles are located by substring match. Every later row
    contributes one ``[syndrome, cancers]`` record per gene named in its
    gene cell. The inheritance mode is not read here (AD is assumed later).

    A gene that is not in ``Genes_Ls`` is reported on stdout and stored
    under its official symbol from ``Gas_Dd`` when one is known, otherwise
    under the name as written.

    Args:
        df_xlsx: path of the ``.xlsx`` workbook.

    Side effects:
        Appends to the module-level ``Gene_Dd`` and prints diagnostics,
        including the records collected for BRCA1.
    """
    global Gene_Dd
    syndrome_head = '遗传性肿瘤综合征中文名'
    gene_head = '致病基因'
    cancer_head = '相关肿瘤'
    key_list = [syndrome_head, gene_head, cancer_head]
    # Gene separators, written as escapes so they cannot be lost again: the
    # original search string had degraded to '' and str.replace('', ',')
    # puts a comma between every character. The original comment names the
    # Chinese enumeration comma and the space; the full-width comma and the
    # non-breaking / ideographic spaces are added because they cannot occur
    # inside a gene symbol.
    gene_separators = str.maketrans({
        '\u3001': ',',  # ideographic (enumeration) comma
        '\uff0c': ',',  # full-width comma
        ' ': ',',
        '\u3000': ',',  # ideographic space
        '\xa0': ',',    # non-breaking space
    })
    known_genes = set(Genes_Ls)

    def cell_text(cell):
        # An empty cell holds None; str(None) would store the text 'None'.
        value = cell.value
        return '' if value is None else str(value).strip()

    wb = load_workbook(df_xlsx, data_only=True)  # cached values, not formulas
    try:
        ws = wb[wb.sheetnames[0]]
        # openpyxl bounds are 1-based and inclusive.
        col_max = ws.max_column  # last used column
        col_min = ws.min_column  # first used column
        row_max = ws.max_row     # last used row
        row_min = ws.min_row     # first used row
        # ws.rows starts at cell A1, so a 1-based sheet coordinate maps to a
        # 0-based position in these sequences by subtracting one.
        sheet_rows = list(ws.rows)
        # Locate the three required columns in the header row.
        key_index = {d_key: -1 for d_key in key_list}
        header_cells = ()
        if len(sheet_rows) >= row_min:
            header_cells = sheet_rows[row_min - 1][col_min - 1:col_max]
        for cell_index, cell in enumerate(header_cells, start=col_min - 1):
            cell_str = str(cell.value)
            for d_key in key_list:
                if d_key in cell_str:
                    key_index[d_key] = cell_index
                    break
        missing = [d_key for d_key in key_list if key_index[d_key] == -1]
        for d_key in missing:
            print('%s, 表头中未发现' % d_key)
        if not missing:
            for sheet_row in sheet_rows[row_min:row_max]:
                syndrome = cell_text(sheet_row[key_index[syndrome_head]])
                cancers = cell_text(sheet_row[key_index[cancer_head]])
                genes = cell_text(sheet_row[key_index[gene_head]])
                for gene in genes.translate(gene_separators).split(','):
                    # Consecutive separators yield empty pieces.
                    if not gene:
                        continue
                    if gene in known_genes:
                        Gene_Dd[gene].append([syndrome, cancers])
                    else:
                        print(gene, '基因不是HGNC标准名称')
                        gene_gas = Gas_Dd.get(gene, gene)
                        Gene_Dd[gene_gas].append([syndrome, cancers])
    finally:
        # Release the workbook even when parsing fails.
        wb.close()
    # .get() instead of indexing: indexing a defaultdict would create an
    # empty BRCA1 entry and make the gene look present downstream.
    print('BRCA1', Gene_Dd.get('BRCA1', []))
def Check_Gene_Dd():
    """Merge the records of every gene in ``Gene_Dd`` and write ``DD_OUT``.

    For each gene, the syndrome texts (first item of every record) and the
    cancer texts (second item) are split on commas, de-duplicated in first
    seen order and joined with ``;``. One line per gene is written to
    ``<Exe_Path>/config/<DD_OUT>``::

        gene<TAB>syndromes|cancers|AD

    The inheritance mode is always ``AD``.
    """
    # NOTE(review): the original search strings had degraded to '' (and
    # str.replace('', ',') puts a comma between every character). The set
    # below is a reconstruction: two comma look-alikes become ',' and
    # invisible / non-ASCII space characters are dropped. Confirm against
    # the characters that actually occur in the workbook.
    cleanup = str.maketrans({
        '\u3001': ',',   # ideographic (enumeration) comma
        '\uff0c': ',',   # full-width comma
        '\u200b': None,  # zero-width space
        '\ufeff': None,  # zero-width no-break space / BOM
        '\xa0': None,    # non-breaking space
        '\u3000': None,  # ideographic space
    })

    def unique_items(texts):
        # A dict keeps first-seen order while dropping duplicates.
        found = {}
        for text in texts:
            for item in text.strip().translate(cleanup).split(','):
                item = item.strip()
                if item:
                    found.setdefault(item, None)
        return list(found)

    out_path = os.path.join(Exe_Path, 'config')
    os.makedirs(out_path, exist_ok=True)
    txt_full = os.path.join(out_path, DD_OUT)
    with open(txt_full, 'w', encoding='utf8') as ff:
        for gene, her_syns in Gene_Dd.items():
            syndromes = unique_items(her_syn[0] for her_syn in her_syns)
            cancers = unique_items(her_syn[1] for her_syn in her_syns)
            # Items inside a field are joined with ';', fields with '|'.
            fields = [';'.join(syndromes), ';'.join(cancers), 'AD']
            ff.write('%s\t%s\n' % (gene, '|'.join(fields)))
def Check_txt(pro_gene_txt):
    """Write the panel genes that have a syndrome record to ``TXT_OUT``.

    Args:
        pro_gene_txt: path of a UTF-8 text file with one gene name per
            line. Blank lines and lines whose first non-blank character is
            ``#`` are ignored; repeated names are kept once.

    Side effects:
        Creates ``<Exe_Path>/config`` when needed and writes
        ``<Exe_Path>/config/<TXT_OUT>``: a header comment line followed by
        every panel gene that is a key of ``Gene_Dd``, in input order.
    """
    # A dict gives ordered de-duplication with O(1) membership tests.
    genes = {}
    with open(pro_gene_txt, 'r', encoding='utf8') as ff:
        for line in ff:
            # Strip before the '#' test so an indented comment line is not
            # mistaken for a gene name.
            gene = line.strip()
            if gene and not gene.startswith('#'):
                genes.setdefault(gene, None)
    out_path = os.path.join(Exe_Path, 'config')
    os.makedirs(out_path, exist_ok=True)
    out_full = os.path.join(out_path, TXT_OUT)
    with open(out_full, 'w', encoding='utf8') as ff:
        ff.write('#该panel在遗传综合征数据库匹配到的基因\n')
        ff.writelines(gene + '\n' for gene in genes if gene in Gene_Dd)
def Check_Pros():
    """Run ``Check_txt`` on the panel gene list ``<Exe_Path>/ref/<TXT_IN>``."""
    Check_txt(os.path.join(Exe_Path, 'ref', TXT_IN))
if __name__ == '__main__':
    # Every input and output path is resolved relative to the real location
    # of this script.
    file_real = os.path.realpath(sys.argv[0])
    Exe_Path = os.path.dirname(file_real)
    bin_name = os.path.basename(file_real)
    # One log file per run: <Exe_Path>/logs/<script>/<script>_<timestamp>.log
    log_path = os.path.join(Exe_Path, 'logs', bin_name)
    os.makedirs(log_path, exist_ok=True)
    starttime = datetime.datetime.now()
    ymd = starttime.strftime('%Y%m%d_%H%M%S')
    log_base = '%s_%s.log' % (bin_name, ymd)
    log_full = os.path.join(log_path, log_base)
    GLog = Logger(log_full, mode='w')
    GLog.info('start')
    exit_code = 0
    try:
        # The pipeline steps run inside the try block so that a failure is
        # written to the log; previously the block guarded only `pass`.
        Creat_Gas()
        xlsx_ff = os.path.join(Exe_Path, 'ref', XLSX_FILE)
        Get_Gene_Dd(xlsx_ff)
        Check_Gene_Dd()
        Check_Pros()
    except Exception:
        # Exception rather than BaseException: Ctrl-C and sys.exit() are
        # allowed to propagate.
        GLog.error(traceback.format_exc())
        print(traceback.format_exc())
        exit_code = 1
    finally:
        endtime = datetime.datetime.now()
        GLog.info('end')
        # total_seconds() also counts whole days, unlike timedelta.seconds.
        GLog.info('run time:%s seconds' % int((endtime - starttime).total_seconds()))
        GLog.close()
    if exit_code:
        # Keep a non-zero exit status for callers when the run failed.
        sys.exit(exit_code)