185 lines
4.9 KiB
Python
185 lines
4.9 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: UTF-8 -*-
|
|||
|
|
|
|||
|
|
"""
|
|||
|
|
Created on: 2023-02-17
|
|||
|
|
@author: cjs
|
|||
|
|
# 用途:统计放化疗的结果,看看rs所有的分型是不是都被检测到
|
|||
|
|
# 版本:0.0.1
|
|||
|
|
# 最后编辑日期: 2023-02-17
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from cjs_test.cjs_logger import Logger
|
|||
|
|
from openpyxl import load_workbook
|
|||
|
|
from glob import glob
|
|||
|
|
import datetime
|
|||
|
|
import traceback
|
|||
|
|
import os
|
|||
|
|
import sys
|
|||
|
|
|
|||
|
|
# Global state (populated at runtime by Get_Opts and Get_Rs)
Exe_Bin = ''  # base name of the resolved script path, set in Get_Opts
Exe_Path = ''  # directory of the resolved script path, set in Get_Opts
GLog = None  # Logger instance, created in Get_Opts
Start_Time = None  # datetime taken when Get_Opts runs
Drug_Dd = {}  # rs id -> list of genotype strings, filled in Get_Rs
# Command parameters
HEAD_S = '基因名称'  # header text of the first column to process
HEAD_E = '证据等级'  # header text of the last column to process
|
|||
|
|
|
|||
|
|
def Exit_Print(pline=''):
    """Print the usage line and an optional message, then end the script.

    Args:
        pline: Extra text printed after the usage line; nothing extra is
            printed when it is empty.

    If the module-level logger is set, 'exit' is logged and the logger is
    closed before the process terminates with status 0.
    """
    usage = '%s -O Pro_Out' % Exe_Bin
    print(usage)

    has_message = len(pline) > 0
    if has_message:
        print(pline)

    logger = GLog
    if logger:
        logger.info('exit')
        logger.close()

    sys.exit(0)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Resolve the runtime environment and start logging
def Get_Opts():
    """Record where the script lives, note the start time and open the log.

    Sets these module globals:

    * ``Exe_Path`` / ``Exe_Bin`` - directory and base name of the resolved
      ``sys.argv[0]``.
    * ``Start_Time`` - the current local time.
    * ``GLog`` - a ``Logger`` writing to
      ``<Exe_Path>/logs/<Exe_Bin>/<Exe_Bin>_<YYYYmmdd_HHMMSS>.log``.

    Returns:
        int: Always 0.
    """
    global Exe_Bin
    global Exe_Path
    global GLog
    global Start_Time

    file_real = os.path.realpath(sys.argv[0])
    Exe_Path, Exe_Bin = os.path.split(file_real)

    Start_Time = datetime.datetime.now()
    stamp = Start_Time.strftime('%Y%m%d_%H%M%S')

    # Start logging
    log_path = os.path.join(Exe_Path, 'logs', Exe_Bin)
    # exist_ok=True replaces the exists()-then-makedirs() pair, which could
    # fail if the directory appeared between the check and the creation.
    os.makedirs(log_path, exist_ok=True)
    log_full = os.path.join(log_path, '%s_%s.log' % (Exe_Bin, stamp))
    GLog = Logger(log_full, mode='w')
    GLog.info('start')

    return 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
def Get_Rs():
    """Load the annotated genotypes of every rs site into ``Drug_Dd``.

    Opens the annotation workbook located in ``Exe_Path``, takes its first
    sheet and reads the columns from ``HEAD_S`` to ``HEAD_E`` (inclusive,
    located by their header text).  Within that range the second column is
    used as the rs id and the third as the genotype; each genotype is
    appended to ``Drug_Dd[rs id]``.

    Reading stops at the first row that has an empty cell (or is too short)
    inside the column range.  Such a row is still recorded when its rs id
    and genotype cells were read before the empty cell; otherwise it is
    dropped, so a blank row ends the table cleanly.

    Raises:
        ValueError: If ``HEAD_S`` or ``HEAD_E`` is not in the header row.
        IndexError: If the sheet has no rows at all.
    """
    xlsx_name = '650.panel化疗位点注释及指导说明.庞杰.20230403.xlsx'
    xlsx_path = os.path.join(Exe_Path, xlsx_name)
    wb = load_workbook(xlsx_path, read_only=True, data_only=False)
    try:
        ws1 = wb[wb.sheetnames[0]]
        sheet_rows = list(ws1.rows)  # all rows of the sheet

        # Header handling: find the column range by header text
        head_lines = [str(cell.value) for cell in sheet_rows[0]]
        head_start = head_lines.index(HEAD_S)
        head_end = head_lines.index(HEAD_E)
        wanted = head_end - head_start + 1

        for row in sheet_rows[1:]:
            row_lines = []
            for cell in row[head_start:head_end + 1]:
                col_str = str(cell.value)
                if col_str == 'None':
                    # An empty cell marks the end of the data
                    break
                row_lines.append(col_str)

            # Build the rs -> genotypes mapping.  The length check keeps a
            # blank or truncated row from raising IndexError here.
            if len(row_lines) >= 3:
                snp_rs, snp_type = row_lines[1], row_lines[2]
                Drug_Dd.setdefault(snp_rs, []).append(snp_type)

            if len(row_lines) != wanted:
                break
    finally:
        # Read-only workbooks keep the file open until closed explicitly,
        # so close it even when parsing fails.
        wb.close()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def Get_Chemo_Txt():
    """Collect every ``*.drug.infos.txt`` file below ``<Exe_Path>/cheom_out``.

    Returns:
        list: Paths found by a recursive glob; empty when nothing matches.
    """
    # Imported here because the file itself only imports ``glob.glob``.
    from glob import escape

    cheom_out = os.path.join(Exe_Path, "cheom_out")
    # Escape the base directory so that glob metacharacters in the script's
    # own path ('[', ']', '*', '?') are matched literally.
    pattern = '%s/**/*.drug.infos.txt' % escape(cheom_out)
    return glob(pattern, recursive=True)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _resolve_genotype(known_types, genotype):
    """Return the key of ``known_types`` matching ``genotype``, or None.

    A two-character genotype is also tried with its characters swapped.
    """
    if genotype in known_types:
        return genotype
    if len(genotype) == 2:
        swapped = genotype[1] + genotype[0]
        if swapped in known_types:
            return swapped
    return None


def Check_Txts(txts):
    """Count in how many result files each annotated genotype was detected.

    For every rs id and genotype in ``Drug_Dd`` a counter starts at 0.  Each
    file in ``txts`` is read as tab-separated text: the first line is
    skipped, and the third and fourth fields of the other lines are taken as
    rs id and genotype.  A given (rs id, genotype) pair is counted at most
    once per file.  A two-character genotype that is not annotated as
    written is also tried with its characters swapped.

    Pairs that match no annotated rs id / genotype are printed and ignored.
    Lines with fewer than four fields are skipped.

    Afterwards one ``rs<TAB>genotype<TAB>count`` line per annotated genotype
    is written to ``rs.chemo.res.txt`` in the current working directory, and
    every rs id whose genotypes all have a count of 0 is printed.

    Args:
        txts: Iterable of paths of the result files to scan.
    """
    # Build the counters
    rs_dd = {rs: dict.fromkeys(rs_types, 0)
             for rs, rs_types in Drug_Dd.items()}

    for txt in txts:
        seen = set()  # (rs, genotype) pairs already handled for this file
        with open(txt, 'r') as ff:
            next(ff, None)  # skip the header line
            for line in ff:
                # Strip the line ending so a genotype in the last column
                # does not carry a trailing newline.
                lns = line.rstrip('\r\n').split('\t')
                if len(lns) < 4:
                    continue
                rs, rs_type = lns[2], lns[3]
                if (rs, rs_type) in seen:
                    continue
                seen.add((rs, rs_type))

                known = rs_dd.get(rs)
                matched = None
                if known is not None:
                    matched = _resolve_genotype(known, rs_type)
                if matched is None:
                    # rs id or genotype missing from the annotation table
                    print('txt:%s, rs:%s, rs_type:%s' % (txt, rs, rs_type))
                else:
                    known[matched] += 1

    txt_res = []
    for rs, type_counts in rs_dd.items():
        for rs_type, type_num in type_counts.items():
            txt_res.append('%s\t%s\t%s\n' % (rs, rs_type, type_num))
        if not any(type_counts.values()):
            print("rs:%s, 一直没有被检出" % rs)

    with open('rs.chemo.res.txt', 'w') as ff:
        ff.writelines(txt_res)
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == '__main__':
    # The logger has to exist before the guarded section so that failures
    # can be written to it.
    Get_Opts()

    exit_code = 0
    try:
        Get_Rs()
        txts = Get_Chemo_Txt()
        print(len(txts))
        Check_Txts(txts)
    except Exception:
        # Record the failure in the log as well as on stdout, then fall
        # through so the log is finished and closed before exiting.
        trace = traceback.format_exc()
        GLog.error(trace)
        print(trace)
        exit_code = 1

    endtime = datetime.datetime.now()
    GLog.info('end')
    # total_seconds() also covers whole days, which .seconds leaves out
    elapsed = int((endtime - Start_Time).total_seconds())
    GLog.info('run time:%s seconds' % elapsed)
    GLog.close()

    if exit_code:
        sys.exit(exit_code)
|