pipeline/codes/vcf_filter.py

65 lines
2.1 KiB
Python
Executable File

#! /usr/bin/env python3
import argparse
import re
import sys
import pysam
class VCFFilter:
    """Filter the records of a VCF file with a bcftools-like expression.

    The expression is rewritten into a Python expression over a variable
    named ``record`` and evaluated once per record; records for which the
    result is truthy are written to the output.

    Rewrite rules (applied in this order):
      * ``INFO/TAG``      -> ``record.info.get("TAG")``
      * ``FORMAT/TAG[i]`` -> element 0 of ``TAG`` for the i-th sample
      * ``|`` or ``||``   -> ``or``
      * ``&`` or ``&&``   -> ``and``
      * a lone ``=``      -> ``==`` (``==``, ``!=``, ``<=``, ``>=`` are kept)

    The rules are purely textual, so they also apply inside quoted string
    literals that appear in the expression.
    """

    # (compiled pattern, replacement) pairs; order matters because the
    # INFO/FORMAT rewrites must run before the operator rewrites.
    _REWRITE_RULES = (
        (re.compile(r'INFO/(\w+)'), r'record.info.get("\1")'),
        # NOTE(review): the trailing [0] assumes the FORMAT value is a
        # sequence (e.g. AD, GT); confirm behaviour for scalar fields.
        (re.compile(r'FORMAT/(\w+)\[(\d+)\]'),
         r'record.samples.get(record.samples.keys()[\2]).get("\1")[0]'),
        (re.compile(r'\|{1,2}'), ' or '),
        (re.compile(r'&{1,2}'), ' and '),
        # Only a standalone '=' becomes '=='.  The lookbehind/lookahead keep
        # '==', '!=', '<=' and '>=' intact (the previous rule produced the
        # invalid tokens '===' and '!==').
        (re.compile(r'(?<![><=!])=(?!=)'), '=='),
    )

    def __init__(self, input_vcf, output_vcf, filter_expression):
        """Store the job parameters; no file is opened here.

        Args:
            input_vcf: Path of the VCF file to read.
            output_vcf: Path of the VCF file to write; a falsy value means
                standard output.
            filter_expression: bcftools-like expression selecting the
                records to keep.
        """
        self.input_vcf = input_vcf
        self.output_vcf = output_vcf
        self.filter_expression = filter_expression

    @classmethod
    def _translate(cls, expression):
        """Return *expression* rewritten as Python source text."""
        for pattern, replacement in cls._REWRITE_RULES:
            expression = pattern.sub(replacement, expression)
        return expression

    def _compiled_filter(self):
        """Return the code object for the current ``filter_expression``.

        The result is cached and rebuilt only when the expression text
        changes, so the rewrite and parse are not repeated for each record.
        """
        expression = self.filter_expression
        cached = getattr(self, '_filter_cache', None)
        if cached is None or cached[0] != expression:
            # eval() ignores leading blanks in a source string but compile()
            # does not, so strip them to accept the same inputs.
            source = self._translate(expression).lstrip(' \t')
            cached = (expression, compile(source, '<filter_expression>', 'eval'))
            self._filter_cache = cached
        return cached[1]

    def parse_filter(self, record):
        """Evaluate the filter expression against one record.

        Args:
            record: Object exposing ``info`` and ``samples`` the way the
                rewritten expression accesses them (a pysam record when
                called from ``filter_vcf``).

        Returns:
            The value of the evaluated expression; callers use its
            truthiness to decide whether to keep the record.

        Raises:
            SyntaxError: If the rewritten expression is not valid Python.
        """
        # SECURITY: the expression is executed with eval(), so it can run
        # arbitrary Python code.  Only pass expressions from a trusted
        # source (e.g. the operator's own command line).
        return eval(self._compiled_filter(), {'record': record})

    def filter_vcf(self):
        """Copy the header and every matching record from input to output.

        Contig lines for chr1..chr22, chrX, chrY and chrMT are appended to
        the header before it is written (presumably so records on these
        contigs serialize when the input header lacks them; TODO confirm).
        """
        vcf_out = open(self.output_vcf, 'w') if self.output_vcf else sys.stdout
        try:
            with pysam.VariantFile(self.input_vcf, 'r') as vcf_in:
                header = vcf_in.header
                for chrom in [*range(1, 23), 'X', 'Y', 'MT']:
                    header.add_line(f'##contig=<ID=chr{chrom}>')
                vcf_out.write(str(header))
                for record in vcf_in:
                    if self.parse_filter(record):
                        vcf_out.write(str(record))
        finally:
            # Close only a file we opened ourselves; never close stdout.
            if vcf_out is not sys.stdout:
                vcf_out.close()
def main(argv=None):
    """Parse command-line options and run the VCF filter.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="VCF Filter")
    # The input path is handed straight to the VCF reader, so omitting it
    # must be a usage error rather than a failure deep inside the filter.
    parser.add_argument("-i", "--input_vcf", required=True,
                        help="Input VCF file")
    # Optional: when absent, the filtered VCF goes to standard output.
    parser.add_argument("-o", "--output_vcf", help="Output VCF file")
    parser.add_argument("-e", "--filter_expression", required=True,
                        help="Filter expression (e.g.,'INFO/STATUS==\"StrongSomatic\"', like bcftools -i expression)")
    args = parser.parse_args(argv)
    vcf_filter = VCFFilter(args.input_vcf, args.output_vcf, args.filter_expression)
    vcf_filter.filter_vcf()


if __name__ == "__main__":
    main()