pipeline/script/chemo/sum_chemo.py

185 lines
4.9 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Created on: 2023-02-17
@author: cjs
# 用途统计放化疗的结果看看rs所有的分型是不是都被检测到
# 版本0.0.1
# 最后编辑日期: 2023-02-17
"""
from cjs_test.cjs_logger import Logger
from openpyxl import load_workbook
from glob import glob
import datetime
import traceback
import os
import sys
# Global run-time state
# Exe_Bin / Exe_Path: base name and directory of the running script (set by Get_Opts).
Exe_Bin = ''
Exe_Path = ''
# GLog: logger instance created by Get_Opts; None until then.
GLog = None
# Start_Time: datetime recorded by Get_Opts when the run starts.
Start_Time = None
# Drug_Dd: maps each rs id to the list of its genotypes (filled by Get_Rs).
Drug_Dd = {}
# Command parameters
HEAD_S = '基因名称' # header text of the first column to process
HEAD_E = '证据等级' # header text of the last column to process
def Exit_Print(pline=''):
    """Print the usage line plus an optional message, then exit the script.

    When the global logger is set, an 'exit' record is written and the
    logger is closed before exiting. The process exits with status 0.

    Args:
        pline: extra message to print after the usage line; skipped when
            it is empty.
    """
    usage = '%s -O Pro_Out' % Exe_Bin
    print(usage)
    has_message = len(pline) > 0
    if has_message:
        print(pline)
    logger = GLog
    if logger:
        logger.info('exit')
        logger.close()
    sys.exit(0)
# Set up the run-time environment
def Get_Opts():
    """Initialise the global run-time state and open the log file.

    Sets ``Exe_Path`` and ``Exe_Bin`` from the resolved path of the running
    script, records ``Start_Time``, and creates ``GLog`` writing to
    ``<Exe_Path>/logs/<Exe_Bin>/<Exe_Bin>_<YYYYmmdd_HHMMSS>.log``.

    Returns:
        int: always 0.
    """
    global Exe_Bin
    global Exe_Path
    global GLog
    global Start_Time
    file_real = os.path.realpath(sys.argv[0])
    Exe_Path = os.path.dirname(file_real)
    Exe_Bin = os.path.basename(file_real)
    Start_Time = datetime.datetime.now()
    ymd = Start_Time.strftime('%Y%m%d_%H%M%S')
    # Open the log. exist_ok avoids the check-then-create race when two
    # runs start at the same time.
    log_path = os.path.join(Exe_Path, 'logs', Exe_Bin)
    os.makedirs(log_path, exist_ok=True)
    log_base = '%s_%s.log' % (Exe_Bin, ymd)
    log_full = os.path.join(log_path, log_base)
    # NOTE(review): Logger comes from cjs_test.cjs_logger; mode='w' is
    # presumably the file-open mode -- confirm against that module.
    GLog = Logger(log_full, mode='w')
    GLog.info('start')
    return 0
def Get_Rs():
    """Load the rs genotypes listed in the panel workbook into ``Drug_Dd``.

    Opens the workbook that sits next to the script and reads its first
    sheet. The first row is the header; for every following row the cells
    from the ``HEAD_S`` column through the ``HEAD_E`` column (inclusive) are
    read as strings. Within that span the second value is used as the rs id
    and the third as the genotype, and the genotype is appended to
    ``Drug_Dd[rs]``.

    Reading stops at the first row that has an empty cell inside the span.
    If at least three values were collected from that row before the empty
    cell, the row is still recorded; otherwise it is dropped (previously
    this case raised IndexError).

    Raises:
        ValueError: if the sheet has no rows, or ``HEAD_S`` / ``HEAD_E`` is
            not found in the header row.
    """
    global Drug_Dd
    xlsx_name = '650.panel化疗位点注释及指导说明.庞杰.20230403.xlsx'
    pro_xlsx = os.path.join(Exe_Path, xlsx_name)
    wb = load_workbook(pro_xlsx, read_only=True, data_only=False)
    try:
        ws1 = wb[wb.sheetnames[0]]
        # Stream the rows instead of materialising the whole sheet.
        rows = iter(ws1.rows)
        # Header handling
        head_row = next(rows, None)
        if head_row is None:
            raise ValueError('no rows found in %s' % pro_xlsx)
        head_lines = [str(cell.value) for cell in head_row]
        head_start = head_lines.index(HEAD_S)
        head_end = head_lines.index(HEAD_E)
        for row in rows:
            row_lines = []
            reached_end = False
            for col_index in range(head_start, head_end + 1):
                # A row shorter than the span is treated like an empty cell.
                if col_index < len(row):
                    col_str = str(row[col_index].value)
                else:
                    col_str = 'None'
                if col_str == 'None':
                    reached_end = True
                    break
                row_lines.append(col_str)
            # Build the rs -> genotypes mapping; needs the rs id and the
            # genotype (span offsets 1 and 2) to be present.
            if len(row_lines) >= 3:
                Drug_Dd.setdefault(row_lines[1], []).append(row_lines[2])
            if reached_end:
                break
    finally:
        # Close the workbook even when parsing fails.
        wb.close()
def Get_Chemo_Txt():
    """Return the paths of all ``*.drug.infos.txt`` files found recursively
    under the ``cheom_out`` directory next to the script."""
    search_root = os.path.join(Exe_Path, "cheom_out")
    pattern = '%s/**/*.drug.infos.txt' % search_root
    return list(glob(pattern, recursive=True))
def Check_Txts(txts, drug_dd=None, out_path='rs.chemo.res.txt'):
    """Count in how many result files each expected rs genotype was seen.

    Each file in ``txts`` is read as tab-separated text with one header
    line. The third field of a data line is taken as the rs id and the
    fourth as the genotype. A given (rs, genotype) pair is counted at most
    once per file. A two-character genotype that is not expected as written
    is also tried with its two characters swapped.

    Data lines with fewer than four fields are skipped. Genotypes that
    cannot be matched (unknown rs, or unknown genotype for a known rs) are
    printed and not counted. Every rs whose genotypes all have a zero count
    is printed as never detected.

    Args:
        txts: iterable of paths to the result text files.
        drug_dd: mapping of rs id to an iterable of expected genotypes;
            defaults to the module-level ``Drug_Dd``.
        out_path: file that receives one ``rs<TAB>genotype<TAB>count`` line
            per expected genotype.

    Returns:
        dict: ``{rs: {genotype: count}}`` for every expected genotype.
    """
    if drug_dd is None:
        drug_dd = Drug_Dd
    # Build the counter table
    rs_dd = {}
    for rs, rs_types in drug_dd.items():
        rs_dd[rs] = {rs_type: 0 for rs_type in rs_types}
    for txt in txts:
        seen = set()  # ensures each (rs, genotype) is counted once per file
        with open(txt, 'r') as ff:
            next(ff, None)  # skip the header line
            for line in ff:
                # Drop the line ending so a genotype in the last column
                # does not carry a trailing newline.
                lns = line.rstrip('\r\n').split('\t')
                if len(lns) < 4:
                    continue
                rs = lns[2]
                rs_type = lns[3]
                if (rs, rs_type) in seen:
                    continue
                seen.add((rs, rs_type))
                expected = rs_dd.get(rs)
                if expected is not None:
                    if rs_type in expected:
                        expected[rs_type] += 1
                        continue
                    if len(rs_type) == 2:
                        rs_ntype = rs_type[::-1]
                        if rs_ntype in expected:
                            expected[rs_ntype] += 1
                            continue
                # Unknown rs or unmatched genotype: report instead of
                # raising KeyError or silently dropping it.
                print('txt:%s, rs:%s, rs_type:%s' % (txt, rs, rs_type))
    txt_res = []
    for rs, type_counts in rs_dd.items():
        for rs_type, type_num in type_counts.items():
            txt_res.append('%s\t%s\t%s\n' % (rs, rs_type, type_num))
        if not any(type_counts.values()):
            print("rs:%s, 一直没有被检出" % rs)
    with open(out_path, 'w') as ff:
        ff.writelines(txt_res)
    return rs_dd
if __name__ == '__main__':
    # Script entry point: set up logging, load the panel genotypes, collect
    # the result files and write the per-genotype detection counts.
    # Get_Opts stays outside the try because GLog does not exist until it
    # has succeeded.
    Get_Opts()
    exit_code = 0
    try:
        Get_Rs()
        txts = Get_Chemo_Txt()
        print(len(txts))
        Check_Txts(txts)
    except Exception:
        # Record the failure in the log as well as on the console; the
        # non-zero exit status of an uncaught exception is kept below.
        err_text = traceback.format_exc()
        GLog.error(err_text)
        print(err_text)
        exit_code = 1
    finally:
        endtime = datetime.datetime.now()
        GLog.info('end')
        # total_seconds() covers runs longer than a day; .seconds does not.
        run_seconds = int((endtime - Start_Time).total_seconds())
        GLog.info('run time:%s seconds' % run_seconds)
        GLog.close()
    if exit_code:
        sys.exit(exit_code)