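# report/tools/parsexlsx.py
#
# NOTE: this text was captured from a repository web view ("384 lines",
# "16 KiB", "Python", "Raw / Normal View / History"). Bare lines of the form
# "2023-07-31 13:49:34 +08:00" are per-line blame timestamps from that view;
# they are viewer residue, not part of the program.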
import json
import re
import sys
import time
from collections import defaultdict
import pandas as pd
from tools.readxlsx import read
def tree():
    """Return an auto-vivifying nested mapping.

    Every missing key yields another ``tree()``, so arbitrarily deep paths
    such as ``t['sum']['chemo']['drug_num'] = 3`` can be assigned without
    creating the intermediate levels first.
    """
    return defaultdict(tree)
class BaseAssignment:
    """Accumulator state shared by the parsing steps of one report."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Report payload; nested levels are created on first access.
        self.result = tree()
        # Alterations with AMP level I/II (counted as ``signtb_num``).
        self.signtb = set()
        # Drugs seen under drug category a/b/c (counted as ``signdrug_num``).
        self.signdurg = set()
        # source ('target' / 'fusion' / 'cnv') -> {gene: description} for
        # level I/II alterations; read back by ``indication``.
        self.sign_from_type = defaultdict(dict)
        # normalised response type -> list of drug names.
        self.drugs_type = dict()


class Parse(BaseAssignment):
    """Turn the per-sheet sample data into the nested report dictionary.

    ``sampledata`` is indexed by sheet name; every sheet is passed to
    ``pd.DataFrame``.  Call :meth:`collect` to run all steps.
    """

    # Columns copied into each alteration's ``drug_content`` records.
    _DRUG_CONTENT_COLUMNS = ['DrugCn', 'Response_Type', 'Indication', 'Evidence_Source', 'Efficacy_Evidence']
    # Names given to the ':'-separated parts of ``AAChange.refGene``.
    _HGVS_FIELDS = ['gene', 'transcript', 'exon', 'nacid', 'aacid']
    # (pattern for ``ExonicFunc.refGene``, label); applied in this order.
    _MUTTYPE_PATTERNS = (
        ('nonsynonymous SNV', '错义突变'),
        ('^frameshift', '移码突变'),
        ('^nonframeshift', '非移码突变'),
        ('stopgain', '提前终止'),
    )
    # Cell values in ``sample_info`` that are rewritten to '/'.
    _PLACEHOLDERS = ('.', '-', '——')

    def __init__(self, sampledata, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sampledata = sampledata

    @staticmethod
    def _map_cells(frame, func):
        """Apply ``func`` to every cell of ``frame``.

        ``DataFrame.applymap`` was renamed to ``DataFrame.map``; use whichever
        the installed pandas provides.  The check is done on the class so a
        column that happens to be called ``map`` cannot shadow the method.
        """
        if hasattr(pd.DataFrame, 'map'):
            return frame.map(func)
        return frame.applymap(func)

    def cms(self):
        """Store the first ``sample_info`` row under ``result['c']``.

        Placeholder cells ('.', '-', '——') become '/', ``receiveTime`` is cut
        at its first space and ``reportTime`` is set to today's local date.

        :raises UserWarning: if the ``sample_info`` sheet is empty.
        """
        data = pd.DataFrame(self.sampledata['sample_info'])
        if data.empty:
            raise UserWarning('sample_info表为空生成报告失败')
        data = self._map_cells(data, lambda x: '/' if str(x) in self._PLACEHOLDERS else x)
        data_dict = data.to_dict('index')[0]
        # str() so a non-string cell (e.g. a timestamp object) does not raise.
        data_dict['receiveTime'] = str(data_dict['receiveTime']).split(' ')[0]
        data_dict['reportTime'] = time.strftime("%Y-%m-%d", time.localtime())
        self.result['c'] = data_dict

    def target(self):
        """Build ``result['snvindel']``: one entry per ``AAChange.refGene``.

        Only rows with ``Validated == 1`` are kept, and among those, level
        'III' rows with a frequency below 2% are dropped.
        """
        data = pd.DataFrame(self.sampledata['snvindel'])
        res = []
        if data.empty:
            self.result['snvindel'] = res
            return
        data['freq'] = data['mutant_frequency'].apply(lambda x: float(str(x).replace('%', '')))
        # Parenthesised explicitly: ``&`` binds tighter than ``==``, so the
        # unparenthesised form compared ``Validated`` against the mask itself.
        validated = data['Validated'] == 1
        low_freq_vus = (data['freq'] < 2) & (data['AMP_mut_level'] == 'III')
        data = data[validated & ~low_freq_vus].reset_index()
        if data.empty:
            # Nothing survived the filter; the column split below needs rows.
            self.result['snvindel'] = res
            return
        data['muttype'] = '/'
        for pattern, label in self._MUTTYPE_PATTERNS:
            data.loc[data['ExonicFunc.refGene'].str.match(pattern, na=False), 'muttype'] = label
        # Split the HGVS string; pad to five fields so variants without an
        # amino-acid part do not break the assignment.
        parts = data['AAChange.refGene'].str.split(':', expand=True)
        parts = parts.reindex(columns=range(len(self._HGVS_FIELDS)))
        parts.columns = self._HGVS_FIELDS
        data[self._HGVS_FIELDS] = parts
        # No amino-acid change: show '/'.
        data['aacid'] = data['aacid'].fillna('/')
        summary_columns = self._HGVS_FIELDS + ['mutant_frequency', 'AMP_mut_level', 'muttype', 'Gene_function']
        for alter, alter_data in data.groupby('AAChange.refGene'):
            alter_res = alter_data[summary_columns].iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            alter_res['alter'] = alter
            res.append(alter_res)
            # Summary bookkeeping for level I/II alterations.
            if alter_res['AMP_mut_level'] in ['I', 'II']:
                self.signtb.add(alter)
                self.sign_from_type['target'][alter_res['gene']] = '%s(%s)' % (
                    alter_res['aacid'], alter_res['mutant_frequency'])
        self.result['snvindel'] = res

    def fusion(self):
        """Build ``result['fusion']``: one entry per validated ``FUSION``."""
        data = pd.DataFrame(self.sampledata['fusion'])
        res = []
        if data.empty:
            self.result['fusion'] = res
            return
        data = data[data['Validated'] == 1].reset_index()
        summary_columns = ['FUSION', 'FREQ1', 'AMP_mut_level', 'Gene_function', 'Gene_Symbol']
        for alter, alter_data in data.groupby('FUSION'):
            alter_res = alter_data[summary_columns].iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            des = '%s 融合' % (alter_res['FUSION'].replace('-', '::'))
            alter_res['alter'] = des
            res.append(alter_res)
            # Summary bookkeeping for level I/II alterations.
            if alter_res['AMP_mut_level'] in ['I', 'II']:
                self.signtb.add(alter)
                self.sign_from_type['fusion'][alter_res['Gene_Symbol']] = des
        self.result['fusion'] = res

    def cnv(self):
        """Build ``result['cnv']``: one entry per validated ``Gene_Symbol``.

        ``muttype`` is '扩增' when ``Copy_number`` is above 2, else '缺失'.
        """
        data = pd.DataFrame(self.sampledata['cnv'])
        res = []
        if data.empty:
            self.result['cnv'] = res
            return
        data = data[data['Validated'] == 1].reset_index()
        summary_columns = ['Gene_Symbol', 'Copy_number', 'AMP_mut_level', 'Gene_function']
        for alter, alter_data in data.groupby('Gene_Symbol'):
            alter_data_need = alter_data[summary_columns].reset_index()
            alter_data_need['muttype'] = '缺失'
            alter_data_need.loc[alter_data_need['Copy_number'] > 2, 'muttype'] = '扩增'
            alter_res = alter_data_need.iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            des = '%s %s' % (alter, alter_res['muttype'])
            alter_res['alter'] = des
            res.append(alter_res)
            # Summary bookkeeping for level I/II alterations.
            if alter_res['AMP_mut_level'] in ['I', 'II']:
                self.signtb.add(alter)
                self.sign_from_type['cnv'][alter_res['Gene_Symbol']] = des
        self.result['cnv'] = res

    def hotspot(self):
        """Copy the 'hotspot' sheet into the result as a list of records."""
        self._to_records('hotspot')

    def met(self):
        """Copy the 'MET' sheet into the result as a list of records."""
        self._to_records('MET')

    def longindel(self):
        """Copy the 'longindel' sheet into the result as a list of records."""
        self._to_records('longindel')

    def mmr(self):
        """Store the 'MMR' records and their summary in ``result['sum']['mmr']``."""
        data = pd.DataFrame(self.sampledata['MMR'])
        result_summary = '未检测到相关基因突变'
        predict = '对免疫检查点抑制剂可能不敏感'
        mmr_num = 0
        res = []
        if not data.empty:
            result_summary = ' | '.join(
                '%s %s' % (gene, change) for gene, change in zip(data['gene'], data['p_change']))
            predict = '对免疫检查点抑制剂可能敏感'
            mmr_num = len(data.index)
            res = data.to_dict('records')
        self.result['MMR'] = res
        self.result['sum']['mmr'] = dict(
            result_summary=result_summary,
            predict=predict,
            mmr_num=mmr_num
        )

    def msi(self):
        """Copy the first row of the 'MSI' sheet into the result."""
        self._to_dicts('MSI')

    def chemo(self):
        """Fill ``result['chemo']`` and ``result['sum']['chemo']``.

        Reads the 'chemo_res', 'chemo_comb' and 'chemo_info' sheets.
        """
        chemo_res = self._to_records('chemo_res', need=True)
        chemo_res_list = []
        chemo_sign_drug_num = 0
        chemo_drug_category = dict()
        if chemo_res:
            chemo_res_df = pd.DataFrame(chemo_res)
            # 1-based row number, exposed as the 'index' field of each record.
            chemo_res_df.index = chemo_res_df.index + 1
            chemo_res_df = chemo_res_df.reset_index()
            chemo_res_list = chemo_res_df.to_dict('records')
            chemo_sign_drug_num = len(chemo_res_df[chemo_res_df['推荐程度'] == '推荐'])
            chemo_drug_category = chemo_res_df.groupby('推荐程度')['药物名称'].apply(
                ','.join).to_dict()
        self.result['chemo']['chemo_res'] = chemo_res_list
        self.result['sum']['chemo']['drug_num'] = chemo_sign_drug_num
        self.result['sum']['chemo']['drug_category'] = chemo_drug_category
        self.result['chemo']['chemo_comb'] = self._records_by(
            self._to_records('chemo_comb', need=True), '癌种')
        self.result['chemo']['chemo_info'] = self._records_by(
            self._to_records('chemo_info', need=True), '药物')

    @staticmethod
    def _records_by(records, key):
        """Group ``records`` by ``key``: {key value: [record without key]}."""
        if not records:
            return dict()
        # Iterating the groups keeps the grouping column available whatever
        # ``groupby.apply`` passes to its callback.
        return {
            value: group.drop(columns=key).to_dict('records')
            for value, group in pd.DataFrame(records).groupby(key)
        }

    def hcs(self):
        """Copy the 'HCS' sheet and record its row count in the summary."""
        self._to_records('HCS')
        self.result['sum']['hcs']['num'] = len(self.result['HCS'])

    def heredity(self):
        """Store the 'hereditary' records and their summary.

        ``risk`` lists the tumour types whose ``风险值`` is '偏高' in the
        'hereditary_risk' sheet; it stays '/' when there are none.
        """
        hereditary = pd.DataFrame(self.sampledata['hereditary'])
        result = '/'
        disease = '/'
        risk = '/'
        if not hereditary.empty:
            result = '|'.join(
                '%s %s' % (gene, outcome)
                for gene, outcome in zip(hereditary['基因'], hereditary['检测结果']))
            disease = '|'.join(hereditary['遗传性肿瘤综合征'].to_list())
        hereditary_risk = pd.DataFrame(self.sampledata['hereditary_risk'])
        if not hereditary_risk.empty:
            high_risk = hereditary_risk.loc[hereditary_risk['风险值'] == '偏高', '肿瘤类型'].to_list()
            # Without this guard the text was a bare suffix with no tumour type.
            if high_risk:
                risk = ','.join(high_risk) + '风险可能较高'
        self.result['hereditary'] = hereditary.to_dict('records')
        self.result['sum']['hereditary']['result'] = result
        self.result['sum']['hereditary']['disease'] = disease
        self.result['sum']['hereditary']['risk'] = risk
        self.result['sum']['hereditary']['num'] = len(hereditary.index)

    def qc(self):
        """Store the first 'qc' row with three columns renamed to short keys."""
        data = pd.DataFrame(self.sampledata['qc'])
        res = {}
        if not data.empty:
            data = data.rename(columns={
                'Q30(%)': 'q30',
                'mean_depth(dedup)': 'depth',
                'coverage(>=0.2*meanx)': 'coverage'
            })
            res = data.to_dict('index')[0]
        self.result['qc'] = res

    def drugs(self):
        """Store ``{drug_name: drug_detail}`` in ``result['drugs']['drugs_detail']``.

        Rows with missing values or a '.' detail are dropped; '||' in any
        cell becomes a newline.
        """
        data = pd.DataFrame(self.sampledata['drugs'])
        res = {}
        if not data.empty:
            data = data.dropna()
            data = data[data['drug_detail'] != '.']
            data = self._map_cells(data, lambda x: str(x).replace('||', '\n') if '||' in str(x) else x)
            res = data.set_index('drug_name')['drug_detail'].to_dict()
        self.result['drugs']['drugs_detail'] = res

    def indication(self):
        """Add ``检测情况`` to each 'indication' record.

        The value joins the descriptions recorded in ``sign_from_type`` for
        the record's gene, one per matching type in its '/'-separated
        ``检测内容``.  Must run after ``target``, ``fusion`` and ``cnv``.
        """
        indication_res = self._to_records('indication', need=True)
        trans = dict(
            突变='target',
            融合='fusion',
            扩增='cnv'
        )
        res = list()
        for indication_sp in indication_res or []:
            gene_tbtype_res = list()
            for tbtype in indication_sp['检测内容'].split('/'):
                source = trans.get(tbtype)
                if source is None:
                    continue
                # .get() so the lookup never creates keys in the defaultdict.
                found = self.sign_from_type.get(source, {}).get(indication_sp['基因'])
                if found is not None:
                    gene_tbtype_res.append(found)
            indication_sp['检测情况'] = '\n'.join(gene_tbtype_res)
            res.append(indication_sp)
        self.result['indication'] = res

    def _to_records(self, sheetname, need=False):
        """Convert a multi-row sheet to a list of record dicts.

        :param sheetname: key into ``sampledata``.
        :param need: when true, return the records (``[]`` for an empty
            sheet) and leave ``result`` untouched; otherwise store them in
            ``result[sheetname]``.
        :return: the records when ``need`` is true, else ``None``.
        """
        data = pd.DataFrame(self.sampledata[sheetname])
        res = [] if data.empty else data.to_dict('records')
        if need:
            return res
        self.result[sheetname] = res

    def _to_dicts(self, sheetname):
        """Store the first row of a single-row sheet in ``result[sheetname]``.

        :param sheetname: key into ``sampledata``; an empty sheet yields ``{}``.
        """
        data = pd.DataFrame(self.sampledata[sheetname])
        res = {}
        if not data.empty:
            res = data.to_dict('index')[0]
        self.result[sheetname] = res

    def _drug_content(self, alter_data):
        """Return the drug rows of one alteration, skipping '.' placeholders."""
        drug_content = alter_data[self._DRUG_CONTENT_COLUMNS]
        drug_content = drug_content[drug_content['DrugCn'] != '.']
        return drug_content.reset_index().to_dict('records')

    def _drug_category(self, groupdata):
        """Summarise the drugs attached to one alteration.

        Side effects: drugs of category a/b/c are added to ``signdurg`` and
        all drugs are added to ``drugs_type`` under their response type.

        :param groupdata: the rows of a single alteration; not modified.
        :return: ``{Drug_Category: newline-joined drug descriptions}``.
        """
        drug_category_res = dict()
        for drug_category, category_rows in groupdata.groupby('Drug_Category'):
            if drug_category == '.':
                continue
            # assign() works on a copy, so the caller's frame is left alone.
            per_drug = category_rows.assign(
                drug_split=category_rows['DrugCn'].str.split(',')).explode('drug_split')
            # Tally of sensitive / possibly sensitive drugs.
            if drug_category in ['a', 'b', 'c']:
                self.signdurg.update(per_drug['drug_split'].tolist())
            # NOTE(review): the closing '】' has no opening '【' -- confirm the
            # intended format before changing this string.
            descriptions = [
                '%s%s 级】' % (drug, level)
                for drug, level in zip(per_drug['drug_split'], per_drug['AMP_evidence_level'])
            ]
            # dict.fromkeys de-duplicates while keeping a stable order.
            drug_category_res[drug_category] = '\n'.join(dict.fromkeys(descriptions))
        # All drugs, one row each: split combinations ('A + B') and lists ('A,B').
        drug_lists = groupdata['DrugCn'].str.replace(' + ', '+', regex=False).str.split(r'[+,]')
        exploded_df = groupdata.assign(list_col=drug_lists).explode('list_col').reset_index()
        keep = (exploded_df['list_col'] != '.') & (exploded_df['list_col'] != '')
        exploded_df = exploded_df[keep].copy()
        exploded_df.loc[exploded_df['Response_Type'].str.contains('敏感', na=False), 'Response_Type'] = '可能敏感'
        exploded_df.loc[exploded_df['Response_Type'].str.contains('耐药', na=False), 'Response_Type'] = '可能耐药'
        for drug_type, drug_names in exploded_df.groupby('Response_Type')['list_col']:
            known = self.drugs_type.setdefault(drug_type, [])
            for drug_name in drug_names:
                if drug_name not in known:
                    known.append(drug_name)
        return drug_category_res

    def collect(self):
        """Run every parsing step and return the filled ``result``.

        ``indication`` runs after ``target``/``fusion``/``cnv`` because it
        reads what they record in ``sign_from_type``.
        """
        self.cms()
        self.target()
        self.fusion()
        self.cnv()
        self.hotspot()
        self.met()
        self.longindel()
        self.mmr()
        self.msi()
        self.chemo()
        self.hcs()
        self.heredity()
        self.qc()
        self.indication()
        self.drugs()
        # Summary counters.
        self.result['sum']['signtb_num'] = len(self.signtb)
        self.result['sum']['signdrug_num'] = len(self.signdurg)
        self.result['drugs']['drugs_type'] = {key: self.drugs_type[key] for key in sorted(self.drugs_type)}
        return self.result
def run(path):
    """Parse the workbook at ``path`` and return the report as a JSON string.

    The workbook is loaded with ``tools.readxlsx.read`` and processed by
    ``Parse.collect``.

    :param path: passed unchanged to ``read``.
    :return: the collected result serialised with ``indent=4`` and
        ``ensure_ascii=False`` (non-ASCII text is written as-is).
    """
    parse = Parse(read(path))
    res = parse.collect()
    return json.dumps(res, indent=4, ensure_ascii=False)
if __name__ == '__main__':
    # Command-line entry point: the first argument is the workbook path.
    # The JSON string returned by run() is not printed or saved here.
    run(sys.argv[1])