#! /usr/bin/env python3 import argparse import os import re import pandas as pd class HereditaryRun: def __init__(self, database, project, output_dir, name, file): self.database = database self.project = project self.output_dir = output_dir self.name = name self.file = file def filter(self): # 过滤掉不包含 hcs 的,还有是等于1,2 级的 data = pd.read_csv(self.file, sep='\t') data.fillna('.', inplace=True) data = data[((data['ClinicalSign'] == 1) | (data['ClinicalSign'] == 2)) & (data['genetag'].str.contains('hcs'))] prefile = os.path.join(self.output_dir, f'{self.name}.hereditary.pre.txt') data.to_csv(prefile, sep='\t', index=False) database = pd.read_excel(self.database) database.fillna('.', inplace=True) database = database[database['ItemId'].str.contains(self.project)] expanded_database = database.assign(Gene=database['Gene'].str.split(';')).explode('Gene') result_df = pd.DataFrame(columns=['Gene', 'Syndrome_Cn', 'inheritance', 'genotype', 'mutation']) for _, rows in data.iterrows(): matches = re.match(r"([A-Za-z0-9]+):.*:(p\..*)", rows['AAChange_refGene']) row_df = pd.DataFrame(columns=['Gene', 'Syndrome_Cn', 'inheritance', 'genotype', 'mutation', 'ClinicalSign']) gene, mutation = '', '' if matches: gene = matches.group(1) mutation = matches.group(2) else: raise UserWarning('HGVS 解析错误!') selected_rows = expanded_database[expanded_database['Gene'].str.split(';').apply(lambda x: gene in x)] row_df['Syndrome_Cn'] = selected_rows['Syndrome_Cn'] row_df['inheritance'] = selected_rows['inheritance'] row_df['Gene'] = gene row_df['mutation'] = mutation row_df['genotype'] = '纯合' if rows['Freq'] > 0.9 else '杂合' row_df['ClinicalSign'] = str(rows['ClinicalSign']) result_df = pd.concat([result_df, row_df]) hereditaryfile = os.path.join(self.output_dir, f'{self.name}.hereditary.txt') result_df.to_csv(hereditaryfile, sep='\t', index=False) database['res'] = '同一般人群' database.loc[database['Syndrome_Cn'].apply(lambda x: x in result_df['Syndrome_Cn'].values), 'res'] = '风险较高' risk = database[['Cancer_Cn', 'res']] expanded_risk = risk.assign(Cancer_Cn=risk['Cancer_Cn'].str.split('、')).explode('Cancer_Cn') expanded_risk['res'] = expanded_risk.groupby('Cancer_Cn')['res'].transform( lambda x: '风险较高' if '风险较高' in x.values else x.iloc[0]) expanded_risk.drop_duplicates(inplace=True) riskfile = os.path.join(self.output_dir, f'{self.name}.risk.txt') expanded_risk.to_csv(riskfile, sep='\t', index=False) if __name__ == "__main__": parser = argparse.ArgumentParser(description="hereditary Process Script") parser.add_argument('-d', '--database', help="Path to hereditary_mut's database", required=True) parser.add_argument('-p', '--project', help="Project name", required=True) parser.add_argument('-n', '--name', help="Name for sample", required=True) parser.add_argument('-f', '--file', help="germline filter file", required=True) parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='') args = parser.parse_args() hereditary = HereditaryRun(args.database, args.project, args.output_dir, args.name, args.file) hereditary.filter()