diff --git a/tools/t7.py b/tools/t7.py index 3935a8e..a3987b7 100644 --- a/tools/t7.py +++ b/tools/t7.py @@ -3,15 +3,22 @@ import os import time from collections import defaultdict, Counter from datetime import datetime +from io import BytesIO +import openpyxl import pandas as pd -from tools.common import basedir, log +from tools.common import basedir -# 定义一个格式化函数 -def format_date(date): - return date.strftime('%Y-%m-%d') +# 复制样式函数 +def copy_cell_style(src_cell, dest_cell): + dest_cell.font = copy.copy(src_cell.font) + dest_cell.border = copy.copy(src_cell.border) + dest_cell.fill = copy.copy(src_cell.fill) + dest_cell.number_format = copy.copy(src_cell.number_format) + dest_cell.protection = copy.copy(src_cell.protection) + dest_cell.alignment = copy.copy(src_cell.alignment) class AutoLayout: @@ -19,8 +26,8 @@ class AutoLayout: 自动化派样 """ - def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0, output=basedir, data_limit=1750, - data_lower=1700): + def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0, output=basedir, data_limit=1650, + data_lower=1600): self.path = path self.output = output self.librarynum = int(librarynum) @@ -72,14 +79,14 @@ class AutoLayout: # 排序好的文库数据 self.ori_lib_data = list() - self.logger = log(os.path.basename(f'{path}.txt')) + # self.logger = log(os.path.basename(f'{path}.txt')) self.return_log = list() self.no_assign_data = list() # 包lane处理 self.order_assign_data = list() - self.need_cols = self.read_cols() + # self.need_cols = self.read_cols() self.is_use_balance = is_use_balance self.is_use_max = is_use_max @@ -197,6 +204,9 @@ class AutoLayout: today_date = datetime.now() + if '贞固' in row['companynamea'].lower(): + return 999 + if 'nextera' in row['librarystructure'].lower(): return 1000 @@ -206,10 +216,7 @@ class AutoLayout: if row['cycletype'] == '极致周期' or '极致' in row['cycletype']: return 2000 - if row['retestflag'] == '是': - return 3000 - - mytime = row['receivedtime'] + mytime = row['createdtime'] # 判断日期是之前的还是之后的 if mytime < today_date: return 5000 @@ -233,8 +240,8 @@ class AutoLayout: duplicate_groups = grouped.filter(lambda x: len(x) > 1) # 提取这些分组,计算文库重复次数 - grouped_names = duplicate_groups.groupby('indexi5i7')['samplename'].apply(list).reset_index() - random_list = list(set(tuple(sublst) for sublst in list(grouped_names['samplename']))) + grouped_names = duplicate_groups.groupby('indexi5i7')['presamplename'].apply(list).reset_index() + random_list = list(set(tuple(sublst) for sublst in list(grouped_names['presamplename']))) new_lst = [spdata for data in random_list for spdata in data] counts = Counter(new_lst) @@ -430,6 +437,11 @@ class AutoLayout: if self.chip_sublib[chipname].intersection({item['subsamplename'] for item in library_data['data']}): notrepeatsublib = False + # 不平衡文库不能放散样1 + is_not_balance_lib_chip1 = True + if is_balance_lib == '否' and self.loc_chip_num == 1 : + is_not_balance_lib_chip1 = False + if sizelimit and notrepeatbarcode and \ exclusive_classific and \ exclusive_customer and \ @@ -438,7 +450,8 @@ class AutoLayout: spmethylibrary and \ use_huada and \ notrepeatsublib and \ - sizelimit_N: + sizelimit_N and \ + is_not_balance_lib_chip1: return True return False @@ -490,7 +503,7 @@ class AutoLayout: # 数据标准格式 numeric_mask = pd.to_numeric(ori_library_df['orderdatavolume'], errors='coerce').notna() - time_mask = pd.to_datetime(ori_library_df['receivedtime'], errors='coerce').notna() + time_mask = pd.to_datetime(ori_library_df['createdtime'], errors='coerce').notna() # 非正常barcode barcode_mask = ori_library_df['indexi5i7'].str.len() != 16 @@ -510,19 +523,19 @@ class AutoLayout: self.order_assign_data = ori_library_df[orderlane_mask].to_dict('records') - # 使用布尔索引筛选出不是数字和非日期的行,并且不是暂不排样的行, 以及非16位置barcode + # 使用布尔索引筛选出不是数字和非日期的行,包lane的 ori_library_df = ori_library_df[(numeric_mask & time_mask) & (~orderlane_mask)] # 时间格式化 - ori_library_df['receivedtime'] = pd.to_datetime(ori_library_df['receivedtime'], errors='coerce') + ori_library_df['createdtime'] = pd.to_datetime(ori_library_df['createdtime'], errors='coerce') ori_library_df['level'] = ori_library_df.apply(self.level, axis=1) # 极致客户有重复的,把等级调到1900,防止放到了最后,到了未测里 must_lib_df = ori_library_df[ori_library_df['level'] == 2000] - must_lib = set(must_lib_df[must_lib_df.duplicated(subset='indexi5i7', keep=False)]['samplename'].to_list()) - ori_library_df.loc[ori_library_df['samplename'].isin(must_lib), 'level'] = 1900 + must_lib = set(must_lib_df[must_lib_df.duplicated(subset='indexi5i7', keep=False)]['presamplename'].to_list()) + ori_library_df.loc[ori_library_df['presamplename'].isin(must_lib), 'level'] = 1900 - for library, library_df in ori_library_df.groupby('samplename'): + for library, library_df in ori_library_df.groupby('presamplename'): size = library_df['orderdatavolume'].sum() is_balance_lib = library_df['librarybalancedflag'].values[0] @@ -639,13 +652,14 @@ class AutoLayout: ori_library_df = pd.DataFrame(left_data) ori_library_df['level'] = ori_library_df.apply(self.level, axis=1) ori_lib_data = list() - for library, library_df in ori_library_df.groupby('samplename'): + for library, library_df in ori_library_df.groupby('presamplename'): level = library_df['level'].values[0] if library in self.split_lib: level = 1950 ori_lib_data.append(dict( library=library, + sample_code=library_df['sampleCode'].values[0], is_balance_lib=library_df['librarybalancedflag'].values[0], size=library_df['orderdatavolume'].sum(), split_method=library_df['cycletype'].values[0], @@ -693,12 +707,11 @@ class AutoLayout: try: self.assign_samples() self.assign_again_size() - # self.assign_again_size(max_barcode='indexi7') - # self.assign_again_size(max_barcode='indexi5') except Exception as e: self.return_log.append(f'T7排样出错, 请联系!{e}') self.index_assignments = {} outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path)) + outputpath = os.path.join(self.output, 'result', outputname) writer = pd.ExcelWriter(outputpath) @@ -708,7 +721,7 @@ class AutoLayout: if not chip_assignments: continue df = pd.DataFrame(chip_assignments) - df['receivedtime'] = df['receivedtime'].dt.strftime('%Y-%m-%d') + # df['receivedtime'] = df['receivedtime'].dt.strftime('%Y-%m-%d') if [method for method in df['cycletype'].values if '极致' in method]: addname = 'X' @@ -724,14 +737,15 @@ class AutoLayout: df['note'] = '排样管数超标' self.no_assign_data.extend(df.to_dict('records')) continue - librarynum += len(set(df['samplename'].values)) + librarynum += len(set(df['presamplename'].values)) self.dec_barcode_radio(chip_idx) chipname = addname + chip_idx + other_name sum_list = list() - for library, library_df in df.groupby('samplename'): + for library, library_df in df.groupby('presamplename'): sum_list.append(dict( 预排文库编号=library_df['sampleCode'].values[0], + 预排样本名称=library_df['presamplename'].values[0], 二次拆分=library, 客户=library_df['companynamea'].values[0], 类型=library_df['librarystructure'].values[0], @@ -739,6 +753,7 @@ class AutoLayout: )) df_sum = pd.DataFrame(sum_list) res_df = pd.concat([df, df_sum], axis=1) + res_df = pd.concat([pd.DataFrame(self.items), res_df]).reset_index(drop=True) res_df.to_excel(writer, sheet_name=chipname, index=False) chip_loc += 1 @@ -753,22 +768,21 @@ class AutoLayout: no_assign_df = pd.DataFrame(self.no_assign_data) if not no_assign_df.empty: - no_assign_df = no_assign_df.applymap(lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x) no_assign_df_not_balance = ','.join( - set([lib for lib in no_assign_df['samplename'] if lib in self.split_lib])) + set([lib for lib in no_assign_df['presamplename'] if lib in self.split_lib])) if no_assign_df_not_balance: self.return_log.append(f'文库{no_assign_df_not_balance}有做不平衡文库拆分处理,并且没有排完,请核查!') - # if not no_assign_df.empty: - # no_assign_df = no_assign_df[self.need_cols] no_assign_df = pd.concat([pd.DataFrame(self.items), no_assign_df]).reset_index(drop=True) no_assign_df.to_excel(writer, sheet_name='未测', index=False) order_assign_df = pd.DataFrame(self.order_assign_data) + if not order_assign_df.empty: order_assign_df = pd.concat([pd.DataFrame(self.items), order_assign_df]).reset_index(drop=True) order_assign_df.to_excel(writer, sheet_name='包lane', index=False) if self.return_log: pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False) writer.close() + return outputpath