#! /usr/bin/env python3
"""Filter the records of a VCF file with a bcftools-like expression.

The expression is rewritten textually into a Python expression over a pysam
record and evaluated once per record; records for which it is truthy are
written to the output together with the (extended) header.
"""
import argparse
import re
import sys

import pysam

# Extra header lines added to the input header before it is written out.
# NOTE(review): every entry below is a bare "##contig=" without any
# "<ID=...,length=...>" payload. This looks truncated; confirm the intended
# contig IDs and lengths against the reference genome before relying on it.
contig_length = """
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
##contig=
"""


class VCFFilter:
    """Copy the records of a VCF that satisfy a filter expression.

    Attributes:
        input_vcf: Path handed to ``pysam.VariantFile`` for reading.
        output_vcf: Path of the text file to write; any falsy value means
            "write to standard output".
        filter_expression: bcftools-like expression, e.g.
            ``INFO/STATUS=="StrongSomatic"``.
    """

    # Textual rewrite rules, applied in this order, that turn the filter
    # expression into a Python expression over a variable named ``record``.
    # The rewriting is purely textual, so it also affects matching text that
    # happens to sit inside quoted string literals of the expression.
    _REWRITE_RULES = (
        # INFO/NAME -> lookup of NAME in the record's INFO mapping.
        (re.compile(r'INFO/(\w+)'), r'record.info.get("\1")'),
        # FORMAT/NAME[i] -> field NAME of the i-th sample, element [0].
        (re.compile(r'FORMAT/(\w+)\[(\d+)\]'),
         r'record.samples.get(record.samples.keys()[\2]).get("\1")[0]'),
        # | and || -> or ; & and && -> and.
        (re.compile(r'\|{1,2}'), ' or '),
        (re.compile(r'&{1,2}'), ' and '),
        # A lone "=" means equality; "==", "!=", "<=" and ">=" are left alone.
        (re.compile(r'(?<![=!<>])=(?!=)'), '=='),
    )

    def __init__(self, input_vcf, output_vcf, filter_expression):
        """Store the input path, output path and filter expression."""
        self.input_vcf = input_vcf
        self.output_vcf = output_vcf
        self.filter_expression = filter_expression

    @classmethod
    def _translate(cls, expression):
        """Return ``expression`` rewritten as Python source over ``record``."""
        for pattern, replacement in cls._REWRITE_RULES:
            expression = pattern.sub(replacement, expression)
        return expression

    def _compiled_expression(self):
        """Return the code object for the current ``filter_expression``.

        Translation and compilation do not depend on the record, so the
        result is cached and only rebuilt if ``filter_expression`` has been
        reassigned since the last call.
        """
        source = self.filter_expression
        cached = getattr(self, '_expression_cache', None)
        if cached is None or cached[0] != source:
            code = compile(self._translate(source), '<filter_expression>', 'eval')
            cached = (source, code)
            self._expression_cache = cached
        return cached[1]

    def parse_filter(self, record):
        """Evaluate the filter expression against one record.

        Args:
            record: Object exposing ``info`` and ``samples`` the way the
                rewritten expression accesses them (a pysam record when
                called from ``filter_vcf``).

        Returns:
            Whatever the rewritten expression evaluates to; callers use its
            truthiness to decide whether to keep the record.

        Raises:
            SyntaxError: If the rewritten expression is not valid Python.
            Exception: Anything raised while evaluating the expression.
        """
        # SECURITY: the expression comes straight from the command line and
        # is executed with eval(), so it can run arbitrary Python. Only use
        # this with trusted expressions.
        return eval(self._compiled_expression(), globals(), {'record': record})

    def filter_vcf(self):
        """Write the header and every record passing the filter.

        The lines of ``contig_length`` are added to the input header before
        it is written. Output goes to ``output_vcf`` when it is set,
        otherwise to standard output.
        """
        vcf_out = open(self.output_vcf, 'w') if self.output_vcf else sys.stdout
        try:
            with pysam.VariantFile(self.input_vcf, 'r') as vcf_in:
                for line in contig_length.strip().splitlines():
                    vcf_in.header.add_line(line)
                vcf_out.write(str(vcf_in.header))
                for record in vcf_in:
                    if self.parse_filter(record):
                        vcf_out.write(str(record))
        finally:
            # Close the file we opened ourselves, but never close stdout.
            if vcf_out is not sys.stdout:
                vcf_out.close()


def main():
    """Parse the command line and run the filter."""
    parser = argparse.ArgumentParser(description="VCF Filter")
    parser.add_argument("-i", "--input_vcf", help="Input VCF file")
    parser.add_argument("-o", "--output_vcf", help="Output VCF file")
    parser.add_argument("-e", "--filter_expression", required=True,
                        help="Filter expression (e.g.,'INFO/STATUS==\"StrongSomatic\"', like bcftools -i expression)")
    args = parser.parse_args()
    vcf_filter = VCFFilter(args.input_vcf, args.output_vcf, args.filter_expression)
    vcf_filter.filter_vcf()


if __name__ == "__main__":
    main()