pipeline/script/chemo/sum_chemo.py

185 lines
4.9 KiB
Python
Raw Normal View History

2023-08-25 10:06:31 +08:00
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Created on: 2023-02-17
@author: cjs
# 用途统计放化疗的结果看看rs所有的分型是不是都被检测到
# 版本0.0.1
# 最后编辑日期: 2023-02-17
"""
from cjs_test.cjs_logger import Logger
from openpyxl import load_workbook
from glob import glob
import datetime
import traceback
import os
import sys
# Command parameters: header captions that delimit the worksheet columns
# read by Get_Rs (both ends inclusive).
HEAD_S: str = '基因名称'  # caption of the first column to process
HEAD_E: str = '证据等级'  # caption of the last column to process

# Global runtime state. The first four are assigned by Get_Opts; Drug_Dd is
# filled by Get_Rs.
Exe_Path: str = ''   # directory containing the running script
Exe_Bin: str = ''    # file name of the running script
Start_Time = None    # datetime taken when Get_Opts runs
GLog = None          # Logger instance once Get_Opts has opened the log file
Drug_Dd: dict = {}   # rs id -> list of genotype strings read from the workbook
def Exit_Print(pline=''):
    """Print the usage line and an optional message, then exit the script.

    Args:
        pline: Extra message printed after the usage line. Empty or None
            means no extra message is printed.

    Raises:
        SystemExit: always, with status 0 (unchanged from the original
            behaviour so that existing callers see the same exit status).
    """
    print('%s -O Pro_Out' % Exe_Bin)
    # Truthiness check instead of len(): also tolerates pline=None.
    if pline:
        print(pline)
    # The logger only exists after Get_Opts has run.
    if GLog:
        GLog.info('exit')
        GLog.close()
    sys.exit(0)
# 处理运行参数
def Get_Opts():
    """Initialise the runtime globals and open the per-run log file.

    Sets Exe_Path and Exe_Bin from the resolved path of sys.argv[0], stores
    the current time in Start_Time, and binds GLog to a Logger writing to
    <Exe_Path>/logs/<Exe_Bin>/<Exe_Bin>_<YYYYmmdd_HHMMSS>.log.

    Returns:
        int: always 0.
    """
    global Exe_Bin
    global Exe_Path
    global GLog
    global Start_Time
    script_real = os.path.realpath(sys.argv[0])
    Exe_Path = os.path.dirname(script_real)
    Exe_Bin = os.path.basename(script_real)
    Start_Time = datetime.datetime.now()
    stamp = Start_Time.strftime('%Y%m%d_%H%M%S')
    # Open the log. exist_ok avoids the check-then-create race when two runs
    # start at the same moment.
    log_dir = os.path.join(Exe_Path, 'logs', Exe_Bin)
    os.makedirs(log_dir, exist_ok=True)
    log_full = os.path.join(log_dir, '%s_%s.log' % (Exe_Bin, stamp))
    GLog = Logger(log_full, mode='w')
    GLog.info('start')
    return 0
def Get_Rs(xlsx_path=None):
    """Load every rs id and its genotypes from the annotation workbook.

    Reads the first worksheet. The header row locates the column span from
    HEAD_S to HEAD_E (inclusive); within that span the second column is used
    as the rs id and the third as the genotype. Each genotype is appended to
    Drug_Dd[rs id]. Reading stops at the first row that has an empty cell
    inside the span.

    Args:
        xlsx_path: Workbook to read. Defaults to the annotation workbook
            located next to the script (in Exe_Path).

    Raises:
        ValueError: if the sheet has no rows, a header caption is missing, or
            the span is too narrow to contain the rs id and genotype columns.
    """
    if xlsx_path is None:
        xlsx_path = os.path.join(
            Exe_Path, '650.panel化疗位点注释及指导说明.庞杰.20230403.xlsx')
    wb = load_workbook(xlsx_path, read_only=True, data_only=False)
    try:
        ws1 = wb[wb.sheetnames[0]]
        # Stream the rows instead of materialising the whole sheet.
        rows = iter(ws1.rows)
        head_row = next(rows, None)
        if head_row is None:
            raise ValueError('worksheet is empty: %s' % xlsx_path)
        # Header handling
        head_lines = [str(cell.value) for cell in head_row]
        head_start = head_lines.index(HEAD_S)
        head_end = head_lines.index(HEAD_E)
        span = head_end - head_start + 1
        if span < 3:
            raise ValueError(
                'columns %s..%s do not cover the rs id and genotype columns'
                % (HEAD_S, HEAD_E))
        for row in rows:
            row_lines = []
            for cell in row[head_start:head_end + 1]:
                if cell.value is None:
                    break
                row_lines.append(str(cell.value))
            # Build the drug dictionary. A truncated row is still recorded
            # when it carries both the rs id and the genotype; a row without
            # them (e.g. the blank row after the table) is skipped instead of
            # raising IndexError.
            if len(row_lines) >= 3:
                Drug_Dd.setdefault(row_lines[1], []).append(row_lines[2])
            # An empty (or missing) cell inside the span marks the end of data.
            if len(row_lines) < span:
                break
    finally:
        # Read-only workbooks keep the file handle open until closed.
        wb.close()
def Get_Chemo_Txt(root=None):
    """Collect every ``*.drug.infos.txt`` file below the chemo output directory.

    Args:
        root: Directory to search recursively. Defaults to the ``cheom_out``
            directory next to the script (in Exe_Path).

    Returns:
        list[str]: matching paths in sorted order; empty when nothing matches
        or the directory does not exist.
    """
    # Local import: the module only imports the glob() function itself.
    from glob import escape
    if root is None:
        # NOTE(review): "cheom_out" looks like a misspelling of "chemo_out",
        # but it is a runtime path and is therefore kept exactly as is.
        root = os.path.join(Exe_Path, "cheom_out")
    # Escape the directory so glob metacharacters in the install path
    # ('[', '*', '?') are matched literally rather than as wildcards.
    pattern = '%s/**/*.drug.infos.txt' % escape(root)
    # Sorted so the processing order does not depend on directory listing order.
    return sorted(glob(pattern, recursive=True))
def Check_Txts(txts, drug_dd=None, out_path='rs.chemo.res.txt'):
    """Count in how many result files each reference genotype was observed.

    Every file in ``txts`` is tab separated with one header line; the third
    field of a data line is the rs id and the fourth the genotype. Each
    (rs id, genotype) pair is counted at most once per file. A two-letter
    genotype that is not in the reference is also tried with its letters
    swapped (e.g. "TC" counts towards "CT"). Pairs that match nothing,
    including rs ids absent from the reference, are printed. Rs ids whose
    genotypes all have a zero count are printed as never detected, and one
    ``rs<TAB>genotype<TAB>count`` line per reference genotype is written to
    ``out_path``.

    Args:
        txts: Paths of the ``*.drug.infos.txt`` files to scan.
        drug_dd: Mapping rs id -> iterable of genotypes. Defaults to the
            module-level Drug_Dd.
        out_path: Report file to write. Defaults to ``rs.chemo.res.txt`` in
            the current working directory.

    Returns:
        dict: rs id -> {genotype: number of files in which it was seen}.
    """
    if drug_dd is None:
        drug_dd = Drug_Dd
    # Build the counter table
    rs_dd = {
        rs: {rs_type: 0 for rs_type in rs_types}
        for rs, rs_types in drug_dd.items()
    }
    for txt in txts:
        seen = set()  # (rs, genotype) pairs already counted for this file
        with open(txt, 'r') as ff:
            next(ff, None)  # skip the header line
            for line in ff:
                # Drop the line ending so a genotype in the last column
                # compares equal to the reference value.
                lns = line.rstrip('\r\n').split('\t')
                if len(lns) < 4:
                    # Blank or truncated line: no rs / genotype fields.
                    continue
                rs, rs_type = lns[2], lns[3]
                if (rs, rs_type) in seen:
                    continue
                seen.add((rs, rs_type))
                known = rs_dd.get(rs)
                if known is not None and rs_type in known:
                    known[rs_type] += 1
                    continue
                # Allele order may differ from the reference ("TC" vs "CT").
                if known is not None and len(rs_type) == 2:
                    rs_ntype = rs_type[-1] + rs_type[-2]
                    if rs_ntype in known:
                        known[rs_ntype] += 1
                        continue
                # Unknown rs id or unmatched genotype: report instead of
                # failing with KeyError or passing silently.
                print('txt:%s, rs:%s, rs_type:%s' % (txt, rs, rs_type))
    txt_res = []
    for rs, counts in rs_dd.items():
        for rs_type, type_num in counts.items():
            txt_res.append('%s\t%s\t%s\n' % (rs, rs_type, type_num))
        if not any(counts.values()):
            print("rs:%s, 一直没有被检出" % rs)
    with open(out_path, 'w') as ff:
        ff.writelines(txt_res)
    return rs_dd
if __name__ == '__main__':
    # Script entry point: load the reference genotypes, scan the chemo result
    # files and write the summary. Get_Opts runs first because it creates the
    # logger used by the error handler below.
    Get_Opts()
    exit_code = 0
    try:
        # The work lives inside the try so that a failure is written to the
        # log before the script ends.
        Get_Rs()
        txts = Get_Chemo_Txt()
        print(len(txts))
        Check_Txts(txts)
    except Exception:
        # Exception rather than BaseException: Ctrl-C and sys.exit() must
        # still propagate.
        trace = traceback.format_exc()
        GLog.error(trace)
        print(trace)
        exit_code = 1
    finally:
        endtime = datetime.datetime.now()
        GLog.info('end')
        # total_seconds() includes whole days, which .seconds drops.
        GLog.info('run time:%s seconds'
                  % int((endtime - Start_Time).total_seconds()))
        GLog.close()
    if exit_code:
        # Keep a non-zero exit status on failure, as an uncaught exception
        # would have produced.
        sys.exit(exit_code)