pipeline/codes/vcf_operations.py

107 lines
4.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#! /usr/bin/env python3
import argparse
import pysam
class VcfSetOperations:
def __init__(self, vcf_file1, vcf_file2=None, bed_file=None, merge=False, snp=False):
self.vcf1 = pysam.VariantFile(vcf_file1)
self.vcf2 = pysam.VariantFile(vcf_file2) if vcf_file2 else None
self.bed_regions = self.load_bed_regions(bed_file) if bed_file else None
self.snp = snp
self.merge = merge
def build_record_dict(self, vcf_file):
records = {}
for record in vcf_file:
key = ('chr' + record.contig, record.pos, record.ref, record.alts[0])
records[key] = record
return records
def load_bed_regions(self, bed_file):
bed_regions = set()
if bed_file:
with open(bed_file, 'r') as file:
for line in file:
parts = line.strip().split('\t')
if len(parts) >= 3:
chrom, start, end = parts[0], int(parts[1]), int(parts[2])
bed_regions.add((chrom, start, end))
return bed_regions
def find_intersection(self, output_file):
records2 = self.build_record_dict(self.vcf2) if self.vcf2 else None
for chr in range(1, 23):
self.vcf1.header.add_line(f'##contig=<ID=chr{chr}>')
for chr in ['X', 'Y', 'MT']:
self.vcf1.header.add_line(f'##contig=<ID=chr{chr}>')
output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header)
for record1 in self.vcf1:
key = (record1.contig, record1.pos, record1.ref, record1.alts[0])
if self.is_within_bed_region(record1, self.snp) and (records2 is None or key in records2):
output_vcf.write(record1)
output_vcf.close()
def merge_vcfs(self, output_file):
output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header)
for record1 in self.vcf1:
output_vcf.write(record1)
if self.vcf2:
for record2 in self.vcf2:
output_vcf.write(record2)
output_vcf.close()
def is_within_bed_region(self, record, snp=False):
# 如果没有提供 BED 文件,则不筛选
if not self.bed_regions:
return True
# 检查记录是否是点突变 # 如果是点突变,不考虑
if len(record.ref) == 1 and len(record.alts[0]) == 1 and not snp:
return False
for (bed_chrom, bed_start, bed_end) in self.bed_regions:
# 如果在 BED 区域内有插入、删除或替代的变异,返回 True
if record.contig == bed_chrom and bed_start < record.pos <= bed_end:
return True
# 缺失的情况考虑终点位置在bed区域内
if len(record.ref) > len(record.alts[0]):
end_pos = record.pos + len(record.ref)
if record.contig == bed_chrom and bed_start < end_pos <= bed_end:
return True
# 如果在 BED 区域内没有插入、删除或替代的变异,返回 False
return False
def close_files(self):
if self.vcf1:
self.vcf1.close()
if self.vcf2:
self.vcf2.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VCF Set Operations")
parser.add_argument("vcf1", help="First VCF file")
parser.add_argument("-v", "--vcf2", help="Second VCF file for intersection (optional)")
parser.add_argument("-b", "--bed", help="BED file for filtering (optional)")
parser.add_argument("-s", "--snp", action="store_true", help="BED region for snp ")
parser.add_argument("-m", "--merge", action="store_true", help="Merge two VCF files (optional)")
parser.add_argument("-o", "--output", help="Output file (optional)")
args = parser.parse_args()
if args.merge:
vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed)
vcf_set_operations.merge_vcfs(args.output)
vcf_set_operations.close_files()
else:
if not args.vcf2 and not args.bed:
print("Error: You need to specify either VCF2 or BED filtering.")
else:
vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed, args.snp)
vcf_set_operations.find_intersection(args.output)
vcf_set_operations.close_files()