# report/tools/parsexlsx.py
#
# Repository-viewer metadata captured with this file:
#   458 lines, 18 KiB, Python; timestamp 2023-07-31 13:49:34 +08:00
import json
import re
import sys
import time
from collections import defaultdict
import pandas as pd
from tools.readxlsx import read
def tree():
    """Return an auto-vivifying nested mapping.

    Looking up a missing key creates another ``tree`` under that key, so deep
    paths such as ``t['a']['b']['c'] = 1`` can be assigned without creating
    the intermediate levels first.

    :return: a ``defaultdict`` whose default factory is this function
    """
    branch = defaultdict(tree)
    return branch
class BaseAssignment:
    """Holds the accumulators that the parsing methods fill in.

    Every instance gets its own:

    * ``result``     - nested mapping (see ``tree``) holding the report result
    * ``signtb``     - gene alterations with definite or potential clinical
      significance
    * ``signdurg``   - therapeutic drugs with potential clinical benefit
    * ``drugs_type`` - plain dict, empty on construction
    """

    def __init__(self, *args, **kwargs):
        # Cooperative initialisation: pass everything on to the next class
        # in the MRO.
        super().__init__(*args, **kwargs)
        self.drugs_type = {}
        # Drugs with potential clinical benefit.
        self.signdurg = set()
        # Gene alterations with definite or potential clinical significance.
        self.signtb = set()
        # Report result.
        self.result = tree()
class Parse(BaseAssignment):
def __init__(self, sampledata, *args, **kwargs):
    """Keep a reference to the sample data after base-class initialisation.

    :param sampledata: mapping indexed by sheet name; the parsing methods pass
        each entry to ``pd.DataFrame``
    :param args: forwarded unchanged to the parent initialiser
    :param kwargs: forwarded unchanged to the parent initialiser
    """
    super().__init__(*args, **kwargs)
    self.sampledata = sampledata
def cms(self):
    """Build the sample-information section and store it in ``result['c']``.

    Only the row with index ``0`` of the ``sample_info`` sheet is used.  Any
    cell whose text is exactly ``.``, ``-`` or ``——`` is replaced by ``/``.
    ``receiveTime`` is cut at the first space, and ``reportTime`` is set to
    the current local date formatted as ``YYYY-MM-DD``.

    :raises UserWarning: if the ``sample_info`` sheet is empty
    """
    data = pd.DataFrame(self.sampledata['sample_info'])
    if data.empty:
        raise UserWarning('sample_info表为空生成报告失败')

    # Normalise placeholders on the single row that is actually consumed,
    # rather than mapping a function over every cell of the frame.
    placeholders = ('.', '-', '——')
    first_row = data.to_dict('index')[0]
    data_dict = {
        field: '/' if str(value) in placeholders else value
        for field, value in first_row.items()
    }

    # str() keeps this working when the cell is not text (for example a
    # Timestamp), where a regex split would raise TypeError.
    data_dict['receiveTime'] = str(data_dict['receiveTime']).split(' ')[0]
    data_dict['reportTime'] = time.strftime("%Y-%m-%d", time.localtime())
    self.result['c'] = data_dict
def target(self):
    """Summarise validated SNV/indel rows into ``result['snvindel']``.

    Rows with ``Validated == 1`` are grouped by ``AAChange.refGene``.  Each
    group yields one dict, built from the group's first row plus
    ``drug_category`` (from ``_drug_category``), ``drug_content`` (the drug
    columns of rows whose ``DrugCn`` is not ``.``) and ``alter`` (the group
    key).  Group keys whose ``AMP_mut_level`` is ``I`` or ``II`` are added to
    ``self.signtb``.

    The result is an empty list when the sheet is empty or when no row is
    validated.
    """
    data = pd.DataFrame(self.sampledata['snvindel'])
    res = []
    if not data.empty:
        data = data[data['Validated'] == 1].reset_index()
    if data.empty:
        # Also covers "nothing validated": the column split below cannot
        # run on a frame without rows.
        self.result['snvindel'] = res
        return

    # Later patterns win over earlier ones, so the order is significant.
    muttype_patterns = (
        ('nonsynonymous SNV', '错义突变'),
        ('^frameshift', '移码突变'),
        ('^nonframeshift', '非移码突变'),
        ('stopgain', '提前终止'),
    )
    exonic_func = data['ExonicFunc.refGene']
    data['muttype'] = '/'
    for pattern, label in muttype_patterns:
        # na=False: a missing annotation simply matches nothing.
        data.loc[exonic_func.str.match(pattern, na=False), 'muttype'] = label

    # Split the HGVS string.  Force exactly five columns so the assignment
    # also works when no row carries all five parts; any extra parts are
    # dropped.
    hgvs_fields = ['gene', 'transcript', 'exon', 'nacid', 'aacid']
    hgvs_parts = data['AAChange.refGene'].str.split(':', expand=True)
    hgvs_parts = hgvs_parts.reindex(columns=range(len(hgvs_fields))).astype(object)
    hgvs_parts.columns = hgvs_fields
    for field in hgvs_fields:
        data[field] = hgvs_parts[field]
    # Fall back to the nucleotide change when there is no amino-acid change.
    data['aacid'] = data['aacid'].fillna(data['nacid'])

    summary_cols = ['gene', 'transcript', 'exon', 'nacid', 'aacid', 'mutant_frequency',
                    'AMP_mut_level', 'muttype', 'Gene_function']
    drug_cols = ['DrugCn', 'Response_Type', 'Indication', 'Evidence_Source', 'Efficacy_Evidence']
    for alter, alter_data in data.groupby('AAChange.refGene'):
        alter_res = alter_data[summary_cols].iloc[0].to_dict()
        alter_res['drug_category'] = self._drug_category(alter_data)
        drug_content = alter_data[drug_cols]
        drug_content = drug_content[drug_content['DrugCn'] != '.']
        alter_res['drug_content'] = drug_content.reset_index().to_dict('records')
        alter_res['alter'] = alter
        res.append(alter_res)
        # Summary counter of clinically significant alterations.
        if alter_res['AMP_mut_level'] in ['I', 'II']:
            self.signtb.add(alter)
    self.result['snvindel'] = res
def fusion(self):
    """Summarise validated fusion rows into ``result['fusion']``.

    Rows with ``Validated == 1`` are grouped by ``FUSION``.  Each group yields
    one dict, built from the group's first row plus ``drug_category`` (from
    ``_drug_category``), ``drug_content`` (the drug columns of rows whose
    ``DrugCn`` is not ``.``) and ``alter`` (the fusion name with ``-``
    replaced by ``:`` and the suffix `` 融合``).  Fusion names whose
    ``AMP_mut_level`` is ``I`` or ``II`` are added to ``self.signtb``.
    """
    table = pd.DataFrame(self.sampledata['fusion'])
    records = []
    if not table.empty:
        summary_cols = ['FUSION', 'FREQ1', 'AMP_mut_level', 'Gene_function']
        drug_cols = ['DrugCn', 'Response_Type', 'Indication', 'Evidence_Source', 'Efficacy_Evidence']
        validated = table[table['Validated'] == 1].reset_index()
        for fusion_name, rows in validated.groupby('FUSION'):
            entry = rows[summary_cols].iloc[0].to_dict()
            entry['drug_category'] = self._drug_category(rows)
            with_drug = rows.loc[rows['DrugCn'] != '.', drug_cols]
            entry['drug_content'] = with_drug.reset_index().to_dict('records')
            entry['alter'] = '%s 融合' % (entry['FUSION'].replace('-', ':'))
            records.append(entry)
            # Summary counter of clinically significant alterations.
            if entry['AMP_mut_level'] in ['I', 'II']:
                self.signtb.add(fusion_name)
    self.result['fusion'] = records
def cnv(self):
    """Summarise validated copy-number rows into ``result['cnv']``.

    Rows with ``Validated == 1`` are grouped by ``Gene_Symbol``.  Each group
    yields one dict, built from the group's first row plus ``muttype``
    (``扩增`` when ``Copy_number`` is greater than 2, otherwise ``缺失``),
    ``drug_category`` (from ``_drug_category``), ``drug_content`` (the drug
    columns of rows whose ``DrugCn`` is not ``.``) and ``alter``
    (``"<gene> <muttype>"``).  Genes whose ``AMP_mut_level`` is ``I`` or
    ``II`` are added to ``self.signtb``.
    """
    table = pd.DataFrame(self.sampledata['cnv'])
    records = []
    if not table.empty:
        summary_cols = ['Gene_Symbol', 'Copy_number', 'AMP_mut_level', 'Gene_function']
        drug_cols = ['DrugCn', 'Response_Type', 'Indication', 'Evidence_Source', 'Efficacy_Evidence']
        validated = table[table['Validated'] == 1].reset_index()
        for gene, rows in validated.groupby('Gene_Symbol'):
            summary = rows[summary_cols].reset_index()
            # Default to a loss; anything above two copies is a gain.
            summary['muttype'] = '缺失'
            summary.loc[summary['Copy_number'] > 2, 'muttype'] = '扩增'
            entry = summary.iloc[0].to_dict()
            entry['drug_category'] = self._drug_category(rows)
            with_drug = rows.loc[rows['DrugCn'] != '.', drug_cols]
            entry['drug_content'] = with_drug.reset_index().to_dict('records')
            entry['alter'] = '%s %s' % (gene, entry['muttype'])
            records.append(entry)
            # Summary counter of clinically significant alterations.
            if entry['AMP_mut_level'] in ['I', 'II']:
                self.signtb.add(gene)
    self.result['cnv'] = records
def hotspot(self):
    """Copy the ``hotspot`` sheet into the result as a list of row dicts."""
    sheet = 'hotspot'
    self._to_records(sheet)
def met(self):
    """Copy the ``MET`` sheet into the result as a list of row dicts."""
    sheet = 'MET'
    self._to_records(sheet)
def longindel(self):
    """Copy the ``longindel`` sheet into the result as a list of row dicts."""
    sheet = 'longindel'
    self._to_records(sheet)
def mmr(self):
    """Store the ``MMR`` rows and their summary.

    ``result['MMR']`` receives the rows as a list of dicts, and
    ``result['sum']['mmr']`` receives ``result_summary``, ``predict`` and
    ``mmr_num``.  When rows exist the summary joins ``"<gene> <p_change>"``
    for every row with `` | ``; otherwise fixed "nothing detected" texts and
    a count of 0 are stored.
    """
    table = pd.DataFrame(self.sampledata['MMR'])
    if table.empty:
        records = []
        summary = dict(
            result_summary='未检测到相关基因突变',
            predict='对免疫检查点抑制剂可能不敏感',
            mmr_num=0
        )
    else:
        labels = ['%s %s' % (gene, change)
                  for gene, change in zip(table['gene'], table['p_change'])]
        summary = dict(
            result_summary=' | '.join(labels),
            predict='对免疫检查点抑制剂可能敏感',
            mmr_num=len(table.index)
        )
        records = table.to_dict('records')
    self.result['MMR'] = records
    self.result['sum']['mmr'] = summary
def msi(self):
    """Copy the first row of the ``MSI`` sheet into the result as a dict."""
    sheet = 'MSI'
    self._to_dicts(sheet)
# def chemo(self):
# data = pd.DataFrame(self.sampledata['chemo'])
#
# project = data['project'].to_list()[0]
#
# # 分类汇总 同位点,药物合并 drug.infos.txt
# drugrsid = data[['drugname', 'genename', 'rsid', 'result', 'level', 'tips', 'drugsort']]
# drugrsid = drugrsid.drop_duplicates()
# resdrugrsid = drugrsid.groupby(['drugname', 'genename', 'rsid', 'result', 'level', 'drugsort'])['tips'].agg(
# ','.join).reset_index()
# resdrugrsid.rename(columns=
# {'drugname': '药物', 'genename': '检测基因', 'rsid': '检测位点', 'result': '基因型',
# 'level': '证据等级', 'tips': '用药提示'},
# inplace=True)
# resdrugrsid = resdrugrsid.sort_values(by=['drugsort', '药物', '检测基因'])
# self.result['chemo']['druginfo'] = resdrugrsid.to_dict('records')
#
# # 药物 药物疗效 推荐程度合并 drug.res.txt
# drugtypesum = data[['drugname', 'drugtype', 'rsid', 'weights']]
# drugtypesum = drugtypesum.drop_duplicates()
# drugtyperes = list()
# drugsum = dict()
# for drug, drugdata in drugtypesum.groupby('drugname'):
# tipsnum = drugdata.groupby(['drugtype']).agg({'weights': 'sum'}).to_dict('index')
# sumlist = list()
# if 'LX' in tipsnum:
# LX = tipsnum['LX']['weights']
# if LX > 0:
# lxdes = '疗效较好'
# lxnum = 1
# elif LX == 0:
# lxdes = '疗效一般'
# lxnum = 0
# else:
# lxdes = '疗效较差'
# lxnum = -1
# sumlist.append(lxdes)
# else:
# LX = 0
# lxnum = 0
# if 'DF' in tipsnum:
# DF = tipsnum['DF']['weights']
# if DF > 0:
# dfdes = '毒副较低'
# dfnum = 1
# elif DF == 0:
# dfdes = '毒副一般'
# dfnum = 0
# else:
# dfdes = '毒副较高'
# dfnum = -1
# sumlist.append(dfdes)
# else:
# DF = 0
# dfnum = 0
#
# # 评价方式 疗效 1 0 -1, 毒副 1 0 -1 可形成9宫格
# sumnum = lxnum + dfnum
# if sumnum > 0:
# sumdes = '推荐'
# elif sumnum == 0:
# sumdes = '常规'
# else:
# sumdes = '谨慎'
#
# # 特别药物处理
# if (drug == "氟尿嘧啶" or drug == "卡培他滨") and DF < 0:
# sumdes = '谨慎'
#
# drugtyperes.append(dict(
# 药物名称=drug,
# 疗效=LX,
# 毒副=DF,
# 推荐程度=sumdes,
# 疗效和毒副总结=','.join(sumlist)
# ))
# drugsum[drug] = sumdes
#
# # 报告中展示药物有顺序
# drugsort = data[['drugname', 'drugsort']].drop_duplicates()
# drugsort_dict = drugsort.set_index('drugname')['drugsort'].to_dict()
# drugtyperes_sort = sorted(drugtyperes, key=lambda x: (
# drugsort_dict[x['药物名称']] if x['药物名称'] in drugsort_dict else 100, x['药物名称']))
#
# drugtyperes_sort_df = pd.DataFrame(drugtyperes_sort)
# self.result['chemo']['sum'] = drugtyperes_sort_df.groupby('推荐程度')['药物名称'].apply(','.join).to_dict()
# self.result['chemo']['drugres'] = drugtyperes_sort_df.to_dict('records')
#
# # 联合用药
# drug_combine_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'database',
# 'chemo_drug_combine.csv')
# drug_combine = pd.read_csv(drug_combine_path, sep='\t')
# drug_combine.fillna('.', inplace=True)
# drug_combine_data = drug_combine[drug_combine['source'].str.contains(project)]
# drug_combine_data = drug_combine_data.reset_index()
# if not drug_combine_data.empty:
# drug_combine_data['临床提示'] = drug_combine_data['用药方案'].apply(self._get_drug_plan, args=(drugsum,))
# self.result['chemo']['combine'] = drug_combine_data.groupby('癌种').apply(
# lambda group: group.set_index('癌种').to_dict('records')).to_dict()
# else:
# self.result['chemo']['combine'] = dict()
# self.result['sum']['chemo_drug_num'] = len(drugsum.keys())
def chemo(self):
    """Collect the chemotherapy sheets into the result.

    * ``result['chemo']['chemo_res']``  - rows of ``chemo_res`` with a 1-based
      ``index`` column prepended
    * ``result['sum']['chemo']['drug_num']`` - number of ``chemo_res`` rows
    * ``result['sum']['chemo']['drug_category']`` - ``药物名称`` values joined
      with ``,`` for each ``推荐程度``
    * ``result['chemo']['chemo_comb']`` - ``chemo_comb`` rows grouped by ``癌种``
    * ``result['chemo']['chemo_info']`` - ``chemo_info`` rows grouped by ``药物``

    An empty sheet produces an empty list or dict (and a count of 0) instead
    of raising.
    """

    def grouped(rows, key):
        # Map each value of `key` to its rows, without the key column.
        if not rows:
            return {}
        frame = pd.DataFrame(rows)
        # Iterating the groups avoids groupby.apply, which relies on the
        # grouping column still being present inside each group.
        return {
            name: group.drop(columns=key).to_dict('records')
            for name, group in frame.groupby(key)
        }

    # _to_records may hand back None for an empty sheet; normalise to a list.
    chemo_res = self._to_records('chemo_res', need=True) or []
    numbered = []
    drug_category = {}
    if chemo_res:
        chemo_res_df = pd.DataFrame(chemo_res)
        # Shift to a 1-based index so reset_index() yields a serial number.
        chemo_res_df.index = chemo_res_df.index + 1
        numbered = chemo_res_df.reset_index().to_dict('records')
        drug_category = chemo_res_df.groupby('推荐程度')['药物名称'].apply(','.join).to_dict()
    self.result['chemo']['chemo_res'] = numbered
    self.result['sum']['chemo']['drug_num'] = len(chemo_res)
    self.result['sum']['chemo']['drug_category'] = drug_category

    chemo_comb = self._to_records('chemo_comb', need=True)
    self.result['chemo']['chemo_comb'] = grouped(chemo_comb, '癌种')

    chemo_info = self._to_records('chemo_info', need=True)
    self.result['chemo']['chemo_info'] = grouped(chemo_info, '药物')
def hcs(self):
    """Copy the ``HCS`` sheet into the result and record how many rows it has.

    The row count is stored in ``result['sum']['hcs']['num']``.
    """
    self._to_records('HCS')
    hcs_rows = self.result['HCS']
    self.result['sum']['hcs']['num'] = len(hcs_rows)
def heredity(self):
    """Store the hereditary findings and their summary.

    ``result['hereditary']`` receives the ``hereditary`` rows.
    ``result['sum']['hereditary']`` receives three strings, each defaulting to
    ``/`` when its source sheet is empty:

    * ``result``  - ``"<基因> <检测结果>"`` per row, joined with ``|``
    * ``disease`` - the ``遗传性肿瘤综合征`` values joined with ``|``
    * ``risk``    - ``肿瘤类型`` of the ``hereditary_risk`` rows whose ``风险值``
      is ``偏高``, joined with ``,``

    :return: None
    """
    result = disease = risk = '/'

    hereditary = pd.DataFrame(self.sampledata['hereditary'])
    if not hereditary.empty:
        findings = ['%s %s' % (gene, outcome)
                    for gene, outcome in zip(hereditary['基因'], hereditary['检测结果'])]
        result = '|'.join(findings)
        disease = '|'.join(hereditary['遗传性肿瘤综合征'].to_list())

    hereditary_risk = pd.DataFrame(self.sampledata['hereditary_risk'])
    if not hereditary_risk.empty:
        elevated = hereditary_risk.loc[hereditary_risk['风险值'] == '偏高', '肿瘤类型']
        risk = ','.join(elevated.to_list())

    self.result['hereditary'] = hereditary.to_dict('records')
    self.result['sum']['hereditary'].update(result=result, disease=disease, risk=risk)
def qc(self):
    """Store the first row of the ``qc`` sheet in ``result['qc']``.

    Three column headers are shortened first: ``Q30(%)`` -> ``q30``,
    ``mean_depth(dedup)`` -> ``depth`` and ``coverage(>=0.2*meanx)`` ->
    ``coverage``.  An empty sheet yields an empty dict.
    """
    short_names = {
        'Q30(%)': 'q30',
        'mean_depth(dedup)': 'depth',
        'coverage(>=0.2*meanx)': 'coverage'
    }
    table = pd.DataFrame(self.sampledata['qc'])
    if table.empty:
        self.result['qc'] = {}
        return
    renamed = table.rename(columns=short_names)
    self.result['qc'] = renamed.to_dict('index')[0]
def drugs(self):
    """Store a ``drug_name -> drug_detail`` mapping from the ``drugs`` sheet.

    Rows containing any missing value, and rows whose ``drug_detail`` is
    ``.``, are skipped.  The mapping is written to
    ``result['drugs']['drugs_detail']``; an empty sheet yields an empty dict.
    """
    table = pd.DataFrame(self.sampledata['drugs'])
    details = {}
    if not table.empty:
        complete = table.dropna()
        described = complete[complete['drug_detail'] != '.']
        details = described.set_index('drug_name')['drug_detail'].to_dict()
    self.result['drugs']['drugs_detail'] = details
def indication(self):
    """Copy the ``indication`` sheet into the result as a list of row dicts."""
    sheet = 'indication'
    self._to_records(sheet)
def _to_records(self, sheetname, need=False):
    """Convert a multi-row sheet into a list of row dicts.

    :param sheetname: key of the sheet inside ``self.sampledata``
    :param need: when true, return the rows and leave ``self.result``
        untouched; when false, store them in ``self.result[sheetname]``
    :return: the list of row dicts (``[]`` for an empty sheet) when ``need``
        is true, otherwise ``None``
    """
    data = pd.DataFrame(self.sampledata[sheetname])
    res = [] if data.empty else data.to_dict('records')
    if need:
        # Always hand back a list, so callers can use len() and truthiness
        # checks even for an empty sheet.
        return res
    self.result[sheetname] = res
def _to_dicts(self, sheetname):
    """Store a single-row sheet in ``self.result`` as one dict.

    :param sheetname: key of the sheet inside ``self.sampledata``; the same
        key is used in ``self.result``
    :return: None; the row with index ``0`` is stored, or ``{}`` when the
        sheet is empty
    """
    table = pd.DataFrame(self.sampledata[sheetname])
    first_row = {} if table.empty else table.to_dict('index')[0]
    self.result[sheetname] = first_row
def _drug_category(self, groupdata):
    """Summarise the drugs attached to one alteration.

    Side effects on the instance:

    * ``self.signdurg`` gains the comma-separated ``DrugCn`` names of rows
      whose ``Drug_Category`` is ``a``, ``b`` or ``c``.
    * ``self.drugs_type`` maps each ``Response_Type`` to the individual drug
      names found in ``DrugCn`` (split on ``+`` and ``,``).  Names are kept
      in first-seen order and are not repeated across calls.

    ``groupdata`` itself is not modified.

    :param groupdata: DataFrame with the rows of a single alteration; must
        contain ``Drug_Category``, ``DrugCn``, ``AMP_evidence_level`` and
        ``Response_Type``
    :return: dict mapping each drug category (except ``.``) to one
        ``"<DrugCn>【<AMP_evidence_level> 级】"`` line per row, joined with
        newlines
    """
    drug_category_res = dict()
    for drug_category, category_rows in groupdata.groupby('Drug_Category'):
        if drug_category == '.':
            continue
        # Count sensitive / possibly sensitive drugs.
        if drug_category in ['a', 'b', 'c']:
            self.signdurg.update(category_rows['DrugCn'].str.split(',').explode().tolist())
        # Build the lines without adding a column to the caller's slice.
        # The opening bracket was missing from the original format string.
        descriptions = [
            '%s【%s 级】' % (drug, level)
            for drug, level in zip(category_rows['DrugCn'], category_rows['AMP_evidence_level'])
        ]
        drug_category_res[drug_category] = '\n'.join(descriptions)

    # All drugs by response type.  regex=False makes ' + ' a literal on every
    # pandas version; as a regex it would mean "two or more spaces".
    drug_lists = groupdata['DrugCn'].str.replace(' + ', '+', regex=False).str.split(r'[+,]')
    exploded = groupdata.assign(list_col=drug_lists).explode('list_col')
    keep = exploded['list_col'].notna() & (exploded['list_col'] != '.') & (exploded['list_col'] != '')
    exploded = exploded[keep]
    for drug_type, names in exploded.groupby('Response_Type')['list_col']:
        known = self.drugs_type.setdefault(drug_type, [])
        for name in names:
            # Ordered de-duplication keeps the report output reproducible.
            if name not in known:
                known.append(name)
    return drug_category_res
@staticmethod
def _get_drug_plan(x, drugsum):
tlist = x.split('+')
tdeslist = list()
for tdes in tlist:
if tdes.strip() in drugsum:
t1_des = drugsum[tdes.strip()]
tdeslist.append(t1_des)
if '慎用' in tdeslist or '谨慎' in tdeslist:
return '慎用'
elif '推荐' in tdeslist:
return '推荐'
elif '常规' in tdeslist:
return '可选'
else:
return '可选'
def collect(self):
    """Run every parsing step in order and return the assembled result.

    After the steps have run, the summary counters ``signtb_num`` and
    ``signdrug_num`` are written to ``result['sum']``, and
    ``result['drugs']['drugs_type']`` receives ``self.drugs_type`` with its
    keys in sorted order.

    :return: ``self.result``
    """
    steps = (
        self.cms, self.target, self.fusion, self.cnv, self.hotspot,
        self.met, self.longindel, self.mmr, self.msi, self.chemo,
        self.hcs, self.heredity, self.qc, self.indication, self.drugs,
    )
    for step in steps:
        step()

    # Summary counters.
    summary = self.result['sum']
    summary['signtb_num'] = len(self.signtb)
    summary['signdrug_num'] = len(self.signdurg)
    self.result['drugs']['drugs_type'] = {
        response: self.drugs_type[response] for response in sorted(self.drugs_type)
    }
    return self.result
def run(path, output='t.json'):
    """Parse one input file and dump the collected result as JSON.

    :param path: passed to ``read`` (imported from ``tools.readxlsx``); its
        return value becomes the sample data of a ``Parse`` instance
    :param output: file the JSON text is written to; defaults to ``t.json``
        in the current working directory
    :return: the JSON text (4-space indent, non-ASCII characters kept as-is)
    """
    parse = Parse(read(path))
    res = parse.collect()
    resjson = json.dumps(res, indent=4, ensure_ascii=False)
    # The text holds raw non-ASCII characters (ensure_ascii=False), so pin the
    # encoding instead of relying on the platform default.
    with open(output, 'w', encoding='utf-8') as f:
        f.write(resjson)
    return resjson
if __name__ == '__main__':
    # Fail with a usage hint rather than an IndexError traceback when the
    # input path is missing.
    if len(sys.argv) < 2:
        sys.exit('usage: %s <input path>' % sys.argv[0])
    run(sys.argv[1])