import argparse import pysam class VcfSetOperations: def __init__(self, vcf_file1, vcf_file2=None, bed_file=None, merge=False): self.vcf1 = pysam.VariantFile(vcf_file1) self.vcf2 = pysam.VariantFile(vcf_file2) if vcf_file2 else None self.bed_regions = self.load_bed_regions(bed_file) if bed_file else None self.merge = merge def build_record_dict(self, vcf_file): records = {} for record in vcf_file: key = ('chr' + record.contig, record.pos, record.ref, record.alts[0]) records[key] = record return records def load_bed_regions(self, bed_file): bed_regions = set() if bed_file: with open(bed_file, 'r') as file: for line in file: parts = line.strip().split('\t') if len(parts) >= 3: chrom, start, end = parts[0], int(parts[1]), int(parts[2]) bed_regions.add((chrom, start, end)) return bed_regions def find_intersection(self, output_file): records2 = self.build_record_dict(self.vcf2) if self.vcf2 else None for chr in range(1, 23): self.vcf1.header.add_line(f'##contig=') for chr in ['X', 'Y', 'MT']: self.vcf1.header.add_line(f'##contig=') output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header) for record1 in self.vcf1: key = (record1.contig, record1.pos, record1.ref, record1.alts[0]) if self.is_within_bed_region(record1) and (records2 is None or key in records2): output_vcf.write(record1) output_vcf.close() def merge_vcfs(self, output_file): output_vcf = pysam.VariantFile(output_file, 'w', header=self.vcf1.header) for record1 in self.vcf1: output_vcf.write(record1) if self.vcf2: for record2 in self.vcf2: output_vcf.write(record2) output_vcf.close() def is_within_bed_region(self, record): if not self.bed_regions: return True # 如果没有提供 BED 文件,则不筛选 # 检查记录是否是点突变 if len(record.ref) == 1 and len(record.alts[0]) == 1: return False # 如果是点突变,不考虑 for (bed_chrom, bed_start, bed_end) in self.bed_regions: if record.contig == bed_chrom and bed_start <= record.pos <= bed_end: return True # 如果在 BED 区域内有插入、删除或替代的变异,返回 True return False # 如果在 BED 区域内没有插入、删除或替代的变异,返回 False def close_files(self): if self.vcf1: self.vcf1.close() if self.vcf2: self.vcf2.close() if __name__ == "__main__": parser = argparse.ArgumentParser(description="VCF Set Operations") parser.add_argument("vcf1", help="First VCF file") parser.add_argument("-v", "--vcf2", help="Second VCF file for intersection (optional)") parser.add_argument("-b", "--bed", help="BED file for filtering (optional)") parser.add_argument("-m", "--merge", action="store_true", help="Merge two VCF files (optional)") parser.add_argument("-o", "--output", help="Output file (optional)") args = parser.parse_args() if args.merge: vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed) vcf_set_operations.merge_vcfs(args.output) vcf_set_operations.close_files() else: if not args.vcf2 and not args.bed: print("Error: You need to specify either VCF2 or BED filtering.") else: vcf_set_operations = VcfSetOperations(args.vcf1, args.vcf2, args.bed) vcf_set_operations.find_intersection(args.output) vcf_set_operations.close_files()