#! /usr/bin/env python3
"""Build hereditary-syndrome and cancer-risk tables for one sample.

Reads a tab-separated germline variant table and an Excel syndrome database,
then writes three tab-separated files into the output directory:
``<name>.hereditary.pre.txt``, ``<name>.hereditary.txt`` and
``<name>.risk.txt``.
"""
import argparse
import os
import re

import pandas as pd


def split_hgvs(hgvs):
    """Split a colon-separated annotation string into its named parts.

    The string must have four or five ``:``-separated fields, in the order
    gene, transcript, exon, nucleotide change and (optionally) amino-acid
    change.  NOTE(review): this looks like a single ANNOVAR
    ``AAChange.refGene`` entry -- confirm against the upstream annotator.

    Args:
        hgvs: The annotation string to split.

    Returns:
        A dict with keys ``gene``, ``transcript``, ``exon``, ``nacid`` and
        ``aacid``.  ``aacid`` is ``None`` when only four fields are present.

    Raises:
        ValueError: If the string does not have exactly four or five fields.
    """
    fields = hgvs.split(':')
    if len(fields) == 4:
        gene, transcript, exon, nacid = fields
        aacid = None
    elif len(fields) == 5:
        gene, transcript, exon, nacid, aacid = fields
    else:
        raise ValueError(f'Invalid HGVS format: {hgvs}')

    return {
        'gene': gene,
        'transcript': transcript,
        'exon': exon,
        'nacid': nacid,
        'aacid': aacid,
    }


class HereditaryRun:
    """Produce the hereditary and risk report tables for a single sample."""

    def __init__(self, database, project, output_dir, name, file):
        # Path of the Excel syndrome database (read with pd.read_excel).
        self.database = database
        # Pattern matched against the database's 'ItemId' column.
        self.project = project
        # Directory the three output files are written to.
        self.output_dir = output_dir
        # Sample name, used as the prefix of every output file name.
        self.name = name
        # Path of the tab-separated germline variant table.
        self.file = file

    def filter(self):
        """Filter the variants, annotate them with syndromes and write the outputs.

        Steps:
            1. Keep variants whose ``ClinicalSign`` is 1 or 2 and whose
               ``genetag`` contains ``hcs``; write them to
               ``<name>.hereditary.pre.txt``.
            2. For every kept variant, look up the database rows listing its
               gene and write one output row per match to
               ``<name>.hereditary.txt``.
            3. Mark every database syndrome that was hit as high risk, expand
               the cancer lists and write ``<name>.risk.txt``.

        Raises:
            ValueError: If an ``AAChange_refGene`` value cannot be parsed by
                ``split_hgvs``.
        """
        # Keep only rows whose genetag contains 'hcs' and whose ClinicalSign
        # is level 1 or 2.
        data = pd.read_csv(self.file, sep='\t')
        data.fillna('.', inplace=True)
        data = data[((data['ClinicalSign'] == 1) | (data['ClinicalSign'] == 2))
                    & (data['genetag'].str.contains('hcs'))]
        prefile = os.path.join(self.output_dir, f'{self.name}.hereditary.pre.txt')
        data.to_csv(prefile, sep='\t', index=False)

        database = pd.read_excel(self.database)
        database.fillna('.', inplace=True)
        # NOTE: str.contains treats self.project as a regular expression.
        # The explicit copy makes the later 'res' assignments operate on an
        # independent frame instead of a slice of the Excel data.
        database = database[database['ItemId'].str.contains(self.project)].copy()
        # One row per (database row, gene): 'Gene' holds ';'-separated names.
        expanded_database = database.assign(
            Gene=database['Gene'].str.split(';')).explode('Gene')

        output_columns = ['Gene', 'Syndrome_Cn', 'inheritance', 'genotype',
                          'mutation', 'ClinicalSign']
        # The seed frame fixes the column order; when no variant passed the
        # filter it is the only frame, so the output is a header without the
        # 'ClinicalSign' column.
        frames = [pd.DataFrame(
            columns=['Gene', 'Syndrome_Cn', 'inheritance', 'genotype', 'mutation'])]
        for _, row in data.iterrows():
            parts = split_hgvs(row['AAChange_refGene'])
            gene = parts['gene']
            # Prefer the amino-acid change; fall back to the nucleotide change.
            mutation = parts['aacid'] if parts['aacid'] else parts['nacid']
            # After explode every 'Gene' cell holds a single gene name.
            hits = expanded_database[expanded_database['Gene'] == gene]
            row_df = hits[['Syndrome_Cn', 'inheritance']].assign(
                Gene=gene,
                mutation=mutation,
                genotype='纯合' if row['Freq'] > 0.9 else '杂合',
                ClinicalSign=str(row['ClinicalSign']),
            )[output_columns]
            frames.append(row_df)
        # Concatenate once instead of growing the frame inside the loop.
        result_df = pd.concat(frames)
        hereditaryfile = os.path.join(self.output_dir, f'{self.name}.hereditary.txt')
        result_df.to_csv(hereditaryfile, sep='\t', index=False)

        database['res'] = '同一般人群'
        hit_syndromes = database['Syndrome_Cn'].isin(result_df['Syndrome_Cn'])
        database.loc[hit_syndromes, 'res'] = '风险较高'
        risk = database[['Cancer_Cn', 'res']]
        # One row per cancer: 'Cancer_Cn' holds '、'-separated names.
        expanded_risk = risk.assign(
            Cancer_Cn=risk['Cancer_Cn'].str.split('、')).explode('Cancer_Cn')
        # A cancer is high risk as soon as any syndrome listing it is.
        expanded_risk['res'] = expanded_risk.groupby('Cancer_Cn')['res'].transform(
            lambda x: '风险较高' if '风险较高' in x.values else x.iloc[0])
        expanded_risk.drop_duplicates(inplace=True)
        riskfile = os.path.join(self.output_dir, f'{self.name}.risk.txt')
        expanded_risk.to_csv(riskfile, sep='\t', index=False)


def main():
    """Parse the command line and run the hereditary filter."""
    parser = argparse.ArgumentParser(description="hereditary Process Script")
    parser.add_argument('-d', '--database', help="Path to hereditary_mut's database", required=True)
    parser.add_argument('-p', '--project', help="Project name", required=True)
    parser.add_argument('-n', '--name', help="Name for sample", required=True)
    parser.add_argument('-f', '--file', help="germline filter file", required=True)
    parser.add_argument('-o', '--output_dir', help="Output directory, default ./", default='')
    args = parser.parse_args()
    hereditary = HereditaryRun(args.database, args.project, args.output_dir, args.name, args.file)
    hereditary.filter()


if __name__ == "__main__":
    main()