# layout/tools/t7.py
# NOTE: web-scrape residue removed -- the "Raw Blame History" chrome and the
# GitHub ambiguous-Unicode-characters banner were page artifacts, not source.
import copy
import os
import time
from collections import defaultdict
from datetime import datetime
import pandas as pd
from tools.common import basedir, log
def format_date(date):
    """Render *date* as an ISO-style 'YYYY-MM-DD' string."""
    return f"{date:%Y-%m-%d}"
class AutoLayout:
    """
    Automated sample layout (自动化派样): distributes sequencing libraries
    onto chips subject to size, barcode-uniqueness, classification-exclusion
    and base-balance constraints, then writes the result workbook.
    """
    def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0, output=basedir, data_limit=1750):
        """
        :param path: input Excel workbook containing the pending ('未测') sheet
        :param librarynum: overall limit on the number of library tubes
            (coerced with int(), so it must be numeric or a numeric string)
        :param is_use_balance: when truthy, enforce the 250G caps on
            unbalanced / methylation libraries (see judge_data)
        :param is_use_max: when truthy, enable the extra relaxed base-balance
            rule in count_barcode_radio
        :param output: directory whose 'result' subfolder receives the output
        :param data_limit: per-chip data-volume cap in G
        """
        self.path = path
        self.output = output
        self.librarynum = int(librarynum)
        self.data_limit = data_limit
        # chip name -> list of sample records assigned to that chip
        self.index_assignments = defaultdict(list)
        # chip name -> total assigned data volume (G)
        self.chip_size = dict()
        # chip name -> chip type flag (极致 or not); not populated in this file
        self.chip_type = dict()
        # chip name -> set of barcodes already placed on the chip
        self.chip_barcode_recode = defaultdict(set)
        # raw per-sheet records loaded from the input workbook
        self.ori_data = self.read_excel()
        # index of the chip currently being filled (the "anchor" chip)
        self.loc_chip_num = 1
        # chip name -> set of customers present on the chip
        self.chip_customer = defaultdict(set)
        # chip name -> set of library classifications present on the chip
        self.chip_classification = defaultdict(set)
        # symmetric classification mutual-exclusion table (see read_rule)
        self.rule = self.read_rule()
        # chip name -> accumulated size of unbalanced libraries (G)
        self.chip_speciallib_size = dict()
        # chip name -> accumulated size of methylation ('甲基化') libraries (G)
        self.chip_methylib_size = dict()
        # chip name -> accumulated size of Nextera libraries (G)
        self.chip_speciallib_nextera_size = dict()
        # chip name -> accumulated size of 华大 (BGI) libraries (G)
        self.chip_speciallib_huada_size = dict()
        self.logger = log(os.path.basename(f'{path}.txt'))
        # warnings collected for the 'log' sheet of the output workbook
        self.return_log = list()
        # records that could not be assigned; written to the output '未测' sheet
        self.no_assign_data = list()
        # per-library work queue built by assign_samples
        self.ori_lib_data = list()
        # required input column names, loaded from rule/columns.xlsx
        self.need_cols = self.read_cols()
        self.is_use_balance = is_use_balance
        self.is_use_max = is_use_max
    def count_barcode_radio(self, data):
        """
        Check per-position base balance over the first 16 barcode bases.

        Each record's first 16 barcode characters are split into one column
        per position; the A/T/C/G ratios of every position are computed
        weighted by 'data_needed' (N bases are excluded from the denominator).
        A position passes when its ratios match one of the accepted tier
        combinations; failures are reported as human-readable strings.

        :param data: iterable of records with 'barcode' and 'data_needed' keys
        :return: (ratio_sites, is_not_balance_list) where ratio_sites maps
            position index -> {base: ratio}, and is_not_balance_list holds a
            description of every unbalanced position (empty means balanced)
        """
        df = pd.DataFrame(data)
        ratio_sites = dict()
        is_not_balance_list = []
        if df.empty:
            return ratio_sites, is_not_balance_list
        df['barcode'] = df['barcode'].str.slice(0, 16)
        # str.split('') yields leading/trailing empty columns; iloc[:, 1:-1]
        # keeps exactly the 16 single-character columns named T0..T15.
        barcode_df = pd.DataFrame(df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values,
                                  columns=['T' + str(x) for x in range(16)]).join(df['data_needed'])
        total = barcode_df['data_needed'].sum()
        for i in range(16):
            column = 'T' + str(i)
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
            # Exclude N bases from the ratio denominator.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            col_df['ratio'] = (col_df['data_needed']) / (total - base_n_size)
            ratio = col_df['ratio'].to_dict()
            # NOTE: ratio_sites[i] aliases this dict, so the 0 entries filled
            # in below for missing bases are visible to the caller as well.
            ratio_sites[i] = ratio
            # Tier buckets by ratio: A >= 0.6, B [0.2, 0.6), C [0.15, 0.2),
            # D [0.1, 0.15), E [0.08, 0.1), F < 0.08; G [0.125, 0.625] is an
            # extra relaxed rule (overlaps the others by design).
            A, B, C, D, E, F, G = list(), list(), list(), list(), list(), list(), list()
            for decbase in ['A', 'T', 'C', 'G']:
                if decbase not in ratio:
                    ratio[decbase] = 0
                if ratio[decbase] >= 0.6:
                    A.append(decbase)
                if 0.2 <= ratio[decbase] < 0.6:
                    B.append(decbase)
                if 0.15 <= ratio[decbase] < 0.2:
                    C.append(decbase)
                if 0.1 <= ratio[decbase] < 0.15:
                    D.append(decbase)
                if 0.08 <= ratio[decbase] < 0.1:
                    E.append(decbase)
                if ratio[decbase] < 0.08:
                    F.append(decbase)
                # Additional relaxed per-base feasibility rule (tier G).
                if 0.125 <= ratio[decbase] <= 0.625:
                    G.append(decbase)
            A_num, B_num, C_num, D_num, E_num, F_num, G_num = len(A), len(B), len(C), len(D), len(E), len(F), len(G)
            # Accepted tier combinations; the last one applies only when
            # is_use_max is enabled.
            if not ((B_num + C_num + D_num == 4) or (F_num == 1 and (A_num + B_num) == 3) or (
                    E_num == 1 and D_num == 1 and (A_num + B_num + C_num) == 2) or (
                    E_num == 1 and (A_num + B_num + C_num) == 3) or (
                    F_num == 1 and G_num == 3 and self.is_use_max)):
                is_not_balance_list.append(
                    '%s位置,算出结果为 %s' % (i, ratio)
                )
        return ratio_sites, is_not_balance_list
def dec_barcode_radio(self, chipname):
data = self.index_assignments[chipname]
ratio_sites, is_not_balance_list = self.count_barcode_radio(data)
if is_not_balance_list:
desc = '\n'.join(is_not_balance_list)
self.return_log.append(f'芯片{chipname}有碱基不平衡:\n{desc}')
print(f'芯片{chipname}有碱基不平衡:\n{desc}')
@staticmethod
def level(row):
today_date = datetime.now()
if 'nextera' in row['classification'].lower():
return 10
if '华大' in row['classification']:
return 11
if row['拆分方式'] == '极致周期' or '极致' in row['拆分方式']:
return 20
mytime = row['time']
# 判断日期是之前的还是之后的
if mytime < today_date:
return 30
if '加急' in row['priority']:
return 40
if '补测' in row['priority']:
return 50
else:
return 1000
@staticmethod
def read_rule():
df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_classfication.xlsx'))
newdf = pd.DataFrame()
newdf['c1'] = df['c2']
newdf['c2'] = df['c1']
res = pd.concat([df, newdf])
return res.reset_index()
@staticmethod
def read_cols():
df = pd.read_excel(os.path.join(basedir, 'rule', 'columns.xlsx'))
cols = list(df['cols'].values)
return cols
def read_excel(self):
"""
原始数据处理
:return:
"""
merge = pd.read_excel(self.path, None)
ori_data = dict()
for name, sheet in merge.items():
sheet.fillna('', inplace=True)
ori_data[name] = sheet.to_dict('records')
return ori_data
def add_new_data(self, chipname, library_data, newer=True):
"""
增加新数据到已知芯片上
:param chipname:
:param library_data:
:param newer:
:return:
"""
self.index_assignments[chipname].extend(library_data['data'])
self.chip_barcode_recode[chipname].update({item['barcode'] for item in library_data['data']})
self.chip_customer[chipname].add(library_data['customer'])
self.chip_classification[chipname].add(library_data['classification'])
if newer:
self.chip_size[chipname] = library_data['size']
# if library_data['classification'] in ['扩增子', '不平衡文库', '单细胞文库以及甲基化']:
if library_data['is_balance_lib'] == '':
self.chip_speciallib_size[chipname] = library_data['size']
elif library_data['is_balance_lib'] == '甲基化':
self.chip_methylib_size[chipname] = library_data['size']
else:
self.chip_speciallib_size[chipname] = 0
self.chip_methylib_size[chipname] = 0
if 'nextera' in library_data['classification'].lower():
self.chip_speciallib_nextera_size[chipname] = library_data['size']
else:
self.chip_speciallib_nextera_size[chipname] = 0
if '华大' in library_data['classification']:
self.chip_speciallib_huada_size[chipname] = library_data['size']
else:
self.chip_speciallib_huada_size[chipname] = 0
else:
self.chip_size[chipname] += library_data['size']
if library_data['is_balance_lib'] == '':
self.chip_speciallib_size[chipname] += library_data['size']
if library_data['is_balance_lib'] == '甲基化':
self.chip_methylib_size[chipname] += library_data['size']
if 'nextera' in library_data['classification'].lower():
self.chip_speciallib_huada_size[chipname] += library_data['size']
if '华大' in library_data['classification']:
self.chip_speciallib_huada_size[chipname] += library_data['size']
def use_rule(self, chipname, classfication):
may_classfic = set(self.rule[self.rule['c1'] == classfication]['c2'])
if self.chip_customer[chipname].intersection(may_classfic):
return True
return False
    def judge_data(self, chipname, library_data):
        """
        Decide whether *library_data* may be added to chip *chipname*.

        All of the following constraints must hold:
          * total chip size stays within self.data_limit
          * no barcode collision (including 'N'*8 + i7 pseudo-barcodes)
          * no mutually exclusive classification already on the chip
          * unbalanced libraries (is_balance_lib == '') capped at 250G/chip
          * methylation ('甲基化') libraries capped at 250G/chip
            (both caps are skipped when is_use_balance is falsy)
          * once the chip is fairly full, adding the library must keep the
            per-position base balance acceptable

        :return: True when every constraint passes, else False
        """
        size = library_data['size']
        classification = library_data['classification']
        is_balance_lib = library_data['is_balance_lib']
        # Chip size must not exceed the configured limit.
        sizelimit = True
        if self.chip_size[chipname] + size > self.data_limit:
            sizelimit = False
        # Reject duplicate barcodes; also reject when an existing barcode
        # equals 'N'*8 prepended to the library's i7 index.
        notrepeatbarcode = True
        if self.chip_barcode_recode[chipname].intersection({item['barcode'] for item in library_data['data']}) or \
                self.chip_barcode_recode[chipname].intersection(
                    {'N' * 8 + item['i7'] for item in library_data['data']}):
            notrepeatbarcode = False
        # Mutually exclusive classifications (see read_rule / use_rule).
        exclusive_classific = True
        if self.use_rule(chipname, classification):
            exclusive_classific = False
        # Unbalanced libraries above 250G per chip are rejected.
        splibrary = True
        if is_balance_lib == '' and self.chip_speciallib_size[chipname] + size > 250:
            splibrary = False
        # Methylation libraries above 250G per chip are rejected.
        spmethylibrary = True
        if is_balance_lib == '甲基化' and self.chip_methylib_size[chipname] + size > 250:
            spmethylibrary = False
        # Both caps are disabled when balance handling is turned off.
        if not self.is_use_balance:
            splibrary = True
            spmethylibrary = True
        # Base-balance gate: simulate the addition and re-check the ratios,
        # protecting the data already placed on the chip.
        # NOTE(review): the original comment said this kicks in at 1200G, but
        # the code triggers above 900G -- confirm the intended threshold.
        base_balance = True
        if self.chip_size[chipname] > 900:
            current_data = copy.deepcopy(self.index_assignments[chipname])
            new_data = library_data['data']
            current_data.extend(new_data)
            ratio_sites, is_not_balance_list = self.count_barcode_radio(current_data)
            if is_not_balance_list:
                base_balance = False
        if sizelimit and notrepeatbarcode and exclusive_classific and splibrary and base_balance and spmethylibrary:
            return True
        return False
def add_loc_num(self):
"""
锚定芯片号增加
"""
# 有nextera, 华大文库 必须满足大于50G
chipname = f'chip{self.loc_chip_num}'
nextera_size = self.chip_speciallib_nextera_size[chipname]
huada_size = self.chip_speciallib_huada_size[chipname]
flag = True
if 0 < nextera_size < 50:
# 有nextera文库但是不满足50G 去除
nextary_barcode = set()
no_nextary_data = list()
for libdata in self.index_assignments[chipname]:
if libdata['classification'].lower() != 'nextera':
no_nextary_data.append(libdata)
else:
self.no_assign_data.append(libdata)
nextary_barcode.update(libdata['barcode'])
self.index_assignments[chipname] = no_nextary_data
self.chip_barcode_recode[chipname] -= nextary_barcode
self.chip_speciallib_nextera_size[chipname] = 0
self.chip_size[chipname] -= nextera_size
flag = False
if 0 < huada_size < 50:
# 有华大文库但是不满足50G 去除
huada_barcode = set()
no_huada_data = list()
for libdata in self.index_assignments[chipname]:
if libdata['classification'] != '华大':
no_huada_data.append(libdata)
else:
self.no_assign_data.append(libdata)
huada_barcode.update(libdata['barcode'])
self.index_assignments[chipname] = no_huada_data
self.chip_barcode_recode[chipname] -= huada_barcode
self.chip_speciallib_huada_size[chipname] = 0
self.chip_size[chipname] -= huada_size
flag = False
if flag:
self.loc_chip_num += 1
    def assign_samples(self):
        """
        Build the per-library work queue from the '未测' sheet and greedily
        assign libraries to chips.

        Invalid rows (non-numeric data_needed, non-date time, '暂不排样'
        status, malformed barcode, intra-library barcode duplicates) are
        diverted into no_assign_data with an explanatory note. Libraries
        larger than half of data_limit are split in two halves. The queue is
        sorted by (level, time) and consumed by the anchor-chip loop below.

        :raises UserWarning: when the '未测' sheet or required headers are missing
        """
        ori_library_data = list()
        if '未测' not in self.ori_data.keys():
            raise UserWarning('提供excel没有 未测 sheet ,请核查!')
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        # Verify that every required header is present in the input sheet.
        get_col = set(ori_library_df.columns)
        unhave_col = set(self.need_cols) - get_col
        if unhave_col:
            unhave_from = '; '.join(unhave_col)
            raise UserWarning(f'未测表里没有{unhave_from} 表头,请核查!')
        # Validity masks over the raw rows.
        numeric_mask = pd.to_numeric(ori_library_df['data_needed'], errors='coerce').notna()
        time_mask = pd.to_datetime(ori_library_df['time'], errors='coerce').notna()
        # Rows explicitly flagged as "do not lay out yet" via the status column.
        status_mask = ori_library_df['status'] == '暂不排样'
        # Malformed barcodes (must be exactly 16 characters long).
        barcode_mask = ori_library_df['barcode'].str.len() != 16
        ori_library_df['note'] = ''
        ori_library_df.loc[~numeric_mask, 'note'] = 'data_needed 列非数字'
        ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期'
        ori_library_df.loc[status_mask, 'note'] = '暂不排样'
        ori_library_df.loc[barcode_mask, 'note'] = '非16位barcode'
        no_ori_data = ori_library_df[~(numeric_mask & time_mask) | status_mask | barcode_mask]
        self.no_assign_data.extend(no_ori_data.to_dict('records'))
        # Keep only numeric/dated rows that are neither deferred nor malformed.
        ori_library_df = ori_library_df[(numeric_mask & time_mask) & ~status_mask & ~barcode_mask]
        # Left disabled: customers whose requested data exceeds ~1T were
        # meant to be handled separately.
        # summary = ori_library_df.groupby('customer').agg({'data_needed': 'sum'})
        # print(summary)
        # Normalise the time column and compute each row's priority level.
        ori_library_df['time'] = pd.to_datetime(ori_library_df['time'], errors='coerce')
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        # Duplicate 极致 barcodes are bumped to level 19 so they are not
        # pushed to the end and dropped into the output '未测' sheet.
        ori_library_df.loc[
            (ori_library_df.duplicated(subset='barcode')) & (ori_library_df['level'] == 20), 'level'] = 19
        for library, library_df in ori_library_df.groupby('#library'):
            size = library_df['data_needed'].sum()
            # Reject libraries with internal barcode duplicates.
            if len(library_df['barcode'].values) > len(set(library_df['barcode'].values)):
                library_df['note'] = '文库内部有重复'
                self.no_assign_data.extend(library_df.to_dict('records'))
                continue
            # Libraries above half the chip limit are split into two halves.
            flag = False
            if size > (self.data_limit) / 2:
                library_df['data_needed'] = library_df['data_needed'] / 2
                flag = True
            ori_library_data.append(dict(
                library=library,
                is_balance_lib=library_df['is_balance_lib'].values[0],
                size=library_df['data_needed'].sum(),
                split_method=library_df['拆分方式'].values[0],
                time=library_df['time'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df[self.need_cols].to_dict('records')
            ))
            # Second half of a split library (same records, halved sizes).
            if flag:
                self.return_log.append(f'文库{library} 已做拆分处理, 请注意!!! ')
                ori_library_data.append(dict(
                    library=library,
                    is_balance_lib=library_df['is_balance_lib'].values[0],
                    size=library_df['data_needed'].sum(),
                    split_method=library_df['拆分方式'].values[0],
                    time=library_df['time'].values[0],
                    level=library_df['level'].values[0],
                    customer=library_df['customer'].values[0],
                    classification=library_df['classification'].values[0],
                    data=library_df[self.need_cols].to_dict('records')
                ))
        self.ori_lib_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time']))
        # self.ori_lib_data = ori_sort_data
        # Greedy anchor-chip loop: always try the head of the queue first,
        # then scan the queue for any library that fits; advance the anchor
        # chip when nothing fits or when the chip exceeds the data limit.
        while self.ori_lib_data:
            library_data = self.ori_lib_data[0]
            chipname = f'chip{self.loc_chip_num}'
            # A brand-new chip accepts the head library unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                self.ori_lib_data.remove(library_data)
                continue
            # Test the head library against the chip's constraints.
            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                self.ori_lib_data.remove(library_data)
            else:
                for j in range(len(self.ori_lib_data)):
                    newlibrary_data = self.ori_lib_data[j]
                    if self.judge_data(chipname, newlibrary_data):
                        self.ori_lib_data.remove(newlibrary_data)
                        self.add_new_data(chipname, newlibrary_data, newer=False)
                        break
                    # NOTE(review): dead statement -- reassigning the loop
                    # variable has no effect on the range iteration.
                    j += 1
                else:
                    # Nothing in the queue fits: move to the next chip.
                    self.add_loc_num()
            if self.chip_size[chipname] > self.data_limit:
                self.add_loc_num()
    def run(self):
        """
        Execute the assignment and write the result workbook.

        Output sheets: one per accepted chip (prefixed 'X' for 极致 chips),
        plus '未测' for unassigned records and 'log' for collected warnings.

        :return: path of the written Excel file
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Best-effort boundary: surface the failure in the log sheet and
            # emit an empty result workbook instead of crashing.
            self.return_log.append(f'T7排样出错 请联系!{e}')
            self.index_assignments = {}
        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        outputpath = os.path.join(self.output, 'result', outputname)
        writer = pd.ExcelWriter(outputpath)
        chip_loc = 1
        librarynum = 0
        for chip_idx, chip_assignments in self.index_assignments.items():
            if not chip_assignments:
                continue
            df = pd.DataFrame(chip_assignments)
            df['time'] = df['time'].dt.strftime('%Y-%m-%d')
            # Chips carrying any 极致 library are prefixed with 'X' and are
            # exempt from the 1600G minimum below.
            if [method for method in df['拆分方式'].values if '极致' in method]:
                addname = 'X'
            else:
                addname = ''
            if df['data_needed'].sum() < 1600 and not addname:
                df['note'] = '排样数据量不足1600G'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            # NOTE(review): this checks the running total *before* adding the
            # current chip's libraries, so the tube limit can be exceeded by
            # one chip -- confirm whether that is intended.
            if librarynum > self.librarynum:
                df['note'] = '排样管数超标'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            librarynum += len(set(df['#library'].values))
            self.dec_barcode_radio(chip_idx)
            chipname = addname + chip_idx
            # Per-library summary columns appended beside the sample table.
            sum_list = list()
            for library, library_df in df.groupby('#library'):
                sum_list.append(dict(
                    二次拆分=library,
                    客户=library_df['customer'].values[0],
                    类型=library_df['classification'].values[0],
                    打折前=library_df['data_needed'].sum()
                ))
            df_sum = pd.DataFrame(sum_list)
            res_df = pd.concat([df, df_sum], axis=1)
            res_df.to_excel(writer, sheet_name=chipname, index=False)
            chip_loc += 1
        # self.no_assign_data.extend(self.diffic_assign_data)
        no_assign_df = pd.DataFrame(self.no_assign_data)
        # NOTE(review): DataFrame.applymap is deprecated in pandas >= 2.1
        # (DataFrame.map replaces it) -- left as-is to preserve behavior.
        no_assign_df = no_assign_df.applymap(lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x)
        if not no_assign_df.empty:
            no_assign_df = no_assign_df[self.need_cols]
            no_assign_df.to_excel(writer, sheet_name='未测', index=False)
        if self.return_log:
            pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)
        writer.close()
        return outputpath
if __name__ == '__main__':
    start_time = time.time()
    filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'example', 'input排样表.xlsx')
    # BUG FIX: AutoLayout's second positional parameter is librarynum, which
    # is coerced with int(); the original passed an empty output string here,
    # which raised ValueError on int(''). Pass a numeric tube limit instead.
    # TODO(review): confirm the intended default tube limit.
    layout = AutoLayout(filepath, librarynum=16)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time}")
    # server()