pipeline/codes/vcf_filter.py

102 lines
3.4 KiB
Python
Executable File

#! /usr/bin/env python3
import argparse
import re
import sys
import pysam
# VCF "##contig" header lines (contig ID plus length), one per line.
# VCFFilter.filter_vcf strips the surrounding blank lines and adds each of
# these lines to the input file's header before the header is written out.
# Entries are listed in descending order of length, not in karyotype order.
# NOTE(review): the lengths look like the UCSC hg19 assembly -- confirm that
# this matches the reference the input VCFs were called against.
contig_length = """
##contig=<ID=chr1,length=249250621>
##contig=<ID=chr2,length=243199373>
##contig=<ID=chr3,length=198022430>
##contig=<ID=chr4,length=191154276>
##contig=<ID=chr5,length=180915260>
##contig=<ID=chr6,length=171115067>
##contig=<ID=chr7,length=159138663>
##contig=<ID=chrX,length=155270560>
##contig=<ID=chr8,length=146364022>
##contig=<ID=chr9,length=141213431>
##contig=<ID=chr10,length=135534747>
##contig=<ID=chr11,length=135006516>
##contig=<ID=chr12,length=133851895>
##contig=<ID=chr13,length=115169878>
##contig=<ID=chr14,length=107349540>
##contig=<ID=chr15,length=102531392>
##contig=<ID=chr16,length=90354753>
##contig=<ID=chr17,length=81195210>
##contig=<ID=chr18,length=78077248>
##contig=<ID=chr20,length=63025520>
##contig=<ID=chrY,length=59373566>
##contig=<ID=chr19,length=59128983>
##contig=<ID=chr22,length=51304566>
##contig=<ID=chr21,length=48129895>
##contig=<ID=chrM,length=16571>
"""
class VCFFilter:
    """Keep only the VCF records that satisfy a bcftools-like expression.

    The expression is rewritten into a Python expression over a variable
    named ``record`` and evaluated once per record:

    * ``INFO/NAME``      -> ``record.info.get("NAME")``
    * ``FORMAT/NAME[i]`` -> first element of field ``NAME`` of the i-th sample
    * ``|`` / ``||``     -> ``or``
    * ``&`` / ``&&``     -> ``and``
    * a lone ``=``       -> ``==`` (``==``, ``!=``, ``<=`` and ``>=`` are
      left untouched)

    The rewriting is purely textual, so the characters ``|``, ``&`` and ``=``
    are also rewritten when they appear inside quoted string literals.

    SECURITY: the rewritten expression is executed with ``eval``. Only use
    filter expressions from a trusted source.
    """

    # Ordered textual rewrite rules: (compiled pattern, replacement template).
    _REWRITE_RULES = (
        (re.compile(r'INFO/(\w+)'), r'record.info.get("\1")'),
        # NOTE(review): the trailing [0] assumes the FORMAT value is a
        # sequence; confirm behaviour for scalar-valued FORMAT fields.
        (re.compile(r'FORMAT/(\w+)\[(\d+)\]'),
         r'record.samples.get(record.samples.keys()[\2]).get("\1")[0]'),
        (re.compile(r'\|{1,2}'), ' or '),
        (re.compile(r'&{1,2}'), ' and '),
        # Only a stand-alone "=" becomes "==". The look-behind skips the
        # second character of "==", "!=", "<=" and ">="; the look-ahead skips
        # the first character of "==". Without both guards "==" would be
        # turned into "===" and "!=" into "!==", which are syntax errors.
        (re.compile(r'(?<![<>=!])=(?!=)'), '=='),
    )

    def __init__(self, input_vcf, output_vcf, filter_expression):
        """Store the I/O paths and the filter expression.

        Args:
            input_vcf: Path of the VCF file to read.
            output_vcf: Path of the file to write; a falsy value means
                standard output.
            filter_expression: bcftools-like expression selecting the
                records to keep.
        """
        self.input_vcf = input_vcf
        self.output_vcf = output_vcf
        self.filter_expression = filter_expression
        # Cache of (expression text, compiled code object), filled lazily.
        self._compiled_filter = None

    def _translate(self, expression):
        """Return *expression* rewritten as Python source over ``record``."""
        for pattern, replacement in self._REWRITE_RULES:
            expression = pattern.sub(replacement, expression)
        return expression

    def parse_filter(self, record):
        """Evaluate the filter expression against one record.

        Args:
            record: Object exposing ``info`` (and ``samples`` when the
                expression uses ``FORMAT/...``), e.g. a pysam variant record.

        Returns:
            The value of the evaluated expression; truthy means "keep".
        """
        expression = self.filter_expression
        cached = getattr(self, '_compiled_filter', None)
        # Translate and compile once per distinct expression text instead of
        # repeating the regex rewriting and parsing for every record.
        if cached is None or cached[0] != expression:
            # eval() ignores leading blanks in a source string but compile()
            # does not, so strip them to accept the same inputs.
            source = self._translate(expression).lstrip(' \t')
            cached = (expression, compile(source, '<filter_expression>', 'eval'))
            self._compiled_filter = cached
        # WARNING: eval executes arbitrary Python code; the expression must
        # come from a trusted source.
        return eval(cached[1], globals(), {'record': record})

    def filter_vcf(self):
        """Write the header and every record passing the filter.

        The contig lines from the module-level ``contig_length`` constant are
        added to the input header before it is written. Output goes to
        ``self.output_vcf`` when set, otherwise to standard output.
        """
        owns_output = bool(self.output_vcf)
        vcf_out = open(self.output_vcf, 'w') if owns_output else sys.stdout
        try:
            with pysam.VariantFile(self.input_vcf, 'r') as vcf_in:
                header = vcf_in.header
                for line in contig_length.strip().splitlines():
                    header.add_line(line)
                vcf_out.write(str(header))
                for record in vcf_in:
                    if self.parse_filter(record):
                        vcf_out.write(str(record))
        finally:
            # Close only a handle opened here; never close sys.stdout.
            if owns_output:
                vcf_out.close()
if __name__ == "__main__":
    # Command-line entry point: parse the options, then run the filter.
    parser = argparse.ArgumentParser(description="VCF Filter")
    # The input path is mandatory. Without it None would be passed on as the
    # file to open, so let argparse report the missing option instead.
    parser.add_argument("-i", "--input_vcf", required=True, help="Input VCF file")
    # When -o is omitted the filtered VCF is written to standard output.
    parser.add_argument("-o", "--output_vcf", help="Output VCF file")
    parser.add_argument("-e", "--filter_expression", required=True,
                        help="Filter expression (e.g.,'INFO/STATUS==\"StrongSomatic\"', like bcftools -i expression)")
    args = parser.parse_args()
    vcf_filter = VCFFilter(args.input_vcf, args.output_vcf, args.filter_expression)
    vcf_filter.filter_vcf()