import copy
import os
import time
from collections import defaultdict, Counter
from datetime import datetime

import pandas as pd

from tools.common import basedir, log


def format_date(date):
    """Format a date/datetime/Timestamp as 'YYYY-MM-DD'."""
    return date.strftime('%Y-%m-%d')


class AutoLayout:
    """
    自动化派样 (automated chip sample layout).

    Reads a workbook of pending libraries (sheet '未测'), assigns them onto
    sequencing chips under a set of constraints (chip data-size limit,
    barcode / sub-library uniqueness, per-position base balance, mutually
    exclusive classifications and customers, quotas for unbalanced /
    methylation / Nextera / 华大 libraries), then writes one sheet per chip
    plus the leftover libraries and a log back to an Excel file.
    """

    def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0,
                 output=basedir, data_limit=1750, data_lower=1700):
        """
        :param path: input Excel workbook path.
        :param librarynum: maximum number of library tubes allowed overall
            (coerced with int(), so it must be numeric or a numeric string).
        :param is_use_balance: apply the unbalanced/methylation quota checks.
        :param is_use_max: enable the extra relaxed base-balance rule (G rule).
        :param output: output directory root (result goes to <output>/result).
        :param data_limit: hard per-chip data cap in G.
        :param data_lower: minimum per-chip data in G; smaller chips are
            dissolved and re-assigned by assign_again_size().
        """
        self.path = path
        self.output = output
        self.librarynum = int(librarynum)
        self.data_limit = data_limit
        self.data_lower = data_lower
        # Raw chip data read from the workbook: {sheet_name: list-of-row-dicts}.
        self.ori_data = self.read_excel()
        # All assigned rows per chip: {chipname: [row_dict, ...]}.
        self.index_assignments = defaultdict(list)
        # Total assigned data size (G) per chip.
        self.chip_size = dict()
        # Data size (G) of N-containing-barcode libraries per chip.
        self.chip_size_N = dict()
        # Recorded barcode / i7 / i5 sets per chip (for uniqueness checks).
        self.chip_barcode_recode = defaultdict(set)
        self.chip_barcodei7_recode = defaultdict(set)
        self.chip_barcodei5_recode = defaultdict(set)
        # Current anchor chip number.
        self.loc_chip_num = 1
        # Customers per chip.
        self.chip_customer = defaultdict(set)
        # Library classifications per chip.
        self.chip_classification = defaultdict(set)
        self.rule = self.read_rule()
        self.rule_exclusive_customer = self.read_rule_exclusive_customer()
        # Sub-library names per chip.
        self.chip_sublib = defaultdict(set)
        # Unbalanced-library data size per chip.
        self.chip_speciallib_size = dict()
        # Methylation-library data size per chip.
        self.chip_methylib_size = dict()
        # Nextera-library data size per chip.
        self.chip_speciallib_nextera_size = dict()
        # 华大-library data size per chip.
        self.chip_speciallib_huada_size = dict()
        # Sorted pending library records (one dict per library / split part).
        self.ori_lib_data = list()
        self.logger = log(os.path.basename(f'{path}.txt'))
        self.return_log = list()
        self.no_assign_data = list()
        self.need_cols = self.read_cols()
        self.is_use_balance = is_use_balance
        self.is_use_max = is_use_max
        # Libraries that were split because they are unbalanced and oversized.
        self.split_lib = set()

    @staticmethod
    def read_cols():
        """Read the list of required input columns from rule/columns.xlsx."""
        df = pd.read_excel(os.path.join(basedir, 'rule', 'columns.xlsx'))
        cols = list(df['cols'].values)
        return cols

    def read_excel(self):
        """
        Read every sheet of the input workbook.

        :return: {sheet_name: list of row dicts}, NaN replaced with ''.
        """
        merge = pd.read_excel(self.path, None)
        ori_data = dict()
        for name, sheet in merge.items():
            sheet.fillna('', inplace=True)
            ori_data[name] = sheet.to_dict('records')
        return ori_data

    @staticmethod
    def read_rule():
        """
        Read the mutually-exclusive classification pairs and symmetrize them
        (c1/c2 swapped rows appended) so lookups work in either direction.
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_classfication.xlsx'))
        newdf = pd.DataFrame()
        newdf['c1'] = df['c2']
        newdf['c2'] = df['c1']
        res = pd.concat([df, newdf])
        return res.reset_index()

    @staticmethod
    def read_rule_exclusive_customer():
        """Read and symmetrize the mutually-exclusive customer pairs."""
        df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_customer.xlsx'))
        newdf = pd.DataFrame()
        newdf['customer1'] = df['customer2']
        newdf['customer2'] = df['customer1']
        res = pd.concat([df, newdf])
        return res.reset_index()

    def count_barcode_radio(self, data, maxt=''):
        """
        Compute per-position base ratios (weighted by data_needed) over the
        barcodes of `data` and report positions that violate the base-balance
        rules.

        :param data: list of row dicts with 'barcode' and 'data_needed'.
        :param maxt: '' = full 16-mer, 'i7' = positions 8..16, 'i5' = 0..8.
        :return: (ratio_sites, is_not_balance_list) where ratio_sites maps
            position -> {base: ratio} and is_not_balance_list holds a message
            per unbalanced position.
        """
        df = pd.DataFrame(data)
        ratio_sites = dict()
        is_not_balance_list = []
        if df.empty:
            return ratio_sites, is_not_balance_list
        s, e = 0, 16
        if maxt == 'i7':
            s, e = 8, 16
        if maxt == 'i5':
            s, e = 0, 8
        num = e - s
        df['barcode'] = df['barcode'].str.slice(s, e)
        # One column per barcode position (T0..Tn-1), joined to data sizes.
        barcode_df = pd.DataFrame(
            df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values,
            columns=['T' + str(x) for x in range(num)]).join(df['data_needed'])
        total = barcode_df['data_needed'].sum()
        for i in range(num):
            column = 'T' + str(i)
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
            # Exclude N bases from the ratio denominator.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            col_df['ratio'] = (col_df['data_needed']) / (total - base_n_size)
            ratio = col_df['ratio'].to_dict()
            ratio_sites[i] = ratio
            # Bucket each base by its ratio band.
            A, B, C, D, E, F, G = list(), list(), list(), list(), list(), list(), list()
            for decbase in ['A', 'T', 'C', 'G']:
                if decbase not in ratio:
                    ratio[decbase] = 0
                if ratio[decbase] >= 0.6:
                    A.append(decbase)
                if 0.2 <= ratio[decbase] < 0.6:
                    B.append(decbase)
                if 0.15 <= ratio[decbase] < 0.2:
                    C.append(decbase)
                if 0.1 <= ratio[decbase] < 0.15:
                    D.append(decbase)
                if 0.08 <= ratio[decbase] < 0.1:
                    E.append(decbase)
                if ratio[decbase] < 0.08:
                    F.append(decbase)
                # Additional relaxed feasibility band (used with is_use_max).
                if 0.125 <= ratio[decbase] <= 0.625:
                    G.append(decbase)
            A_num, B_num, C_num, D_num, E_num, F_num, G_num = \
                len(A), len(B), len(C), len(D), len(E), len(F), len(G)
            # Position is balanced if any of the accepted band combinations
            # holds; otherwise record a diagnostic message.
            if not ((B_num + C_num + D_num == 4) or
                    (F_num == 1 and (A_num + B_num) == 3) or
                    (E_num == 1 and D_num == 1 and (A_num + B_num + C_num) == 2) or
                    (E_num == 1 and (A_num + B_num + C_num) == 3) or
                    (F_num == 1 and G_num == 3 and self.is_use_max)):
                is_not_balance_list.append(
                    '第%s位置,算出结果为 %s' % (i, ratio)
                )
        return ratio_sites, is_not_balance_list

    def dec_barcode_radio(self, chipname):
        """Log (and print) a warning if the chip's barcodes are unbalanced."""
        data = self.index_assignments[chipname]
        ratio_sites, is_not_balance_list = self.count_barcode_radio(data)
        if is_not_balance_list:
            desc = '\n'.join(is_not_balance_list)
            self.return_log.append(f'芯片{chipname}有碱基不平衡:\n{desc}')
            print(f'芯片{chipname}有碱基不平衡:\n{desc}')

    def dec_lib_data_scatter(self, data):
        """Placeholder — not implemented."""
        pass

    @staticmethod
    def level(row):
        """
        Priority level for a library row (lower sorts first).

        Order: nextera (1000) < 华大 NN-i5 (1100) < 超加急 (1500) <
        极致 split (2000) < 加急 (3000) < 补测 (4000) < overdue (5000) <
        everything else (100000).
        """
        today_date = datetime.now()
        if 'nextera' in row['classification'].lower():
            return 1000
        if '华大' in row['classification'] and 'NN' in row['i5']:
            return 1100
        if '超加急' in str(row['priority']):
            return 1500
        if row['拆分方式'] == '极致周期' or '极致' in row['拆分方式']:
            return 2000
        if '加急' in str(row['priority']):
            return 3000
        if '补测' in str(row['priority']):
            return 4000
        mytime = row['time']
        # Overdue libraries rank ahead of future-dated ones.
        if mytime < today_date:
            return 5000
        else:
            return 100000

    def combinations_same_barcode(self):
        """
        For 极致 samples with duplicated barcodes (level 1900), lower each
        involved library's level by its duplication count so they are laid
        out earlier instead of falling through to the leftover sheet.
        """
        same_barcode_df = pd.DataFrame(
            [spdata for data in self.ori_lib_data if data['level'] == 1900
             for spdata in data['data']])
        if same_barcode_df.empty:
            return
        grouped = same_barcode_df.groupby('barcode')
        # Keep only barcode groups that occur more than once.
        duplicate_groups = grouped.filter(lambda x: len(x) > 1)
        grouped_names = duplicate_groups.groupby('barcode')['#library'].apply(list).reset_index()
        random_list = list(set(tuple(sublst) for sublst in list(grouped_names['#library'])))
        new_lst = [spdata for data in random_list for spdata in data]
        counts = Counter(new_lst)
        correct_data = list()
        for data in self.ori_lib_data:
            if data['library'] in counts:
                data['level'] -= counts[data['library']]
            correct_data.append(data)
        self.ori_lib_data = correct_data

    def add_new_data(self, chipname, library_data, newer=True):
        """
        Add a library record to a chip, updating all bookkeeping structures.

        :param chipname: target chip name.
        :param library_data: library record dict (see assign_samples()).
        :param newer: True when this is the first library on a fresh chip
            (initializes the size counters), False to accumulate.
        """
        self.index_assignments[chipname].extend(library_data['data'])
        self.chip_barcode_recode[chipname].update({item['barcode'] for item in library_data['data']})
        self.chip_barcodei7_recode[chipname].update({item['i7'] for item in library_data['data']})
        self.chip_barcodei5_recode[chipname].update({item['i5'] for item in library_data['data']})
        # 华大 libraries: i7 must also be unique, so record N*8 + i7 too.
        if '华大' in library_data['classification']:
            self.chip_barcode_recode[chipname].update({'N' * 8 + item['i7'] for item in library_data['data']})
            # self.chip_barcode_recode[chipname].update({item['i5'] + 'N' * 8 for item in library_data['data']})
        # Sub-library names.
        self.chip_sublib[chipname].update({item['sublibrary'] for item in library_data['data']})
        self.chip_customer[chipname].add(library_data['customer'])
        self.chip_classification[chipname].add(library_data['classification'])
        if newer:
            self.chip_size[chipname] = library_data['size']
            self.chip_size_N[chipname] = 0
            if 'N' in library_data['data'][0]['barcode']:
                self.chip_size_N[chipname] = library_data['size']
            # BUGFIX: both counters must be initialized on a fresh chip;
            # previously only one of them was set in the first two branches,
            # which could raise KeyError later in judge_data().
            if library_data['is_balance_lib'] == '否':
                self.chip_speciallib_size[chipname] = library_data['size']
                self.chip_methylib_size[chipname] = 0
            elif library_data['is_balance_lib'] == '甲基化':
                self.chip_methylib_size[chipname] = library_data['size']
                self.chip_speciallib_size[chipname] = 0
            else:
                self.chip_speciallib_size[chipname] = 0
                self.chip_methylib_size[chipname] = 0
            if 'nextera' in library_data['classification'].lower():
                self.chip_speciallib_nextera_size[chipname] = library_data['size']
            else:
                self.chip_speciallib_nextera_size[chipname] = 0
            if '华大' in library_data['classification']:
                self.chip_speciallib_huada_size[chipname] = library_data['size']
            else:
                self.chip_speciallib_huada_size[chipname] = 0
        else:
            self.chip_size[chipname] += library_data['size']
            if library_data['is_balance_lib'] == '否':
                self.chip_speciallib_size[chipname] += library_data['size']
            if library_data['is_balance_lib'] == '甲基化':
                self.chip_methylib_size[chipname] += library_data['size']
            if 'nextera' in library_data['classification'].lower():
                self.chip_speciallib_nextera_size[chipname] += library_data['size']
            if '华大' in library_data['classification']:
                self.chip_speciallib_huada_size[chipname] += library_data['size']
            if 'N' in library_data['data'][0]['barcode']:
                self.chip_size_N[chipname] += library_data['size']

    def use_rule_exclusive_classfication(self, chipname, classfication):
        """
        True if `classfication` conflicts with a classification already on
        the chip (per the exclusive_classfication rule file).
        """
        may_classfic = set(self.rule[self.rule['c1'] == classfication]['c2'])
        # BUGFIX: must intersect with the chip's *classifications*; the
        # original compared against chip_customer, so the rule never fired.
        if self.chip_classification[chipname].intersection(may_classfic):
            return True
        return False

    def use_rule_exclusive_customer(self, chipname, customer):
        """True if `customer` conflicts with a customer already on the chip."""
        may_classfic = set(
            self.rule_exclusive_customer[self.rule_exclusive_customer['customer1'] == customer]['customer2'])
        if self.chip_customer[chipname].intersection(may_classfic):
            return True
        return False

    def judge_data(self, chipname, library_data, max_barcode='all'):
        """
        Check every placement constraint for adding `library_data` to chip
        `chipname`.

        :param max_barcode: 'all' = full-barcode mode; 'i7'/'i5' = relax the
            full-barcode checks and only require uniqueness/balance on that end.
        :return: True if the library may be placed on the chip.
        """
        size = library_data['size']
        size_N = 0
        if 'N' in library_data['data'][0]['barcode']:
            size_N = library_data['size']
        classification = library_data['classification']
        customer = library_data['customer']
        is_balance_lib = library_data['is_balance_lib']
        # Chip size must not exceed the configured limit.
        sizelimit = True
        if self.chip_size[chipname] + size > self.data_limit:
            sizelimit = False
        # Barcodes must not repeat (including the 华大 N-padded variants).
        notrepeatbarcode = True
        if self.chip_barcode_recode[chipname].intersection({item['barcode'] for item in library_data['data']}) or \
                self.chip_barcode_recode[chipname].intersection(
                    {'N' * 8 + item['i7'] for item in library_data['data']}) or \
                self.chip_barcode_recode[chipname].intersection(
                    {item['i5'] + 'N' * 8 for item in library_data['data']}):
            notrepeatbarcode = False
        # Mutually exclusive classifications.
        exclusive_classific = True
        if self.use_rule_exclusive_classfication(chipname, classification):
            exclusive_classific = False
        # Mutually exclusive customers.
        exclusive_customer = True
        if self.use_rule_exclusive_customer(chipname, customer):
            exclusive_customer = False
        # Unbalanced libraries capped at 250G per chip.
        splibrary = True
        if is_balance_lib == '否' and self.chip_speciallib_size[chipname] + size > 250:
            splibrary = False
        # Methylation libraries capped at 100G per chip (reduced from 250G).
        spmethylibrary = True
        if is_balance_lib == '甲基化' and self.chip_methylib_size[chipname] + size > 100:
            spmethylibrary = False
        # Quota checks disabled when balance handling is off.
        if not self.is_use_balance:
            splibrary = True
            spmethylibrary = True
        # Base-balance check only once the chip holds more than 900G.
        base_balance = True
        if self.chip_size[chipname] > 900:
            current_data = copy.deepcopy(self.index_assignments[chipname])
            new_data = library_data['data']
            current_data.extend(new_data)
            ratio_sites, is_not_balance_list = self.count_barcode_radio(current_data)
            if is_not_balance_list:
                base_balance = False
        # N-containing data capped at half the 900G balance threshold.
        sizelimit_N = True
        if self.chip_size_N[chipname] + size_N > 450:
            sizelimit_N = False
        # 华大 data must stay below half the chip limit.
        use_huada = True
        if (self.chip_speciallib_huada_size[chipname] > self.data_limit / 2) and ('华大' in classification):
            use_huada = False
        # i7/i5-only mode: redo uniqueness and balance on the selected end.
        if max_barcode != 'all':
            base_balance = True
            notrepeatbarcode = True
            if self.chip_barcodei7_recode[chipname].intersection(
                    {item['i7'] for item in library_data['data']}) and max_barcode == 'i7':
                notrepeatbarcode = False
            if self.chip_barcodei5_recode[chipname].intersection(
                    {item['i5'] for item in library_data['data']}) and max_barcode == 'i5':
                notrepeatbarcode = False
            # All-N ends are rejected.
            if ('N' * 8 in {item['i5'] for item in library_data['data']}) and max_barcode == 'i5':
                notrepeatbarcode = False
            if ('N' * 8 in {item['i7'] for item in library_data['data']}) and max_barcode == 'i7':
                notrepeatbarcode = False
            if self.chip_size[chipname] > 900:
                current_data = copy.deepcopy(self.index_assignments[chipname])
                new_data = library_data['data']
                current_data.extend(new_data)
                ratio_sites, is_not_balance_list = self.count_barcode_radio(current_data, maxt=max_barcode)
                if is_not_balance_list:
                    base_balance = False
        # Sub-library names must not repeat.
        notrepeatsublib = True
        if self.chip_sublib[chipname].intersection({item['sublibrary'] for item in library_data['data']}):
            notrepeatsublib = False
        if sizelimit and notrepeatbarcode and \
                exclusive_classific and \
                exclusive_customer and \
                splibrary and \
                base_balance and \
                spmethylibrary and \
                use_huada and \
                notrepeatsublib and \
                sizelimit_N:
            return True
        return False

    def add_loc_num(self, chipname):
        """
        Close the current anchor chip and advance to the next one.

        Nextera / 华大 libraries must total at least 50G on a chip when the
        chip is closed; otherwise they are evicted back to no_assign_data and
        the chip stays open (anchor number is NOT advanced) so it can refill.
        """
        nextera_size = self.chip_speciallib_nextera_size[chipname]
        huada_size = self.chip_speciallib_huada_size[chipname]
        flag = True
        if 0 < nextera_size < 50:
            # Nextera present but under 50G: evict those libraries.
            nextary_barcode = set()
            no_nextary_data = list()
            for libdata in self.index_assignments[chipname]:
                if libdata['classification'].lower() != 'nextera':
                    no_nextary_data.append(libdata)
                else:
                    self.no_assign_data.append(libdata)
                    # BUGFIX: set.add, not set.update — update() on a string
                    # added its individual characters, so the barcodes were
                    # never actually released from chip_barcode_recode.
                    nextary_barcode.add(libdata['barcode'])
            self.index_assignments[chipname] = no_nextary_data
            self.chip_barcode_recode[chipname] -= nextary_barcode
            self.chip_speciallib_nextera_size[chipname] = 0
            self.chip_size[chipname] -= nextera_size
            flag = False
        if 0 < huada_size < 50:
            # 华大 present but under 50G: evict those libraries.
            huada_barcode = set()
            no_huada_data = list()
            for libdata in self.index_assignments[chipname]:
                if '华大' not in libdata['classification']:
                    no_huada_data.append(libdata)
                else:
                    self.no_assign_data.append(libdata)
                    # BUGFIX: set.add, not set.update (same issue as above).
                    huada_barcode.add(libdata['barcode'])
            self.index_assignments[chipname] = no_huada_data
            self.chip_barcode_recode[chipname] -= huada_barcode
            self.chip_speciallib_huada_size[chipname] = 0
            self.chip_size[chipname] -= huada_size
            flag = False
        if flag:
            self.loc_chip_num += 1

    def assign_samples(self):
        """
        Main assignment pass: validate the '未测' sheet, build per-library
        records (splitting oversized and unbalanced libraries), sort by
        priority, and greedily fill chips.

        :raises UserWarning: when the sheet or a required column is missing.
        """
        if '未测' not in self.ori_data.keys():
            raise UserWarning('提供excel没有 未测 sheet ,请核查!')
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        # Required headers must all be present.
        get_col = set(ori_library_df.columns)
        unhave_col = set(self.need_cols) - get_col
        if unhave_col:
            unhave_from = '; '.join(unhave_col)
            raise UserWarning(f'未测表里没有 {unhave_from} 表头,请核查!')
        # Row validation masks.
        numeric_mask = pd.to_numeric(ori_library_df['data_needed'], errors='coerce').notna()
        time_mask = pd.to_datetime(ori_library_df['time'], errors='coerce').notna()
        # Rows explicitly put on hold.
        status_mask = ori_library_df['status'] == '暂不排样'
        # Malformed barcodes (must be exactly 16 characters).
        barcode_mask = ori_library_df['barcode'].str.len() != 16
        ori_library_df['note'] = ''
        ori_library_df.loc[~numeric_mask, 'note'] = 'data_needed 列非数字'
        ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期'
        ori_library_df.loc[status_mask, 'note'] = '暂不排样'
        ori_library_df.loc[barcode_mask, 'note'] = '非16位barcode'
        no_ori_data = ori_library_df[~(numeric_mask & time_mask) | status_mask | barcode_mask]
        self.no_assign_data.extend(no_ori_data.to_dict('records'))
        # Keep only numeric, dated, non-held, well-formed-barcode rows.
        ori_library_df = ori_library_df[(numeric_mask & time_mask) & ~status_mask & ~barcode_mask]
        ori_library_df['time'] = pd.to_datetime(ori_library_df['time'], errors='coerce')
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        # 极致 libraries sharing a barcode get level 1900 so they are not
        # pushed to the end (and into the leftover sheet).
        must_lib_df = ori_library_df[ori_library_df['level'] == 2000]
        must_lib = set(must_lib_df[must_lib_df.duplicated(subset='barcode', keep=False)]['#library'].to_list())
        ori_library_df.loc[ori_library_df['#library'].isin(must_lib), 'level'] = 1900
        for library, library_df in ori_library_df.groupby('#library'):
            size = library_df['data_needed'].sum()
            is_balance_lib = library_df['is_balance_lib'].values[0]
            # Duplicate barcodes within a single library are rejected.
            if len(library_df['barcode'].values) > len(set(library_df['barcode'].values)):
                library_df['note'] = '文库内部有重复'
                self.no_assign_data.extend(library_df.to_dict('records'))
                continue
            # Unbalanced libraries over 250G are pre-split into <=200G parts.
            if is_balance_lib == '否' and size > 250:
                self.return_log.append(f'文库{library} 是不平衡文库, 数据为{size}, 大于250G, 已做拆分处理, 请注意!!! ')
                data_needed = library_df['data_needed'].copy()
                for num in range(int(size), 0, -200):
                    addnum = 200
                    if num <= 200:
                        addnum = num
                    # Scale every row so this part totals `addnum` G.
                    library_df['data_needed'] = (addnum / size) * data_needed
                    self.ori_lib_data.append(dict(
                        library=library,
                        is_balance_lib=library_df['is_balance_lib'].values[0],
                        size=library_df['data_needed'].sum(),
                        split_method=library_df['拆分方式'].values[0],
                        time=library_df['time'].values[0],
                        level=1950,
                        customer=library_df['customer'].values[0],
                        classification=library_df['classification'].values[0],
                        data=library_df[self.need_cols].to_dict('records')
                    ))
                self.split_lib.add(library)
                continue
            # Libraries over half a chip are split into two equal halves
            # (one extra record appended here, plus the common one below).
            if size > self.data_limit / 2:
                library_df['data_needed'] = library_df['data_needed'] / 2
                self.return_log.append(f'文库{library} 已做拆分处理, 请注意!!! ')
                self.ori_lib_data.append(dict(
                    library=library,
                    is_balance_lib=library_df['is_balance_lib'].values[0],
                    size=library_df['data_needed'].sum(),
                    split_method=library_df['拆分方式'].values[0],
                    time=library_df['time'].values[0],
                    level=library_df['level'].values[0],
                    customer=library_df['customer'].values[0],
                    classification=library_df['classification'].values[0],
                    data=library_df[self.need_cols].to_dict('records')
                ))
            self.ori_lib_data.append(dict(
                library=library,
                is_balance_lib=library_df['is_balance_lib'].values[0],
                size=library_df['data_needed'].sum(),
                split_method=library_df['拆分方式'].values[0],
                time=library_df['time'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df[self.need_cols].to_dict('records')
            ))
        self.combinations_same_barcode()
        self.ori_lib_data = sorted(self.ori_lib_data, key=lambda x: (x['level'], x['time']))
        # Greedy fill: take the highest-priority library; if it doesn't fit,
        # scan the rest for any that does; if none fits, close the chip.
        while self.ori_lib_data:
            library_data = self.ori_lib_data[0]
            chipname = f'chip{self.loc_chip_num}'
            # A brand-new chip accepts the first library unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                self.ori_lib_data.remove(library_data)
                continue
            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                self.ori_lib_data.remove(library_data)
            else:
                for j in range(len(self.ori_lib_data)):
                    newlibrary_data = self.ori_lib_data[j]
                    if self.judge_data(chipname, newlibrary_data):
                        self.ori_lib_data.remove(newlibrary_data)
                        self.add_new_data(chipname, newlibrary_data, newer=False)
                        break
                else:
                    # Nothing fits: close this chip and move on.
                    self.add_loc_num(chipname)
            if self.chip_size[chipname] > self.data_limit:
                self.add_loc_num(chipname)

    def assign_again_size(self, max_barcode='all'):
        """
        Dissolve chips that are under data_lower and re-assign their
        libraries onto fresh chips (numbered from 100), optionally in
        i7/i5-only barcode mode.
        """
        left_data = list()
        no_need_chipname = list()
        for chip_idx, chip_assignments in self.index_assignments.items():
            if not chip_assignments:
                continue
            df = pd.DataFrame(chip_assignments)
            if df['data_needed'].sum() < self.data_lower:
                left_data.extend(chip_assignments)
                no_need_chipname.append(chip_idx)
        for chip_idx in no_need_chipname:
            del self.index_assignments[chip_idx]
        # Robustness: nothing to re-assign — avoid KeyError on empty frame.
        if not left_data:
            return
        ori_library_df = pd.DataFrame(left_data)
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        ori_lib_data = list()
        for library, library_df in ori_library_df.groupby('#library'):
            level = library_df['level'].values[0]
            # Previously-split unbalanced libraries keep their 1950 level.
            if library in self.split_lib:
                level = 1950
            ori_lib_data.append(dict(
                library=library,
                is_balance_lib=library_df['is_balance_lib'].values[0],
                size=library_df['data_needed'].sum(),
                split_method=library_df['拆分方式'].values[0],
                time=library_df['time'].values[0],
                level=level,
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df[self.need_cols].to_dict('records')
            ))
        ori_lib_data = sorted(ori_lib_data, key=lambda x: (x['level'], x['time'], -x['size']))
        self.loc_chip_num = 100
        while ori_lib_data:
            library_data = ori_lib_data[0]
            chipname = f'chip{self.loc_chip_num}_{max_barcode}' if max_barcode != 'all' else f'chip{self.loc_chip_num}'
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                ori_lib_data.remove(library_data)
                continue
            if self.judge_data(chipname, library_data, max_barcode=max_barcode):
                self.add_new_data(chipname, library_data, newer=False)
                ori_lib_data.remove(library_data)
            else:
                for j in range(len(ori_lib_data)):
                    newlibrary_data = ori_lib_data[j]
                    if self.judge_data(chipname, newlibrary_data, max_barcode=max_barcode):
                        ori_lib_data.remove(newlibrary_data)
                        self.add_new_data(chipname, newlibrary_data, newer=False)
                        break
                else:
                    self.add_loc_num(chipname)
            if self.chip_size[chipname] > self.data_limit:
                self.add_loc_num(chipname)

    def run(self):
        """
        Execute the full layout pipeline and write the result workbook.

        :return: path of the written Excel file (one sheet per chip, plus
            '未测' for unassigned rows and 'log' for warnings).
        """
        try:
            self.assign_samples()
            self.assign_again_size()
            # self.assign_again_size(max_barcode='i7')
            # self.assign_again_size(max_barcode='i5')
        except Exception as e:
            # On any failure, report and emit an empty layout.
            self.return_log.append(f'T7排样出错, 请联系!{e}')
            self.index_assignments = {}
        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        outputpath = os.path.join(self.output, 'result', outputname)
        writer = pd.ExcelWriter(outputpath)
        librarynum = 0
        for chip_idx, chip_assignments in self.index_assignments.items():
            if not chip_assignments:
                continue
            df = pd.DataFrame(chip_assignments)
            df['time'] = df['time'].dt.strftime('%Y-%m-%d')
            # 极致 chips get an 'X' sheet-name prefix and bypass the
            # minimum-data check.
            if [method for method in df['拆分方式'].values if '极致' in method]:
                addname = 'X'
            else:
                addname = ''
            other_name = ''
            # if 'chipB' in chip_idx and df['barcode'].duplicated().any():
            #     other_name = '_i7'
            if df['data_needed'].sum() < (self.data_lower - 50) and not addname:
                df['note'] = f'排样数据量不足{self.data_lower - 50}G'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            if librarynum > self.librarynum:
                df['note'] = '排样管数超标'
                self.no_assign_data.extend(df.to_dict('records'))
                continue
            librarynum += len(set(df['#library'].values))
            self.dec_barcode_radio(chip_idx)
            chipname = addname + chip_idx + other_name
            # Per-library summary columns appended to the right of the sheet.
            sum_list = list()
            for library, library_df in df.groupby('#library'):
                sum_list.append(dict(
                    二次拆分=library,
                    客户=library_df['customer'].values[0],
                    类型=library_df['classification'].values[0],
                    打折前=library_df['data_needed'].sum()
                ))
            df_sum = pd.DataFrame(sum_list)
            res_df = pd.concat([df, df_sum], axis=1)
            res_df.to_excel(writer, sheet_name=chipname, index=False)
        no_assign_df = pd.DataFrame(self.no_assign_data)
        # Robustness: an empty frame has no '#library' column — skip it.
        if not no_assign_df.empty:
            no_assign_df = no_assign_df.applymap(
                lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x)
            no_assign_df_not_balance = ','.join(
                set([lib for lib in no_assign_df['#library'] if lib in self.split_lib]))
            if no_assign_df_not_balance:
                self.return_log.append(f'文库{no_assign_df_not_balance}有做不平衡文库拆分处理,并且没有排完,请核查!')
            no_assign_df = no_assign_df[self.need_cols]
            no_assign_df.to_excel(writer, sheet_name='未测', index=False)
        if self.return_log:
            pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)
        writer.close()
        return outputpath


if __name__ == '__main__':
    start_time = time.time()
    filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'example', 'input排样表.xlsx')
    # BUGFIX: the original called AutoLayout(filepath, output_file) with
    # output_file = '', binding '' to the `librarynum` parameter, which
    # always crashed on int('') in __init__.  Pass an explicit numeric
    # tube cap instead (adjust the value as needed).
    layout = AutoLayout(filepath, librarynum=1000)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time} 秒")