# report/tools/parsexlsx.py
#
# 384 lines
# 16 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters!
#
# This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import re
import sys
import time
from collections import defaultdict
import pandas as pd
from tools.readxlsx import read
def tree():
    """Return a nested mapping whose missing keys are created on access.

    Every missing key yields another ``defaultdict`` built by this same
    factory, so ``t['a']['b']['c'] = 1`` works without creating the
    intermediate levels first.
    """
    nested = defaultdict(tree)
    return nested
class BaseAssignment:
    """Hold the accumulators that the parsing steps fill in.

    The constructor forwards any positional/keyword arguments to the next
    class in the MRO and then creates empty containers.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Per-drug bookkeeping keyed by response type.
        self.drugs_type = {}
        # Significant alterations recorded per source, used for the NCCN table.
        self.sign_from_type = defaultdict(dict)
        # Drugs with potential clinical benefit.
        self.signdurg = set()
        # Gene alterations with clear or potential clinical significance.
        self.signtb = set()
        # The report result (auto-nesting mapping).
        self.result = tree()
class Parse(BaseAssignment):
    """Turn the raw per-sheet sample data into the nested report structure.

    ``sampledata`` is indexed by sheet name and every entry is passed to
    ``pd.DataFrame``.  Each public method processes one or more sheets and
    stores the outcome in ``self.result``; ``collect`` runs all of them.
    """

    # Columns copied into every alteration's ``drug_content`` records.
    _DRUG_CONTENT_COLUMNS = ['DrugCn', 'Response_Type', 'Indication', 'Evidence_Source', 'Efficacy_Evidence']
    # AMP levels that count towards the "significant alteration" summary.
    _SIGNIFICANT_LEVELS = ('I', 'II')
    # (regex matched at the start of ExonicFunc.refGene, report label), applied in order.
    _MUTTYPE_PATTERNS = (
        ('nonsynonymous SNV', '错义突变'),
        ('^frameshift', '移码突变'),
        ('^nonframeshift', '非移码突变'),
        ('stopgain', '提前终止'),
    )

    def __init__(self, sampledata, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.sampledata = sampledata

    @staticmethod
    def _map_cells(frame, func):
        """Apply ``func`` to every cell of ``frame`` and return the new frame.

        Uses ``DataFrame.map`` when the installed pandas provides it and falls
        back to the older ``DataFrame.applymap`` otherwise.
        """
        # Check the class, not the instance, so a column named "map" cannot
        # shadow the lookup.
        mapper = frame.map if hasattr(pd.DataFrame, 'map') else frame.applymap
        return mapper(func)

    def _drug_content(self, alter_data):
        """Return the drug rows of one alteration as a list of record dicts.

        Rows whose ``DrugCn`` is the placeholder ``'.'`` are skipped.  The
        original row index is kept as an ``index`` field in each record.
        """
        content = alter_data[self._DRUG_CONTENT_COLUMNS]
        content = content[content['DrugCn'] != '.']
        return content.reset_index().to_dict('records')

    @staticmethod
    def _records_by(records, key):
        """Group ``records`` by column ``key``; each value is a list of the
        group's records without the ``key`` field.  Empty input gives ``{}``."""
        if not records:
            return dict()
        frame = pd.DataFrame(records)
        # Explicit iteration instead of groupby.apply: the result does not
        # depend on whether pandas passes the grouping column to the callback.
        return {name: group.drop(columns=key).to_dict('records') for name, group in frame.groupby(key)}

    def cms(self):
        """Process the sample information sheet into ``result['c']``.

        :raises UserWarning: when the ``sample_info`` sheet is empty.
        """
        data = pd.DataFrame(self.sampledata['sample_info'])
        if data.empty:
            raise UserWarning('sample_info表为空生成报告失败')
        # Cells that consist solely of a placeholder become '/'.
        placeholders = ('.', '-', '——')
        data = self._map_cells(data, lambda x: '/' if str(x) in placeholders else x)
        data_dict = data.to_dict('index')[0]
        # Keep only the part before the first space; str() tolerates values
        # that are not plain strings.
        data_dict['receiveTime'] = str(data_dict['receiveTime']).split(' ')[0]
        data_dict['reportTime'] = time.strftime("%Y-%m-%d", time.localtime())
        self.result['c'] = data_dict

    def target(self):
        """Process the ``snvindel`` sheet into ``result['snvindel']``.

        One entry is produced per distinct ``AAChange.refGene`` value.  Level
        I/II alterations are also recorded in ``signtb`` and
        ``sign_from_type['target']``.
        """
        data = pd.DataFrame(self.sampledata['snvindel'])
        res = []
        if data.empty:
            self.result['snvindel'] = res
            return
        data['freq'] = data['mutant_frequency'].apply(lambda x: float(str(x).replace('%', '')))
        # Keep validated rows, except level-III variants below 2% frequency.
        # Each comparison is parenthesised on its own: `&` binds tighter than
        # `==`, so `a == 1 & b` would compare against `(1 & b)`.
        validated = data['Validated'] == 1
        unreliable_vus = (data['freq'] < 2) & (data['AMP_mut_level'] == 'III')
        data = data[validated & ~unreliable_vus].reset_index()
        if data.empty:
            # Nothing survived the filter; the column split below needs rows.
            self.result['snvindel'] = res
            return
        data['muttype'] = '/'
        exonic_func = data['ExonicFunc.refGene']
        for pattern, label in self._MUTTYPE_PATTERNS:
            data.loc[exonic_func.str.match(pattern, na=False), 'muttype'] = label
        # Split the HGVS string; pad/truncate to exactly five fields so the
        # assignment works whatever the number of ':' separators.
        hgvs_cols = ['gene', 'transcript', 'exon', 'nacid', 'aacid']
        parts = data['AAChange.refGene'].str.split(':', expand=True).reindex(columns=range(len(hgvs_cols)))
        parts.columns = hgvs_cols
        data[hgvs_cols] = parts
        # No amino-acid change available: show the placeholder.
        data['aacid'] = data['aacid'].fillna('/')
        summary_cols = hgvs_cols + ['mutant_frequency', 'AMP_mut_level', 'muttype', 'Gene_function']
        for alter, alter_data in data.groupby('AAChange.refGene'):
            alter_res = alter_data[summary_cols].iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            alter_res['alter'] = alter
            res.append(alter_res)
            # Summary bookkeeping.
            if alter_res['AMP_mut_level'] in self._SIGNIFICANT_LEVELS:
                self.signtb.add(alter)
                self.sign_from_type['target'][alter_res['gene']] = '%s(%s)' % (
                    alter_res['aacid'], alter_res['mutant_frequency'])
        self.result['snvindel'] = res

    def fusion(self):
        """Process the ``fusion`` sheet into ``result['fusion']``.

        Only rows with ``Validated == 1`` are used, one entry per ``FUSION``.
        """
        data = pd.DataFrame(self.sampledata['fusion'])
        res = []
        if data.empty:
            self.result['fusion'] = res
            return
        data = data[data['Validated'] == 1].reset_index()
        summary_cols = ['FUSION', 'FREQ1', 'AMP_mut_level', 'Gene_function', 'Gene_Symbol']
        for alter, alter_data in data.groupby('FUSION'):
            alter_res = alter_data[summary_cols].iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            des = '%s 融合' % (alter_res['FUSION'].replace('-', '::'))
            alter_res['alter'] = des
            res.append(alter_res)
            # Summary bookkeeping.
            if alter_res['AMP_mut_level'] in self._SIGNIFICANT_LEVELS:
                self.signtb.add(alter)
                self.sign_from_type['fusion'][alter_res['Gene_Symbol']] = des
        self.result['fusion'] = res

    def cnv(self):
        """Process the ``cnv`` sheet into ``result['cnv']``.

        Only rows with ``Validated == 1`` are used, one entry per
        ``Gene_Symbol``.  ``Copy_number > 2`` is labelled '扩增', anything else
        '缺失'.
        """
        data = pd.DataFrame(self.sampledata['cnv'])
        res = []
        if data.empty:
            self.result['cnv'] = res
            return
        data = data[data['Validated'] == 1].reset_index()
        for alter, alter_data in data.groupby('Gene_Symbol'):
            summary = alter_data[['Gene_Symbol', 'Copy_number', 'AMP_mut_level', 'Gene_function']].reset_index()
            summary['muttype'] = '缺失'
            summary.loc[summary['Copy_number'] > 2, 'muttype'] = '扩增'
            alter_res = summary.iloc[0].to_dict()
            alter_res['drug_category'] = self._drug_category(alter_data)
            alter_res['drug_content'] = self._drug_content(alter_data)
            des = '%s %s' % (alter, alter_res['muttype'])
            alter_res['alter'] = des
            res.append(alter_res)
            # Summary bookkeeping.
            if alter_res['AMP_mut_level'] in self._SIGNIFICANT_LEVELS:
                self.signtb.add(alter)
                self.sign_from_type['cnv'][alter_res['Gene_Symbol']] = des
        self.result['cnv'] = res

    def hotspot(self):
        """Copy the ``hotspot`` sheet into the result as a list of records."""
        self._to_records('hotspot')

    def met(self):
        """Copy the ``MET`` sheet into the result as a list of records."""
        self._to_records('MET')

    def longindel(self):
        """Copy the ``longindel`` sheet into the result as a list of records."""
        self._to_records('longindel')

    def mmr(self):
        """Process the ``MMR`` sheet into ``result['MMR']`` and ``result['sum']['mmr']``."""
        data = pd.DataFrame(self.sampledata['MMR'])
        result_summary = '未检测到相关基因突变'
        predict = '对免疫检查点抑制剂可能不敏感'
        mmr_num = 0
        res = []
        if not data.empty:
            hits = ['%s %s' % (gene, change) for gene, change in zip(data['gene'], data['p_change'])]
            result_summary = ' | '.join(hits)
            predict = '对免疫检查点抑制剂可能敏感'
            mmr_num = len(data.index)
            res = data.to_dict('records')
        self.result['MMR'] = res
        self.result['sum']['mmr'] = dict(
            result_summary=result_summary,
            predict=predict,
            mmr_num=mmr_num
        )

    def msi(self):
        """Copy the first row of the ``MSI`` sheet into the result."""
        self._to_dicts('MSI')

    def chemo(self):
        """Process the three chemo sheets into ``result['chemo']`` and the
        chemo part of ``result['sum']``."""
        chemo_res = self._to_records('chemo_res', need=True)
        chemo_res_list = []
        chemo_sign_drug_num = 0
        chemo_drug_category = dict()
        if chemo_res:
            chemo_res_df = pd.DataFrame(chemo_res)
            # Expose a 1-based row number as the ``index`` field of each record.
            chemo_res_df.index = chemo_res_df.index + 1
            chemo_res_df = chemo_res_df.reset_index()
            chemo_res_list = chemo_res_df.to_dict('records')
            chemo_sign_drug_num = len(chemo_res_df[chemo_res_df['推荐程度'] == '推荐'])
            chemo_drug_category = chemo_res_df.groupby('推荐程度')['药物名称'].apply(
                ','.join).to_dict()
        self.result['chemo']['chemo_res'] = chemo_res_list
        self.result['sum']['chemo']['drug_num'] = chemo_sign_drug_num
        self.result['sum']['chemo']['drug_category'] = chemo_drug_category
        self.result['chemo']['chemo_comb'] = self._records_by(
            self._to_records('chemo_comb', need=True), '癌种')
        self.result['chemo']['chemo_info'] = self._records_by(
            self._to_records('chemo_info', need=True), '药物')

    def hcs(self):
        """Copy the ``HCS`` sheet into the result and store its row count."""
        self._to_records('HCS')
        self.result['sum']['hcs']['num'] = len(self.result['HCS'])

    def heredity(self):
        """Process the ``hereditary`` and ``hereditary_risk`` sheets.

        Summary fields default to '/' when there is nothing to report.
        """
        hereditary = pd.DataFrame(self.sampledata['hereditary'])
        result = '/'
        disease = '/'
        risk = '/'
        if not hereditary.empty:
            result = '|'.join(
                '%s %s' % (gene, finding) for gene, finding in zip(hereditary['基因'], hereditary['检测结果']))
            disease = '|'.join(hereditary['遗传性肿瘤综合征'].to_list())
            hereditary_risk = pd.DataFrame(self.sampledata['hereditary_risk'])
            if not hereditary_risk.empty:
                elevated = hereditary_risk.loc[hereditary_risk['风险值'] == '偏高', '肿瘤类型'].to_list()
                # Only emit the sentence when at least one tumor type is
                # elevated; otherwise it would be a bare suffix with no subject.
                if elevated:
                    risk = ','.join(elevated) + '风险可能较高'
        self.result['hereditary'] = hereditary.to_dict('records')
        self.result['sum']['hereditary']['result'] = result
        self.result['sum']['hereditary']['disease'] = disease
        self.result['sum']['hereditary']['risk'] = risk
        self.result['sum']['hereditary']['num'] = len(hereditary.index)

    def qc(self):
        """Copy the first row of the ``qc`` sheet into ``result['qc']``,
        renaming three columns to short keys."""
        data = pd.DataFrame(self.sampledata['qc'])
        res = {}
        if not data.empty:
            data = data.rename(columns={
                'Q30(%)': 'q30',
                'mean_depth(dedup)': 'depth',
                'coverage(>=0.2*meanx)': 'coverage'
            })
            res = data.to_dict('index')[0]
        self.result['qc'] = res

    def drugs(self):
        """Build ``result['drugs']['drugs_detail']`` as ``drug_name -> drug_detail``.

        Rows with any missing value or with ``drug_detail == '.'`` are dropped;
        '||' inside a cell is turned into a newline.
        """
        data = pd.DataFrame(self.sampledata['drugs'])
        res = {}
        if not data.empty:
            data = data.dropna()
            data = data[data['drug_detail'] != '.']
            data = self._map_cells(data, lambda x: str(x).replace('||', '\n') if '||' in str(x) else x)
            res = data.set_index('drug_name')['drug_detail'].to_dict()
        self.result['drugs']['drugs_detail'] = res

    def indication(self):
        """Annotate each ``indication`` record with the significant
        alterations already recorded in ``sign_from_type``.

        Must run after ``target``/``fusion``/``cnv``, which fill
        ``sign_from_type``.
        """
        indication_res = self._to_records('indication', need=True)
        # Sheet wording -> key used in ``sign_from_type``.
        trans = dict(
            突变='target',
            融合='fusion',
            扩增='cnv'
        )
        res = list()
        for indication_sp in indication_res or []:
            gene = indication_sp['基因']
            gene_tbtype_res = list()
            for tbtype in indication_sp['检测内容'].split('/'):
                source = trans.get(tbtype)
                if source is None:
                    continue
                # .get() so that looking up a source never creates an entry in
                # the defaultdict.
                by_gene = self.sign_from_type.get(source, {})
                if gene in by_gene:
                    gene_tbtype_res.append(by_gene[gene])
            indication_sp['检测情况'] = '\n'.join(gene_tbtype_res)
            res.append(indication_sp)
        self.result['indication'] = res

    def _to_records(self, sheetname, need=False):
        """Convert a multi-row sheet into a list of record dicts.

        :param sheetname: key into ``self.sampledata``.
        :param need: when true, return the list (empty for an empty sheet)
            and leave ``self.result`` untouched; otherwise store the list in
            ``self.result[sheetname]`` and return ``None``.
        :return: the records when ``need`` is true, else ``None``.
        """
        data = pd.DataFrame(self.sampledata[sheetname])
        res = [] if data.empty else data.to_dict('records')
        if need:
            return res
        self.result[sheetname] = res

    def _to_dicts(self, sheetname):
        """Store the first row of a single-row sheet as a dict in
        ``self.result[sheetname]`` (``{}`` for an empty sheet).

        :param sheetname: key into ``self.sampledata``.
        """
        data = pd.DataFrame(self.sampledata[sheetname])
        res = {} if data.empty else data.to_dict('index')[0]
        self.result[sheetname] = res

    def _drug_category(self, groupdata):
        """Summarise the drugs attached to one alteration.

        Updates ``self.signdurg`` (drugs in categories a/b/c) and
        ``self.drugs_type`` (drug names per normalised response type) as a
        side effect.  ``groupdata`` itself is not modified.

        :param groupdata: the rows belonging to one alteration.
        :return: dict mapping each ``Drug_Category`` (except '.') to a
            newline-joined, de-duplicated list of drug descriptions.
        """
        drug_category_res = dict()
        for drug_category, category_rows in groupdata.groupby('Drug_Category'):
            if drug_category == '.':
                continue
            # assign() works on a copy, so the caller's slice is left alone.
            split_rows = category_rows.assign(
                drug_split=category_rows['DrugCn'].str.split(',')
            ).explode('drug_split').reset_index(drop=True)
            # Count drugs of the sensitive / possibly sensitive categories.
            if drug_category in ['a', 'b', 'c']:
                self.signdurg.update(split_rows['drug_split'].tolist())
            # NOTE(review): the closing '】' has no matching opener in this
            # format string — confirm whether '%s【%s 级】' was intended.
            drugdes = ['%s%s 级】' % (drug, level)
                       for drug, level in zip(split_rows['drug_split'], split_rows['AMP_evidence_level'])]
            # dict.fromkeys de-duplicates while keeping first-seen order, so
            # the output is the same on every run.
            drug_category_res[drug_category] = '\n'.join(dict.fromkeys(drugdes))
        # All drugs, split on '+' and ','.  regex=False makes ' + ' a literal
        # pattern regardless of the pandas default.
        exploded_df = groupdata.assign(
            list_col=groupdata['DrugCn'].str.replace(' + ', '+', regex=False).str.split(r'[+,]')
        ).explode('list_col').reset_index(drop=True)
        exploded_df = exploded_df[(exploded_df['list_col'] != '.') & (exploded_df['list_col'] != '')].copy()
        # Normalise the response type; the second mask is computed after the
        # first assignment so a row is only relabelled once.
        exploded_df.loc[exploded_df['Response_Type'].str.contains('敏感', na=False), 'Response_Type'] = '可能敏感'
        exploded_df.loc[exploded_df['Response_Type'].str.contains('耐药', na=False), 'Response_Type'] = '可能耐药'
        for drug_type, names in exploded_df.groupby('Response_Type')['list_col']:
            known = self.drugs_type.setdefault(drug_type, [])
            for name in names:
                # Skip names already collected for this response type, whether
                # from this alteration or an earlier one.
                if name not in known:
                    known.append(name)
        return drug_category_res

    def collect(self):
        """Run every processing step and return the completed ``self.result``."""
        self.cms()
        self.target()
        self.fusion()
        self.cnv()
        self.hotspot()
        self.met()
        self.longindel()
        self.mmr()
        self.msi()
        self.chemo()
        self.hcs()
        self.heredity()
        self.qc()
        self.indication()
        self.drugs()
        # Summary counters.
        self.result['sum']['signtb_num'] = len(self.signtb)
        self.result['sum']['signdrug_num'] = len(self.signdurg)
        self.result['drugs']['drugs_type'] = {key: self.drugs_type[key] for key in sorted(self.drugs_type)}
        return self.result
def run(path):
    """Parse the workbook at ``path`` and return the report as a JSON string.

    The same JSON text is also written to ``t.json`` in the current working
    directory.

    :param path: passed unchanged to ``read``.
    :return: the JSON document (indented, non-ASCII characters kept as-is).
    """
    parse = Parse(read(path))
    res = parse.collect()
    resjson = json.dumps(res, indent=4, ensure_ascii=False)
    # ensure_ascii=False keeps non-ASCII characters in the text, so the file
    # encoding is pinned to UTF-8 instead of the platform default.
    with open('t.json', 'w', encoding='utf-8') as f:
        f.write(resjson)
    return resjson
if __name__ == '__main__':
    # Exit with a usage hint instead of an IndexError traceback when the
    # input path argument is missing.
    if len(sys.argv) < 2:
        sys.exit('usage: %s <path>' % sys.argv[0])
    run(sys.argv[1])