pipeline/codes/vcf_operations.py

107 lines
4.3 KiB
Python
Raw Permalink Normal View History

2023-11-01 10:09:29 +08:00
#! /usr/bin/env python3
2023-10-18 15:59:11 +08:00
2023-11-01 10:09:29 +08:00
import argparse
2023-12-29 10:11:01 +08:00
2023-10-18 15:59:11 +08:00
import pysam
class VcfSetOperations:
2023-12-29 10:11:01 +08:00
def __init__(self, vcf_file1, vcf_file2=None, bed_file=None, merge=False, snp=False):
2023-10-18 15:59:11 +08:00
self.vcf1 = pysam.VariantFile(vcf_file1)
self.vcf2 = pysam.VariantFile(vcf_file2) if vcf_file2 else None
self.bed_regions = self.load_bed_regions(bed_file) if bed_file else None
2023-12-29 10:11:01 +08:00
self.snp = snp
2023-10-18 15:59:11 +08:00
self.merge = merge
def build_record_dict(self, vcf_file):
records = {}
for record in vcf_file:
key = ('chr' + record.contig, record.pos, record.ref, record.alts[0])
records[key] = record
return records
def load_bed_regions(self, bed_file):
bed_regions = set()
if bed_file:
with open(bed_file, 'r') as file:
for line in file:
parts = line.strip().split('\t')
if len(parts) >= 3:
chrom, start, end = parts[0], int(parts[1]), int(parts[2])
bed_regions.add((chrom, start, end))
return bed_regions
def find_intersection(self, output_file):
records2 = self.build_record_dict(self.vcf2) if self.vcf2 else None
for chr in range(1, 23):
self.vcf1.header.add_line(f'##contig=<ID=chr{chr}>')
for chr in ['X', 'Y', 'MT']:
self.vcf1.header.add_line(f'##contig=<ID=chr{chr}>')
output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header)
for record1 in self.vcf1:
key = (record1.contig, record1.pos, record1.ref, record1.alts[0])
2023-12-29 10:11:01 +08:00
if self.is_within_bed_region(record1, self.snp) and (records2 is None or key in records2):
2023-10-18 15:59:11 +08:00
output_vcf.write(record1)
output_vcf.close()
def merge_vcfs(self, output_file):
output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header)
for record1 in self.vcf1:
output_vcf.write(record1)
if self.vcf2:
for record2 in self.vcf2:
output_vcf.write(record2)
output_vcf.close()
2023-12-29 10:11:01 +08:00
def is_within_bed_region(self, record, snp=False):
# 如果没有提供 BED 文件,则不筛选
2023-10-18 15:59:11 +08:00
if not self.bed_regions:
2023-12-29 10:11:01 +08:00
return True
# 检查记录是否是点突变 # 如果是点突变,不考虑
if len(record.ref) == 1 and len(record.alts[0]) == 1 and not snp:
return False
2023-10-18 15:59:11 +08:00
for (bed_chrom, bed_start, bed_end) in self.bed_regions:
2023-12-29 10:11:01 +08:00
# 如果在 BED 区域内有插入、删除或替代的变异,返回 True
2023-11-01 10:09:29 +08:00
if record.contig == bed_chrom and bed_start < record.pos <= bed_end:
2023-12-29 10:11:01 +08:00
return True
# 缺失的情况考虑终点位置在bed区域内
if len(record.ref) > len(record.alts[0]):
end_pos = record.pos + len(record.ref)
if record.contig == bed_chrom and bed_start < end_pos <= bed_end:
return True
# 如果在 BED 区域内没有插入、删除或替代的变异,返回 False
return False
2023-10-18 15:59:11 +08:00
def close_files(self):
if self.vcf1:
self.vcf1.close()
if self.vcf2:
self.vcf2.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="VCF Set Operations")
parser.add_argument("vcf1", help="First VCF file")
parser.add_argument("-v", "--vcf2", help="Second VCF file for intersection (optional)")
parser.add_argument("-b", "--bed", help="BED file for filtering (optional)")
2023-12-29 10:11:01 +08:00
parser.add_argument("-s", "--snp", action="store_true", help="BED region for snp ")
2023-10-18 15:59:11 +08:00
parser.add_argument("-m", "--merge", action="store_true", help="Merge two VCF files (optional)")
parser.add_argument("-o", "--output", help="Output file (optional)")
args = parser.parse_args()
if args.merge:
vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed)
vcf_set_operations.merge_vcfs(args.output)
vcf_set_operations.close_files()
else:
if not args.vcf2 and not args.bed:
print("Error: You need to specify either VCF2 or BED filtering.")
else:
2023-12-29 10:11:01 +08:00
vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed, args.snp)
2023-10-18 15:59:11 +08:00
vcf_set_operations.find_intersection(args.output)
vcf_set_operations.close_files()