pipeline/codes/chemo.py

246 lines
9.8 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#! /usr/bin/env python3
import argparse
import logging
import os
import pandas as pd
import pysam
class ChemoRun:
def __init__(self, database, probe, cancer, project, output_dir, name, vcf):
self.database = database
self.probe = probe
self.output_dir = output_dir
self.name = name
self.vcf = vcf
self.cancer = cancer
self.project = project
@staticmethod
def read_info(project):
info_path = os.path.join(os.environ.get('DATABASE'), 'info.csv')
read_res = pd.read_csv(info_path)
read_res = read_res.fillna('.')
read_res = read_res[read_res['project'] == project]
gene_list = list()
if any(read_res['chemotherapy_drug'].values):
genecontent = read_res['chemotherapy_drug'].values[0]
if genecontent != '.':
gene_list = genecontent.split('/')
if not gene_list:
raise UserWarning(f'化疗检测基因为空!{project} ,请查看参数project是否正确')
return gene_list
@staticmethod
def get_drug_plan(x, drugsum):
tlist = x.split('+')
tdeslist = list()
for tdes in tlist:
if tdes.strip() in drugsum:
t1_des = drugsum[tdes.strip()]
tdeslist.append(t1_des)
if '慎用' in tdeslist or '谨慎' in tdeslist:
return '慎用'
elif '推荐' in tdeslist:
return '推荐'
elif '常规' in tdeslist:
return '可选'
else:
return '可选'
@staticmethod
def build_record_dict(vcf_file):
vcfdata = pysam.VariantFile(vcf_file)
records = {}
for record in vcfdata:
chrom = record.chrom
pos = record.pos
records[(chrom, pos)] = record
return records
def parsedata(self):
"""
处理chemo_drug_rsid_snppos.xlsx, 并生成对应bed文件
"""
data = pd.read_excel(self.database, None)
gene_list = self.read_info(self.project)
drug_rsid = data['drug_rsid']
drug_combine = data['drug_combine']
drug_rsid.fillna('.', inplace=True)
drug_combine.fillna('.', inplace=True)
drug_rsid_data = drug_rsid[
(drug_rsid['source'].str.contains(self.probe) & (drug_rsid['genename'].apply(lambda x: x in gene_list)))]
drug_combine_data = drug_combine[drug_combine['treatResult'].apply(lambda x: x in self.cancer)]
if drug_rsid_data.empty:
logging.error(f'无此项目!{self.probe} ,请查看参数probe是否正确')
raise UserWarning(f'无此项目!{self.probe} ,请查看参数probe是否正确')
# 生成bed文件
beddata = drug_rsid_data[['chr', 'start', 'end', 'rsid']].drop_duplicates()
beddata.rename(columns={'chr': '#chr'}, inplace=True)
bedpath = os.path.join(self.output_dir, f'{self.probe}_chemo.bed')
beddata.to_csv(bedpath, sep='\t', index=False)
return drug_rsid_data, drug_combine_data, bedpath
def parse_vcf(self, vcfpath, drug_rsid_data, drug_combine_data):
records = self.build_record_dict(vcfpath)
beddata = drug_rsid_data[['chr', 'start', 'end', 'rsid']].drop_duplicates()
fliterdata = pd.DataFrame()
for _, row in beddata.iterrows():
chrom = row['chr']
end = row['end']
fliter = pd.DataFrame()
if (chrom, end) not in records:
fliter = pd.concat([fliter, drug_rsid_data[
(drug_rsid_data['chr'] == chrom) &
(drug_rsid_data['end'] == end) &
(drug_rsid_data['genotype'] == '0/0')
]])
else:
record = records[(chrom, end)]
ref = record.ref
alt = record.alts[0]
# gt = '/'.join(list(map(str, sorted(record.samples.get(record.samples.keys()[0]).get('GT')))))
freq = record.samples.get(record.samples.keys()[-1]).get('AF')[0]
if freq > 0.9:
gt = '1/1'
elif 0.9 >= freq > 0.1:
gt = '0/1'
else:
gt = '0/0'
fliter = pd.concat([fliter, drug_rsid_data[
(drug_rsid_data['chr'] == chrom) &
(drug_rsid_data['end'] == end) &
(drug_rsid_data['ref'] == ref) &
(drug_rsid_data['alt'] == alt) &
(drug_rsid_data['genotype'] == gt)
]])
if fliter.empty:
raise UserWarning(
'chr: %s , end: %s 数据库未能匹配, 野生型0/0也未能匹配' % (chrom, end))
fliterdata = pd.concat([fliterdata, fliter])
# 生成过滤之后文件
respath = os.path.join(self.output_dir, f'{self.name}.chemo.res.csv')
if not fliterdata.empty:
fliterdata['probe'] = self.probe
fliterdata.to_csv(respath, sep='\t', index=False)
# 分类汇总 同位点,药物合并 drug.infos.txt
drugrsid = fliterdata[['drugname', 'genename', 'rsid', 'result', 'level', 'tips', 'drugsort']]
drugrsid = drugrsid.drop_duplicates()
resdrugrsid = drugrsid.groupby(['drugname', 'genename', 'rsid', 'result', 'level', 'drugsort'])['tips'].agg(
','.join).reset_index()
resdrugrsid.rename(columns=
{'drugname': '药物', 'genename': '检测基因', 'rsid': '检测位点', 'result': '基因型',
'level': '证据等级', 'tips': '用药提示'},
inplace=True)
resdrugrsid = resdrugrsid.sort_values(by=['drugsort', '药物', '检测基因'])
resdrugrsid.to_csv(os.path.join(self.output_dir, f'{self.name}.drug.infos.txt'), index=False, sep='\t')
# 药物 药物疗效 推荐程度合并 drug.res.txt
drugtypesum = fliterdata[['drugname', 'drugtype', 'rsid', 'weights']]
drugtypesum = drugtypesum.drop_duplicates()
drugtyperes = list()
drugsum = dict()
for drug, drugdata in drugtypesum.groupby('drugname'):
tipsnum = drugdata.groupby(['drugtype']).agg({'weights': 'sum'}).to_dict('index')
sumlist = list()
if 'LX' in tipsnum:
LX = tipsnum['LX']['weights']
if LX > 0:
lxdes = '疗效较好'
lxnum = 1
elif LX == 0:
lxdes = '疗效一般'
lxnum = 0
else:
lxdes = '疗效较差'
lxnum = -1
sumlist.append(lxdes)
else:
LX = 0
lxnum = 0
if 'DF' in tipsnum:
DF = tipsnum['DF']['weights']
if DF > 0:
dfdes = '毒副较低'
dfnum = 1
elif DF == 0:
dfdes = '毒副一般'
dfnum = 0
else:
dfdes = '毒副较高'
dfnum = -1
sumlist.append(dfdes)
else:
DF = 0
dfnum = 0
# 评价方式 疗效 1 0 -1, 毒副 1 0 -1 可形成9宫格
sumnum = lxnum + dfnum
if sumnum > 0:
sumdes = '推荐'
elif sumnum == 0:
sumdes = '常规'
else:
sumdes = '谨慎'
# 特别药物处理
if (drug == "氟尿嘧啶" or drug == "卡培他滨") and DF < 0:
sumdes = '谨慎'
drugtyperes.append(dict(
药物名称=drug,
疗效=LX,
毒副=DF,
推荐程度=sumdes,
疗效和毒副总结=','.join(sumlist)
))
drugsum[drug] = sumdes
# 报告中展示药物有顺序
drugsort = fliterdata[['drugname', 'drugsort']].drop_duplicates()
drugsort_dict = drugsort.set_index('drugname')['drugsort'].to_dict()
drugtyperes_sort = sorted(drugtyperes, key=lambda x: (
drugsort_dict[x['药物名称']] if x['药物名称'] in drugsort_dict else 100, x['药物名称']))
# 生成数据
pd.DataFrame(drugtyperes_sort).to_csv(os.path.join(self.output_dir, f'{self.name}.drug.res.txt'), index=False,
sep='\t')
# 联合用药
if not drug_combine_data.empty:
drug_combine_data['临床提示'] = drug_combine_data['用药方案'].apply(self.get_drug_plan, args=(drugsum,))
drug_combine_data.to_csv(os.path.join(self.output_dir, f'{self.name}.chemo.comb.txt'), index=False,
sep='\t')
def run(self):
try:
drug_rsid_data, drug_combine_data, bedpath = self.parsedata()
self.parse_vcf(self.vcf, drug_rsid_data, drug_combine_data)
except UserWarning as e:
raise UserWarning(e)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Chemotherapy Process Script")
parser.add_argument('-d', '--database', help="Path to chemo_drug's database", required=True)
parser.add_argument('-probe', '--probe', help="Probe name", required=True)
parser.add_argument('-n', '--name', help="Name for sample", required=True)
parser.add_argument('-v', '--vcf', help="germline vcf", required=True)
parser.add_argument('-c', '--cancer', help="cancer", required=True)
parser.add_argument('-p', '--project', help="Project", required=True)
parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='')
args = parser.parse_args()
chemorun = ChemoRun(args.database, args.probe, args.cancer, args.project, args.output_dir, args.name, args.vcf)
chemorun.run()