layout/tools/t7.py

811 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import copy
import os
import time
from collections import defaultdict, Counter
from datetime import datetime
from io import BytesIO
import openpyxl
import pandas as pd
from tools.common import basedir
# Helper for cloning openpyxl cell formatting.
def copy_cell_style(src_cell, dest_cell):
    """Copy every style attribute (font, border, fill, number format,
    protection, alignment) from ``src_cell`` onto ``dest_cell``."""
    for attr in ('font', 'border', 'fill', 'number_format', 'protection', 'alignment'):
        setattr(dest_cell, attr, copy.copy(getattr(src_cell, attr)))
class AutoLayout:
    """
    Automated chip layout (自动化派样): reads raw library rows from an Excel
    workbook and greedily assigns them to sequencing chips under size,
    barcode-uniqueness, base-balance and exclusivity constraints.
    """
    def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0, output=basedir, data_limit=1650,
                 data_lower=1600):
        # :param path: input Excel workbook with the raw library/sample rows
        # :param librarynum: maximum number of library tubes allowed in the output
        # :param is_use_balance: when falsy, the unbalanced/methylation size limits are skipped
        # :param is_use_max: enables the extra relaxed base-balance rule (the "G" rule)
        # :param output: directory receiving the result workbook (defaults to project basedir)
        # :param data_limit: hard upper bound (G) of data per chip
        # :param data_lower: chips totalling less than this (G) get dissolved and re-assigned
        self.path = path
        self.output = output
        self.librarynum = int(librarynum)
        self.data_limit = data_limit
        self.data_lower = data_lower
        self.get_col = list()
        self.items = list()
        # Raw chip/library rows read from the workbook.
        self.ori_data = self.read_excel()
        # chipname -> list of rows already assigned to that chip.
        self.index_assignments = defaultdict(list)
        # chipname -> total assigned data volume (G).
        self.chip_size = dict()
        # chipname -> data volume of rows whose barcode contains an 'N'.
        self.chip_size_N = dict()
        # chipname -> sets of used barcodes (full i5+i7, i7-only, i5-only).
        self.chip_barcode_recode = defaultdict(set)
        self.chip_barcodei7_recode = defaultdict(set)
        self.chip_barcodei5_recode = defaultdict(set)
        # Current anchor chip number (chips are named f'chip{loc_chip_num}').
        self.loc_chip_num = 1
        # chipname -> customers present on the chip.
        self.chip_customer = defaultdict(set)
        # chipname -> library classifications present on the chip.
        self.chip_classification = defaultdict(set)
        self.rule = self.read_rule()
        self.rule_exclusive_customer = self.read_rule_exclusive_customer()
        # chipname -> sub-sample names (must stay unique per chip).
        self.chip_sublib = defaultdict(set)
        # chipname -> accumulated size of unbalanced (不平衡) libraries.
        self.chip_speciallib_size = dict()
        # chipname -> accumulated size of methylation (甲基化) libraries.
        self.chip_methylib_size = dict()
        # chipname -> accumulated size of Nextera libraries.
        self.chip_speciallib_nextera_size = dict()
        # chipname -> accumulated size of 华大 (BGI) libraries.
        self.chip_speciallib_huada_size = dict()
        # Per-library records sorted for assignment.
        self.ori_lib_data = list()
        # self.logger = log(os.path.basename(f'{path}.txt'))
        self.return_log = list()
        self.no_assign_data = list()
        # Rows belonging to whole-lane (包lane) orders, written to their own sheet.
        self.order_assign_data = list()
        # self.need_cols = self.read_cols()
        self.is_use_balance = is_use_balance
        self.is_use_max = is_use_max
        # Names of unbalanced libraries that were split into chunks.
        self.split_lib = set()
@staticmethod
def read_cols():
df = pd.read_excel(os.path.join(basedir, 'rule', 'columns.xlsx'))
cols = list(df['cols'].values)
return cols
def read_excel(self):
"""
原始数据处理
:return:
"""
# 获取表头备注
nrow = pd.read_excel(self.path, nrows=1)
self.items = nrow.to_dict('records')
merge = pd.read_excel(self.path, skiprows=[1])
merge.fillna('', inplace=True)
ori_data = merge.to_dict('records')
return ori_data
@staticmethod
def read_rule():
df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_classfication.xlsx'))
newdf = pd.DataFrame()
newdf['c1'] = df['c2']
newdf['c2'] = df['c1']
res = pd.concat([df, newdf])
return res.reset_index()
@staticmethod
def read_rule_exclusive_customer():
df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_customer.xlsx'))
newdf = pd.DataFrame()
newdf['customer1'] = df['customer2']
newdf['customer2'] = df['customer1']
res = pd.concat([df, newdf])
return res.reset_index()
    def count_barcode_radio(self, data, maxt=''):
        """
        Compute the per-position base ratios of the assigned barcodes, weighted
        by data volume, and report positions that violate the balance rules.

        :param data: list of row dicts containing 'indexi5i7' and 'orderdatavolume'
        :param maxt: '' checks all 16 positions; 'indexi7' only positions 8-16;
            'indexi5' only positions 0-8
        :return: (ratio_sites, is_not_balance_list) where ratio_sites maps
            position -> {base: ratio} and the list holds human-readable
            descriptions of unbalanced positions (empty means balanced).
        """
        df = pd.DataFrame(data)
        ratio_sites = dict()
        is_not_balance_list = []
        if df.empty:
            return ratio_sites, is_not_balance_list
        # Select the barcode slice to analyse.
        s, e = 0, 16
        if maxt == 'indexi7':
            s, e = 8, 16
        if maxt == 'indexi5':
            s, e = 0, 8
        num = e - s
        df['indexi5i7'] = df['indexi5i7'].str.slice(s, e)
        # One column per barcode position (T0..T{num-1}) plus the data volume.
        barcode_df = pd.DataFrame(df['indexi5i7'].str.split('', expand=True).iloc[:, 1:-1].values,
                                  columns=['T' + str(x) for x in range(num)]).join(df['orderdatavolume'])
        total = barcode_df['orderdatavolume'].sum()
        for i in range(num):
            column = 'T' + str(i)
            col_df = barcode_df.groupby(column).agg({'orderdatavolume': 'sum'})
            # Exclude 'N' bases from both the numerator and the denominator.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'orderdatavolume']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            col_df['ratio'] = (col_df['orderdatavolume']) / (total - base_n_size)
            ratio = col_df['ratio'].to_dict()
            ratio_sites[i] = ratio
            # Bucket each base by its ratio band; the combination of band
            # counts below defines "balanced" for this position.
            A, B, C, D, E, F, G = list(), list(), list(), list(), list(), list(), list()
            for decbase in ['A', 'T', 'C', 'G']:
                if decbase not in ratio:
                    ratio[decbase] = 0
                if ratio[decbase] >= 0.6:
                    A.append(decbase)
                if 0.2 <= ratio[decbase] < 0.6:
                    B.append(decbase)
                if 0.15 <= ratio[decbase] < 0.2:
                    C.append(decbase)
                if 0.1 <= ratio[decbase] < 0.15:
                    D.append(decbase)
                if 0.08 <= ratio[decbase] < 0.1:
                    E.append(decbase)
                if ratio[decbase] < 0.08:
                    F.append(decbase)
                # Extra relaxed band, only honoured when is_use_max is set.
                if 0.125 <= ratio[decbase] <= 0.625:
                    G.append(decbase)
            A_num, B_num, C_num, D_num, E_num, F_num, G_num = len(A), len(B), len(C), len(D), len(E), len(F), len(G)
            # A position is balanced when any of these band combinations holds;
            # otherwise record it as unbalanced.
            if not ((B_num + C_num + D_num == 4) or (F_num == 1 and (A_num + B_num) == 3) or (
                    E_num == 1 and D_num == 1 and (A_num + B_num + C_num) == 2) or (
                    E_num == 1 and (A_num + B_num + C_num) == 3) or (
                    F_num == 1 and G_num == 3 and self.is_use_max)):
                is_not_balance_list.append(
                    '%s位置,算出结果为 %s' % (i, ratio)
                )
        return ratio_sites, is_not_balance_list
def dec_barcode_radio(self, chipname):
data = self.index_assignments[chipname]
ratio_sites, is_not_balance_list = self.count_barcode_radio(data)
if is_not_balance_list:
desc = '\n'.join(is_not_balance_list)
self.return_log.append(f'芯片{chipname}有碱基不平衡:\n{desc}')
print(f'芯片{chipname}有碱基不平衡:\n{desc}')
@staticmethod
def level(row):
today_date = datetime.now()
if '贞固' in row['companynamea'].lower():
return 999
if 'nextera' in row['classification'].lower():
return 1000
if '华大' in row['classification']:
return 1100
if row['cycletype'] == '极致周期' or '极致' in row['cycletype']:
return 2000
mytime = row['createdtime']
# 判断日期是之前的还是之后的
if mytime < today_date:
return 5000
else:
return 100000
    def combinations_same_barcode(self):
        """
        For urgent (level 1900) libraries that share a barcode, lower each
        involved library's level by its duplicate count so barcode-colliding
        libraries are tried in more orderings and are less likely to end up
        unassigned.
        """
        # All rows of level-1900 libraries, flattened.
        same_barcode_df = pd.DataFrame(
            [spdata for data in self.ori_lib_data if data['level'] == 1900 for spdata in data['data']])
        if same_barcode_df.empty:
            return
        # Group rows sharing the same full barcode.
        grouped = same_barcode_df.groupby('indexi5i7')
        # Keep only barcodes that occur more than once.
        duplicate_groups = grouped.filter(lambda x: len(x) > 1)
        # For each duplicated barcode, the list of library names involved.
        grouped_names = duplicate_groups.groupby('indexi5i7')['presamplename'].apply(list).reset_index()
        random_list = list(set(tuple(sublst) for sublst in list(grouped_names['presamplename'])))
        new_lst = [spdata for data in random_list for spdata in data]
        # library name -> number of duplicated-barcode groups it appears in.
        counts = Counter(new_lst)
        correct_data = list()
        for data in self.ori_lib_data:
            if data['library'] in counts:
                data['level'] -= counts[data['library']]
            # NOTE(review): every record is kept; only levels are adjusted.
            correct_data.append(data)
        self.ori_lib_data = correct_data
def add_new_data(self, chipname, library_data, newer=True):
"""
增加新数据到已知芯片上
:param chipname:
:param library_data:
:param newer:
:return:
"""
self.index_assignments[chipname].extend(library_data['data'])
self.chip_barcode_recode[chipname].update({item['indexi5i7'] for item in library_data['data']})
self.chip_barcodei7_recode[chipname].update({item['indexi7'] for item in library_data['data']})
self.chip_barcodei5_recode[chipname].update({item['indexi5'] for item in library_data['data']})
# 华大的 文库 i7 不能重复添加N+i7
if '华大' in library_data['classification']:
self.chip_barcode_recode[chipname].update({'N' * 8 + item['indexi7'] for item in library_data['data']})
# self.chip_barcode_recode[chipname].update({item['indexi5'] + 'N' * 8 for item in library_data['data']})
# 子文库
self.chip_sublib[chipname].update({item['subsamplename'] for item in library_data['data']})
self.chip_customer[chipname].add(library_data['customer'])
self.chip_classification[chipname].add(library_data['classification'])
if newer:
self.chip_size[chipname] = library_data['size']
self.chip_size_N[chipname] = 0
if 'N' in library_data['data'][0]['indexi5i7']:
self.chip_size_N[chipname] = library_data['size']
if library_data['is_balance_lib'] == '':
self.chip_speciallib_size[chipname] = library_data['size']
elif '甲基化' in library_data['classification']:
self.chip_methylib_size[chipname] = library_data['size']
else:
self.chip_speciallib_size[chipname] = 0
self.chip_methylib_size[chipname] = 0
if 'nextera' in library_data['classification'].lower():
self.chip_speciallib_nextera_size[chipname] = library_data['size']
else:
self.chip_speciallib_nextera_size[chipname] = 0
if '华大' in library_data['classification']:
self.chip_speciallib_huada_size[chipname] = library_data['size']
else:
self.chip_speciallib_huada_size[chipname] = 0
else:
self.chip_size[chipname] += library_data['size']
if library_data['is_balance_lib'] == '':
self.chip_speciallib_size[chipname] += library_data['size']
if '甲基化' in library_data['classification']:
self.chip_methylib_size[chipname] += library_data['size']
if 'nextera' in library_data['classification'].lower():
self.chip_speciallib_nextera_size[chipname] += library_data['size']
if '华大' in library_data['classification']:
self.chip_speciallib_huada_size[chipname] += library_data['size']
if 'N' in library_data['data'][0]['indexi5i7']:
self.chip_size_N[chipname] += library_data['size']
def use_rule_exclusive_classfication(self, chipname, classfication):
"""
文库不能排在一起
"""
may_classfic = set(self.rule[self.rule['c1'] == classfication]['c2'])
if self.chip_customer[chipname].intersection(may_classfic):
return True
return False
def use_rule_exclusive_customer(self, chipname, customer):
"""文库不能排在一起"""
may_classfic = set(
self.rule_exclusive_customer[self.rule_exclusive_customer['customer1'] == customer]['customer2'])
if self.chip_customer[chipname].intersection(may_classfic):
return True
return False
    def judge_data(self, chipname, library_data, max_barcode='all'):
        """
        Constraint check: may `library_data` be added to chip `chipname`?

        Returns True only when every constraint below passes.
        :param max_barcode: 'all' checks the full 16-base barcode;
            'indexi7'/'indexi5' relax uniqueness and balance to that half.
        """
        size = library_data['size']
        size_N = 0
        if 'N' in library_data['data'][0]['indexi5i7']:
            size_N = library_data['size']
        classification = library_data['classification']
        customer = library_data['customer']
        is_balance_lib = library_data['is_balance_lib']
        # library = library_data['library']
        # 1) Chip size must not exceed the configured limit.
        sizelimit = True
        if self.chip_size[chipname] + size > self.data_limit:
            sizelimit = False
            # print(chipname, library, '芯片大小不能超过设定限制')
        # 2) No duplicated barcodes (also guard the N-padded i7/i5 variants).
        notrepeatbarcode = True
        if self.chip_barcode_recode[chipname].intersection({item['indexi5i7'] for item in library_data['data']}) or \
                self.chip_barcode_recode[chipname].intersection(
                    {'N' * 8 + item['indexi7'] for item in library_data['data']}) or \
                self.chip_barcode_recode[chipname].intersection(
                    {item['indexi5'] + 'N' * 8 for item in library_data['data']}):
            notrepeatbarcode = False
            # print(chipname, library, 'barcode有重复')
        # 3) Mutually exclusive classifications.
        exclusive_classific = True
        if self.use_rule_exclusive_classfication(chipname, classification):
            exclusive_classific = False
            # print(chipname, library, '互斥的文库')
        # 4) Mutually exclusive customers.
        exclusive_customer = True
        if self.use_rule_exclusive_customer(chipname, customer):
            exclusive_customer = False
            # print(chipname, library, '互斥的用户')
        # 5) Unbalanced libraries may not exceed 250G per chip.
        splibrary = True
        if is_balance_lib == '' and self.chip_speciallib_size[chipname] + size > 250:
            splibrary = False
            # print(chipname, library, '不平衡文库大于250G')
        # 6) Methylation libraries may not exceed 100G per chip (was 250G).
        spmethylibrary = True
        if is_balance_lib == '甲基化' and self.chip_methylib_size[chipname] + size > 100:
            spmethylibrary = False
            # print(chipname, library, '甲基化文库不能大于100G')
        # When balance handling is disabled, skip both special-library limits.
        if not self.is_use_balance:
            splibrary = True
            spmethylibrary = True
        # 7) Base balance: once the chip holds >900G, simulate adding the new
        #    rows and reject if any barcode position becomes unbalanced.
        base_balance = True
        if self.chip_size[chipname] > 900:
            current_data = copy.deepcopy(self.index_assignments[chipname])
            new_data = library_data['data']
            current_data.extend(new_data)
            ratio_sites, is_not_balance_list = self.count_barcode_radio(current_data)
            if is_not_balance_list:
                base_balance = False
                # print(chipname, library, '碱基不平衡')
        # 8) N-containing data must stay under 450G (half of the 900G mark).
        sizelimit_N = True
        if self.chip_size_N[chipname] + size_N > 450:
            sizelimit_N = False
        # 9) 华大 (BGI) data already over half the chip limit blocks more 华大.
        use_huada = True
        if (self.chip_speciallib_huada_size[chipname] > self.data_limit / 2) and ('华大' in classification):
            use_huada = False
            # print(chipname, library, '华大的文库不能超过限制的一半')
        # Half-barcode mode: redo uniqueness and balance on that half only.
        if max_barcode != 'all':
            base_balance = True
            notrepeatbarcode = True
            if self.chip_barcodei7_recode[chipname].intersection(
                    {item['indexi7'] for item in library_data['data']}) and max_barcode == 'indexi7':
                notrepeatbarcode = False
            if self.chip_barcodei5_recode[chipname].intersection(
                    {item['indexi5'] for item in library_data['data']}) and max_barcode == 'indexi5':
                notrepeatbarcode = False
            # An all-N half barcode is never acceptable in half-barcode mode.
            if ('N' * 8 in {item['indexi5'] for item in library_data['data']}) and max_barcode == 'indexi5':
                notrepeatbarcode = False
            if ('N' * 8 in {item['indexi7'] for item in library_data['data']}) and max_barcode == 'indexi7':
                notrepeatbarcode = False
            if self.chip_size[chipname] > 900:
                current_data = copy.deepcopy(self.index_assignments[chipname])
                new_data = library_data['data']
                current_data.extend(new_data)
                ratio_sites, is_not_balance_list = self.count_barcode_radio(current_data, maxt=max_barcode)
                if is_not_balance_list:
                    base_balance = False
        # 10) Sub-library names must stay unique on the chip.
        notrepeatsublib = True
        if self.chip_sublib[chipname].intersection({item['subsamplename'] for item in library_data['data']}):
            notrepeatsublib = False
        # 11) Unbalanced libraries may not go onto loose-sample chip 1.
        is_not_balance_lib_chip1 = True
        if is_balance_lib == '' and self.loc_chip_num == 1:
            is_not_balance_lib_chip1 = False
        # All constraints must pass simultaneously.
        if sizelimit and notrepeatbarcode and \
                exclusive_classific and \
                exclusive_customer and \
                splibrary and \
                base_balance and \
                spmethylibrary and \
                use_huada and \
                notrepeatsublib and \
                sizelimit_N and \
                is_not_balance_lib_chip1:
            return True
        return False
def add_loc_num(self, chipname):
"""
锚定芯片号增加
"""
# 有nextera, 华大文库 必须满足大于50G 到了芯片结算
# chipname = f'chip{self.loc_chip_num}'
nextera_size = self.chip_speciallib_nextera_size[chipname]
huada_size = self.chip_speciallib_huada_size[chipname]
flag = True
if 0 < nextera_size < 50:
# 有nextera文库但是不满足50G 去除
nextary_barcode = set()
no_nextary_data = list()
for libdata in self.index_assignments[chipname]:
if libdata['classification'].lower() != 'nextera':
no_nextary_data.append(libdata)
else:
self.no_assign_data.append(libdata)
nextary_barcode.update(libdata['indexi5i7'])
self.index_assignments[chipname] = no_nextary_data
self.chip_barcode_recode[chipname] -= nextary_barcode
self.chip_speciallib_nextera_size[chipname] = 0
self.chip_size[chipname] -= nextera_size
flag = False
if 0 < huada_size < 50:
# 有华大文库但是不满足50G 去除
huada_barcode = set()
no_huada_data = list()
for libdata in self.index_assignments[chipname]:
if '华大' not in libdata['classification']:
no_huada_data.append(libdata)
else:
self.no_assign_data.append(libdata)
huada_barcode.update(libdata['indexi5i7'])
self.index_assignments[chipname] = no_huada_data
self.chip_barcode_recode[chipname] -= huada_barcode
self.chip_speciallib_huada_size[chipname] = 0
self.chip_size[chipname] -= huada_size
flag = False
if flag:
self.loc_chip_num += 1
    def assign_samples(self):
        """
        Main assignment pass: clean and normalise the raw rows, build one
        record per library (splitting oversized ones), sort by priority, then
        greedily pack libraries onto chips.
        """
        ori_library_df = pd.DataFrame(self.ori_data)
        # Rows with a non-numeric volume or non-date creation time are invalid.
        numeric_mask = pd.to_numeric(ori_library_df['orderdatavolume'], errors='coerce').notna()
        time_mask = pd.to_datetime(ori_library_df['createdtime'], errors='coerce').notna()
        # Barcodes that are not 16 bases are rebuilt from i5[-8:] + i7[-8:].
        barcode_mask = ori_library_df['indexi5i7'].str.len() != 16
        ori_library_df.loc[barcode_mask, 'indexi5i7'] = ori_library_df.loc[barcode_mask, 'indexi5'].str[-8:] + \
            ori_library_df.loc[barcode_mask, 'indexi7'].str[-8:]
        ori_library_df.loc[barcode_mask, 'indexi5'] = ori_library_df.loc[barcode_mask, 'indexi5'].str[-8:]
        ori_library_df.loc[barcode_mask, 'indexi7'] = ori_library_df.loc[barcode_mask, 'indexi7'].str[-8:]
        # Rows not created by the Shanghai lab are excluded from assignment.
        no_need_lab_mask = ori_library_df['createdbyorgid'] != '上海实验室'
        ori_library_df['note'] = ''
        ori_library_df.loc[~numeric_mask, 'note'] = 'data_needed 列非数字'
        ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期'
        ori_library_df.loc[barcode_mask, 'note'] = '非16位barcode已修改'
        ori_library_df.loc[no_need_lab_mask, 'note'] = '非上海实验室'
        # Invalid or out-of-lab rows go straight to the unassigned sheet.
        no_ori_data = ori_library_df[(~(numeric_mask & time_mask)) | no_need_lab_mask]
        self.no_assign_data.extend(no_ori_data.to_dict('records'))
        # Whole-lane (包lane) orders are routed to their own output sheet.
        # orderlane_mask = ori_library_df['lanepackcode'].str.contains('包lane')
        orderlane_mask = ori_library_df['lanepackcode'] != ''
        self.order_assign_data = ori_library_df[orderlane_mask].to_dict('records')
        # Keep only valid, non-lane, Shanghai-lab rows.
        ori_library_df = ori_library_df[(numeric_mask & time_mask) & (~orderlane_mask) & (~no_need_lab_mask)]
        ori_library_df['createdtime'] = pd.to_datetime(ori_library_df['createdtime'], errors='coerce')
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        # Urgent (极致, level 2000) libraries sharing a barcode get level 1900
        # so the duplicate-barcode shuffle cannot push them into the
        # unassigned output.
        must_lib_df = ori_library_df[ori_library_df['level'] == 2000]
        must_lib = set(must_lib_df[must_lib_df.duplicated(subset='indexi5i7', keep=False)]['presamplename'].to_list())
        ori_library_df.loc[ori_library_df['presamplename'].isin(must_lib), 'level'] = 1900
        for library, library_df in ori_library_df.groupby('presamplename'):
            size = library_df['orderdatavolume'].sum()
            is_balance_lib = library_df['librarybalancedflag'].values[0]
            # A library with duplicated barcodes inside itself is rejected.
            if len(library_df['indexi5i7'].values) > len(set(library_df['indexi5i7'].values)):
                library_df['note'] = '文库内部有重复'
                self.no_assign_data.extend(library_df.to_dict('records'))
                continue
            # Unbalanced libraries over 250G are split into <=200G chunks.
            # NOTE(review): the original comment says 取消 (cancelled)
            # 2024-09-12 but the split is still active — confirm intent.
            if is_balance_lib == '' and size > 250:
                self.return_log.append(f'文库{library} 是不平衡文库, 数据为{size}, 大于250G, 已做拆分处理, 请注意!!! ')
                data_needed = library_df['orderdatavolume'].copy()
                for num in range(int(size), 0, -200):
                    addnum = 200
                    if num <= 200:
                        addnum = num
                    # Scale every row so this chunk totals `addnum` G.
                    library_df['orderdatavolume'] = (addnum / size) * data_needed
                    self.ori_lib_data.append(dict(
                        library=library,
                        sample_code=library_df['sampleCode'].values[0],
                        is_balance_lib=library_df['librarybalancedflag'].values[0],
                        size=library_df['orderdatavolume'].sum(),
                        split_method=library_df['cycletype'].values[0],
                        time=library_df['receivedtime'].values[0],
                        level=1950,
                        customer=library_df['companynamea'].values[0],
                        classification=library_df['classification'].values[0],
                        data=library_df.to_dict('records')
                    ))
                self.split_lib.add(library)
                continue
            # Libraries larger than half a chip are halved into two records.
            # NOTE(review): also marked 取消 2024-09-12 yet still active.
            if size > self.data_limit / 2:
                library_df['orderdatavolume'] = library_df['orderdatavolume'] / 2
                self.return_log.append(f'文库{library} 已做拆分处理, 请注意!!! ')
                # First half (the unconditional append below adds the second).
                self.ori_lib_data.append(dict(
                    library=library,
                    sample_code=library_df['sampleCode'].values[0],
                    is_balance_lib=library_df['librarybalancedflag'].values[0],
                    size=library_df['orderdatavolume'].sum(),
                    split_method=library_df['cycletype'].values[0],
                    time=library_df['receivedtime'].values[0],
                    level=library_df['level'].values[0],
                    customer=library_df['companynamea'].values[0],
                    classification=library_df['classification'].values[0],
                    data=library_df.to_dict('records')
                ))
            self.ori_lib_data.append(dict(
                library=library,
                sample_code=library_df['sampleCode'].values[0],
                is_balance_lib=library_df['librarybalancedflag'].values[0],
                size=library_df['orderdatavolume'].sum(),
                split_method=library_df['cycletype'].values[0],
                time=library_df['receivedtime'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['companynamea'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df.to_dict('records')
            ))
        self.combinations_same_barcode()
        self.ori_lib_data = sorted(self.ori_lib_data, key=lambda x: (x['level'], x['time']))
        # Greedy packing: always try the highest-priority remaining library on
        # the current anchor chip; if it does not fit, scan for any library
        # that does; if none fits, close the chip and advance the anchor.
        while self.ori_lib_data:
            library_data = self.ori_lib_data[0]
            chipname = f'chip{self.loc_chip_num}'
            # A brand-new chip accepts the first library unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                self.ori_lib_data.remove(library_data)
                continue
            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                self.ori_lib_data.remove(library_data)
            else:
                for j in range(len(self.ori_lib_data)):
                    newlibrary_data = self.ori_lib_data[j]
                    if self.judge_data(chipname, newlibrary_data):
                        self.ori_lib_data.remove(newlibrary_data)
                        self.add_new_data(chipname, newlibrary_data, newer=False)
                        break
                    j += 1  # no-op: the for statement rebinds j each pass
                else:
                    # No candidate fits: close this chip, advance the anchor.
                    self.add_loc_num(chipname)
            if self.chip_size[chipname] > self.data_limit:
                self.add_loc_num(chipname)
    def assign_again_size(self, max_barcode='all'):
        """
        Second pass: chips that ended below ``data_lower`` are dissolved and
        their rows re-assigned onto fresh chips (numbered from 100),
        optionally relaxing barcode uniqueness/balance to one half.

        :param max_barcode: 'all', 'indexi7' or 'indexi5' (passed through to
            judge_data and used to suffix the new chip names)
        """
        left_data = list()
        no_need_chipname = list()
        for chip_idx, chip_assignments in self.index_assignments.items():
            if not chip_assignments:
                continue
            df = pd.DataFrame(chip_assignments)
            # Under-filled chip: reclaim its rows for re-assignment.
            if df['orderdatavolume'].sum() < self.data_lower:
                left_data.extend(chip_assignments)
                no_need_chipname.append(chip_idx)
        for chip_idx in no_need_chipname:
            del self.index_assignments[chip_idx]
        if not left_data:
            return
        ori_library_df = pd.DataFrame(left_data)
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        ori_lib_data = list()
        for library, library_df in ori_library_df.groupby('presamplename'):
            level = library_df['level'].values[0]
            # Libraries split in the first pass keep their 1950 priority.
            if library in self.split_lib:
                level = 1950
            ori_lib_data.append(dict(
                library=library,
                sample_code=library_df['sampleCode'].values[0],
                is_balance_lib=library_df['librarybalancedflag'].values[0],
                size=library_df['orderdatavolume'].sum(),
                split_method=library_df['cycletype'].values[0],
                time=library_df['receivedtime'].values[0],
                level=level,
                customer=library_df['companynamea'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df.to_dict('records')
            ))
        # Larger libraries first within the same level/time.
        ori_lib_data = sorted(ori_lib_data, key=lambda x: (x['level'], x['time'], -x['size']))
        # Re-assignment chips start at 100 to avoid clashing with pass one.
        self.loc_chip_num = 100
        # Same greedy packing loop as assign_samples, with max_barcode passed
        # through to judge_data.
        while ori_lib_data:
            library_data = ori_lib_data[0]
            chipname = f'chip{self.loc_chip_num}_{max_barcode}' if max_barcode != 'all' else f'chip{self.loc_chip_num}'
            # A brand-new chip accepts the first library unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                ori_lib_data.remove(library_data)
                continue
            if self.judge_data(chipname, library_data, max_barcode=max_barcode):
                self.add_new_data(chipname, library_data, newer=False)
                ori_lib_data.remove(library_data)
            else:
                for j in range(len(ori_lib_data)):
                    newlibrary_data = ori_lib_data[j]
                    if self.judge_data(chipname, newlibrary_data, max_barcode=max_barcode):
                        ori_lib_data.remove(newlibrary_data)
                        self.add_new_data(chipname, newlibrary_data, newer=False)
                        break
                    j += 1  # no-op: the for statement rebinds j each pass
                else:
                    self.add_loc_num(chipname)
            if self.chip_size[chipname] > self.data_limit:
                self.add_loc_num(chipname)
    def run(self):
        """
        Execute both assignment passes and write the result workbook.

        Output sheets: one per chip (prefixed 'X' for urgent 极致 chips),
        plus '未测' (unassigned rows), '包lane' (whole-lane orders) and 'log'.
        :return: path of the written workbook.
        """
        # print('# 测试代码')
        # self.assign_samples()
        # self.assign_again_size()
        try:
            self.assign_samples()
            self.assign_again_size()
        except Exception as e:
            # On any failure, keep a log entry and still emit an (empty) workbook.
            self.return_log.append(f'T7排样出错 请联系!{e}')
            self.index_assignments = {}
        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        outputpath = os.path.join(self.output, 'result', outputname)
        # NOTE(review): writer.close() is not in a finally/with block — an
        # exception while writing leaves the file handle open; confirm.
        writer = pd.ExcelWriter(outputpath)
        chip_loc = 1
        librarynum = 0
        for chip_idx, chip_assignments in self.index_assignments.items():
            if not chip_assignments:
                continue
            df = pd.DataFrame(chip_assignments)
            # df['receivedtime'] = df['receivedtime'].dt.strftime('%Y-%m-%d')
            # Urgent (极致) chips get an 'X' name prefix and bypass the
            # minimum-size check below.
            if [method for method in df['cycletype'].values if '极致' in method]:
                addname = 'X'
            else:
                addname = ''
            other_name = ''
            # Non-urgent chips below (data_lower - 50) G go to the unassigned sheet.
            if df['orderdatavolume'].sum() < (self.data_lower - 50) and not addname:
                df['note'] = f'排样数据量不足{self.data_lower - 50}G'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            # Tube-count cap: once exceeded, remaining chips are not emitted.
            if librarynum > self.librarynum:
                df['note'] = '排样管数超标'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            librarynum += len(set(df['presamplename'].values))
            # Log any residual base imbalance for this chip.
            self.dec_barcode_radio(chip_idx)
            chipname = addname + chip_idx + other_name
            # Per-library summary columns appended next to the row data.
            sum_list = list()
            for library, library_df in df.groupby('presamplename'):
                sum_list.append(dict(
                    预排文库编号=library_df['sampleCode'].values[0],
                    预排样本名称=library_df['presamplename'].values[0],
                    二次拆分=library,
                    客户=library_df['companynamea'].values[0],
                    文库结构=library_df['classification'].values[0],
                    打折前=library_df['orderdatavolume'].sum(),
                    建库类型=library_df['librarytype'].values[0],
                    文库备注=library_df['customerremarks'].values[0],
                    测序类型=library_df['seqtype'].values[0],
                    下单备注=library_df['orderremarks'].values[0],
                ))
            df_sum = pd.DataFrame(sum_list)
            res_df = pd.concat([df, df_sum], axis=1)
            # Prepend the header-remark row captured in read_excel.
            res_df = pd.concat([pd.DataFrame(self.items), res_df]).reset_index(drop=True)
            res_df['id'] = res_df['id'].astype(str)
            res_df.to_excel(writer, sheet_name=chipname, index=False)
            chip_loc += 1
        # res_df = pd.DataFrame(res)
        # res_df = pd.concat([pd.DataFrame(self.items), res_df]).reset_index(drop=True)
        # res_df.to_excel(writer, sheet_name='assignment', index=False)
        # for sum_sheet in sum_res:
        # sheetname = sum_sheet.get('sheetname')
        # df_data = sum_sheet.get('data')
        # df_data.to_excel(writer, sheet_name=sheetname, index=False)
        no_assign_df = pd.DataFrame(self.no_assign_data)
        if not no_assign_df.empty:
            # Warn when a split unbalanced library did not get fully assigned.
            no_assign_df_not_balance = ','.join(
                set([lib for lib in no_assign_df['presamplename'] if lib in self.split_lib]))
            if no_assign_df_not_balance:
                self.return_log.append(f'文库{no_assign_df_not_balance}有做不平衡文库拆分处理,并且没有排完,请核查!')
            no_assign_df = pd.concat([pd.DataFrame(self.items), no_assign_df]).reset_index(drop=True)
            no_assign_df.to_excel(writer, sheet_name='未测', index=False)
        order_assign_df = pd.DataFrame(self.order_assign_data)
        if not order_assign_df.empty:
            order_assign_df = pd.concat([pd.DataFrame(self.items), order_assign_df]).reset_index(drop=True)
            order_assign_df.to_excel(writer, sheet_name='包lane', index=False)
        if self.return_log:
            pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)
        writer.close()
        return outputpath
if __name__ == '__main__':
    start_time = time.time()
    filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'example', 'input排样表.xlsx')
    # BUGFIX: the second positional parameter of AutoLayout is `librarynum`
    # (the tube-count limit), not an output path. The original passed
    # output_file='' there, which crashed immediately in int('') with
    # ValueError. Pass a tube limit explicitly; `output` keeps its default.
    layout = AutoLayout(filepath, librarynum=100)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time}")