import copy
import os
import time
from collections import defaultdict
from datetime import datetime

import pandas as pd

from tools.common import basedir, log


def format_date(date):
    """Format a date-like object as ``YYYY-MM-DD``."""
    return date.strftime('%Y-%m-%d')


class AutoLayout:
    """Automatically distribute sequencing libraries over chips.

    Libraries are read from the ``未测`` sheet of the input workbook, ranked
    by priority and then packed chip by chip subject to the constraints
    implemented in :meth:`judge_data`. :meth:`run` writes the resulting layout
    to a new workbook.
    """

    # Number of barcode positions inspected by the base-balance check.
    BARCODE_LENGTH = 16
    # Upper bound (G) for unbalanced libraries, and separately for
    # methylation libraries, on a single chip.
    SPECIAL_LIB_LIMIT = 250
    # nextera / 华大 libraries are only kept on a chip when their combined
    # size reaches this amount (G).
    MIN_VENDOR_LIB_SIZE = 50
    # Base balance is only enforced once a chip holds more than this (G).
    BALANCE_CHECK_THRESHOLD = 900
    # Chips without an "极致" library below this size (G) are not emitted.
    MIN_CHIP_SIZE = 1600

    def __init__(self, path, librarynum, is_use_balance=1, is_use_max=0,
                 output=basedir, data_limit=1750):
        """Load the workbook and rule files and reset all per-chip bookkeeping.

        :param path: path of the input Excel workbook
        :param librarynum: library (tube) limit, converted with ``int()``
        :param is_use_balance: truthy to enforce the unbalanced/methylation
            size limits in :meth:`judge_data`
        :param is_use_max: truthy to enable the extra relaxed base-balance
            rule in :meth:`count_barcode_radio`
        :param output: directory under which ``result/`` is written
        :param data_limit: maximum total data size per chip
        """
        self.path = path
        self.output = output
        self.librarynum = int(librarynum)
        self.data_limit = data_limit

        # chip name -> list of sample records placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> total data size on the chip
        self.chip_size = dict()
        # chip name -> whether the chip is an "极致" chip (not used in this file)
        self.chip_type = dict()
        # chip name -> barcodes already present on the chip
        self.chip_barcode_recode = defaultdict(set)
        # raw workbook content: sheet name -> list of row dicts
        self.ori_data = self.read_excel()
        # number of the chip currently being filled
        self.loc_chip_num = 1
        # chip name -> customers on the chip
        self.chip_customer = defaultdict(set)
        # chip name -> library classifications on the chip
        self.chip_classification = defaultdict(set)
        self.rule = self.read_rule()
        # chip name -> size of unbalanced libraries
        self.chip_speciallib_size = dict()
        # chip name -> size of methylation libraries
        self.chip_methylib_size = dict()
        # chip name -> size of nextera libraries
        self.chip_speciallib_nextera_size = dict()
        # chip name -> size of 华大 libraries
        self.chip_speciallib_huada_size = dict()
        self.logger = log(os.path.basename(f'{path}.txt'))
        # messages written to the "log" sheet of the result workbook
        self.return_log = list()
        # records that could not be placed; written to the "未测" sheet
        self.no_assign_data = list()
        # libraries still waiting to be placed, sorted by priority
        self.ori_lib_data = list()
        self.need_cols = self.read_cols()
        self.is_use_balance = is_use_balance
        self.is_use_max = is_use_max

    def count_barcode_radio(self, data):
        """Compute the per-position base composition of a set of records.

        Each record contributes its ``data_needed`` to the base found at each
        of the first ``BARCODE_LENGTH`` barcode positions. ``N`` bases are
        excluded from both numerator and denominator.

        :param data: list of record dicts with ``barcode`` and ``data_needed``
        :return: ``(ratio_sites, is_not_balance_list)`` where ``ratio_sites``
            maps position -> {base: ratio} and ``is_not_balance_list`` holds
            one message per position that violates the balance rules
        """
        df = pd.DataFrame(data)
        ratio_sites = dict()
        is_not_balance_list = []
        if df.empty:
            return ratio_sites, is_not_balance_list

        width = self.BARCODE_LENGTH
        position_cols = ['T' + str(pos) for pos in range(width)]
        df['barcode'] = df['barcode'].str.slice(0, width)
        # Splitting on the empty pattern yields one column per character plus
        # an empty column at each end, which is dropped with iloc[:, 1:-1].
        per_base = df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values
        barcode_df = pd.DataFrame(per_base, columns=position_cols).join(df['data_needed'])
        total = barcode_df['data_needed'].sum()

        for i, column in enumerate(position_cols):
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
            # Leave N out of the ratio computation.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            col_df['ratio'] = col_df['data_needed'] / (total - base_n_size)
            ratio = col_df['ratio'].to_dict()
            # The same dict is stored and then completed below, so missing
            # bases show up as 0 in ratio_sites as well.
            ratio_sites[i] = ratio

            # Number of bases falling into each ratio band.
            a_num = b_num = c_num = d_num = e_num = f_num = g_num = 0
            for decbase in ('A', 'T', 'C', 'G'):
                share = ratio.setdefault(decbase, 0)
                if share >= 0.6:
                    a_num += 1
                if 0.2 <= share < 0.6:
                    b_num += 1
                if 0.15 <= share < 0.2:
                    c_num += 1
                if 0.1 <= share < 0.15:
                    d_num += 1
                if 0.08 <= share < 0.1:
                    e_num += 1
                if share < 0.08:
                    f_num += 1
                # Additional acceptable band, only used when is_use_max is set.
                if 0.125 <= share <= 0.625:
                    g_num += 1

            balanced = (
                (b_num + c_num + d_num == 4)
                or (f_num == 1 and (a_num + b_num) == 3)
                or (e_num == 1 and d_num == 1 and (a_num + b_num + c_num) == 2)
                or (e_num == 1 and (a_num + b_num + c_num) == 3)
                or (f_num == 1 and g_num == 3 and self.is_use_max)
            )
            if not balanced:
                is_not_balance_list.append('第%s位置,算出结果为 %s' % (i, ratio))

        return ratio_sites, is_not_balance_list

    def dec_barcode_radio(self, chipname):
        """Log and print a warning when ``chipname`` has unbalanced positions."""
        _, is_not_balance_list = self.count_barcode_radio(self.index_assignments[chipname])
        if is_not_balance_list:
            desc = '\n'.join(is_not_balance_list)
            message = f'芯片{chipname}有碱基不平衡:\n{desc}'
            self.return_log.append(message)
            print(message)

    @staticmethod
    def level(row):
        """Return the sort priority of a row (lower is placed first).

        10 nextera, 11 华大, 20 "极致" split method, 30 ``time`` earlier than
        now, 40 priority containing 加急, 50 priority containing 补测,
        1000 otherwise.
        """
        if 'nextera' in row['classification'].lower():
            return 10
        if '华大' in row['classification']:
            return 11
        # '极致周期' contains '极致', so a single substring test covers both.
        if '极致' in row['拆分方式']:
            return 20
        # Rows whose date is already in the past come next.
        if row['time'] < datetime.now():
            return 30
        if '加急' in row['priority']:
            return 40
        if '补测' in row['priority']:
            return 50
        return 1000

    @staticmethod
    def read_rule():
        """Load the exclusion rules and make them symmetric.

        Every ``(c1, c2)`` row of the rule workbook is duplicated as
        ``(c2, c1)`` so that a lookup on ``c1`` finds both directions.
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive_classfication.xlsx'))
        mirrored = pd.DataFrame({'c1': df['c2'], 'c2': df['c1']})
        return pd.concat([df, mirrored]).reset_index()

    @staticmethod
    def read_cols():
        """Return the list of column names from the ``cols`` column of ``rule/columns.xlsx``."""
        df = pd.read_excel(os.path.join(basedir, 'rule', 'columns.xlsx'))
        return list(df['cols'].values)

    def read_excel(self):
        """Read every sheet of the input workbook.

        :return: dict mapping sheet name -> list of row dicts, with missing
            values replaced by ``''``
        """
        sheets = pd.read_excel(self.path, None)
        ori_data = dict()
        for name, sheet in sheets.items():
            ori_data[name] = sheet.fillna('').to_dict('records')
        return ori_data

    def add_new_data(self, chipname, library_data, newer=True):
        """Place a library on a chip and update the chip's bookkeeping.

        :param chipname: target chip name
        :param library_data: library entry (``data``, ``size``, ``customer``,
            ``classification``, ``is_balance_lib`` keys are read)
        :param newer: ``True`` when this is the first library of the chip, in
            which case all size counters are (re)initialised
        """
        size = library_data['size']
        classification = library_data['classification']
        is_balance_lib = library_data['is_balance_lib']

        self.index_assignments[chipname].extend(library_data['data'])
        self.chip_barcode_recode[chipname].update(
            item['barcode'] for item in library_data['data'])
        self.chip_customer[chipname].add(library_data['customer'])
        self.chip_classification[chipname].add(classification)

        if newer:
            # Initialise every counter so later "+=" and the limit checks in
            # judge_data never hit a missing key.
            self.chip_size[chipname] = 0
            self.chip_speciallib_size[chipname] = 0
            self.chip_methylib_size[chipname] = 0
            self.chip_speciallib_nextera_size[chipname] = 0
            self.chip_speciallib_huada_size[chipname] = 0

        self.chip_size[chipname] += size
        if is_balance_lib == '否':
            self.chip_speciallib_size[chipname] += size
        elif is_balance_lib == '甲基化':
            self.chip_methylib_size[chipname] += size
        if 'nextera' in classification.lower():
            self.chip_speciallib_nextera_size[chipname] += size
        if '华大' in classification:
            self.chip_speciallib_huada_size[chipname] += size

    def use_rule(self, chipname, classfication):
        """Return ``True`` when ``classfication`` is excluded by the chip's content.

        NOTE(review): the lookup is done against the classifications already
        on the chip; the rule file lists pairs of mutually exclusive
        classifications -- confirm against the rule workbook.
        """
        excluded = set(self.rule[self.rule['c1'] == classfication]['c2'])
        return bool(self.chip_classification[chipname].intersection(excluded))

    def judge_data(self, chipname, library_data):
        """Return ``True`` when ``library_data`` may be added to ``chipname``.

        The checks are ordered from cheapest to most expensive and stop at
        the first violated constraint.
        """
        size = library_data['size']
        is_balance_lib = library_data['is_balance_lib']

        # The chip must stay within the configured size limit.
        if self.chip_size[chipname] + size > self.data_limit:
            return False

        # No barcode may appear twice on a chip.
        new_barcodes = {item['barcode'] for item in library_data['data']}
        if self.chip_barcode_recode[chipname].intersection(new_barcodes):
            return False

        # Mutually exclusive library classifications.
        if self.use_rule(chipname, library_data['classification']):
            return False

        # Size caps for unbalanced and methylation libraries; skipped when
        # the balance option is switched off.
        if self.is_use_balance:
            if (is_balance_lib == '否'
                    and self.chip_speciallib_size[chipname] + size > self.SPECIAL_LIB_LIMIT):
                return False
            if (is_balance_lib == '甲基化'
                    and self.chip_methylib_size[chipname] + size > self.SPECIAL_LIB_LIMIT):
                return False

        # Once the chip is filled beyond the threshold, refuse additions that
        # would leave any barcode position unbalanced.
        if self.chip_size[chipname] > self.BALANCE_CHECK_THRESHOLD:
            # count_barcode_radio builds its own DataFrame, so a new list that
            # shares the record dicts is sufficient here.
            combined = self.index_assignments[chipname] + library_data['data']
            _, is_not_balance_list = self.count_barcode_radio(combined)
            if is_not_balance_list:
                return False

        return True

    def _evict_undersized(self, chipname, is_target, size_by_chip):
        """Move records whose classification satisfies ``is_target`` off the chip.

        The evicted records are appended to ``no_assign_data`` and the chip's
        record list, barcode set, total size and ``size_by_chip`` counter are
        updated.

        NOTE(review): customer/classification sets and the unbalanced /
        methylation counters are not rolled back -- confirm whether they
        should be.
        """
        evicted_size = size_by_chip[chipname]
        kept_records = []
        evicted_barcodes = set()
        for libdata in self.index_assignments[chipname]:
            if is_target(libdata['classification']):
                self.no_assign_data.append(libdata)
                # add(), not update(): update() would insert single characters.
                evicted_barcodes.add(libdata['barcode'])
            else:
                kept_records.append(libdata)
        self.index_assignments[chipname] = kept_records
        self.chip_barcode_recode[chipname] -= evicted_barcodes
        size_by_chip[chipname] = 0
        self.chip_size[chipname] -= evicted_size

    def add_loc_num(self):
        """Close the current chip and move on to the next one.

        When the chip holds nextera or 华大 libraries totalling less than
        ``MIN_VENDOR_LIB_SIZE``, those records are evicted instead and the
        chip number is left unchanged so the freed space can be refilled.
        """
        chipname = f'chip{self.loc_chip_num}'
        nextera_size = self.chip_speciallib_nextera_size[chipname]
        huada_size = self.chip_speciallib_huada_size[chipname]
        advance = True

        if 0 < nextera_size < self.MIN_VENDOR_LIB_SIZE:
            self._evict_undersized(
                chipname,
                lambda classification: 'nextera' in classification.lower(),
                self.chip_speciallib_nextera_size,
            )
            advance = False

        if 0 < huada_size < self.MIN_VENDOR_LIB_SIZE:
            self._evict_undersized(
                chipname,
                lambda classification: '华大' in classification,
                self.chip_speciallib_huada_size,
            )
            advance = False

        if advance:
            self.loc_chip_num += 1

    def _build_library_entry(self, library, library_df):
        """Summarise one library's rows into the dict used by the placement loop."""
        return dict(
            library=library,
            is_balance_lib=library_df['is_balance_lib'].values[0],
            size=library_df['data_needed'].sum(),
            split_method=library_df['拆分方式'].values[0],
            time=library_df['time'].values[0],
            level=library_df['level'].values[0],
            customer=library_df['customer'].values[0],
            classification=library_df['classification'].values[0],
            data=library_df[self.need_cols].to_dict('records'),
        )

    def assign_samples(self):
        """Validate the ``未测`` sheet and pack its libraries onto chips.

        Invalid rows and libraries with internal barcode duplicates go to
        ``no_assign_data``. Libraries larger than half of ``data_limit`` are
        split into two equal halves. Results are stored in
        ``index_assignments``.

        :raises UserWarning: when the ``未测`` sheet or a required column is
            missing
        """
        if '未测' not in self.ori_data:
            raise UserWarning('提供excel没有 未测 sheet ,请核查!')
        ori_library_df = pd.DataFrame(self.ori_data['未测'])

        # The sheet must provide every required column.
        missing_cols = set(self.need_cols) - set(ori_library_df.columns)
        if missing_cols:
            unhave_from = '; '.join(sorted(missing_cols))
            raise UserWarning(f'未测表里没有{unhave_from} 表头,请核查!')

        # Row-level validation masks.
        numeric_mask = pd.to_numeric(ori_library_df['data_needed'], errors='coerce').notna()
        time_mask = pd.to_datetime(ori_library_df['time'], errors='coerce').notna()
        status_mask = ori_library_df['status'] == '暂不排样'
        barcode_mask = ori_library_df['barcode'].str.len() != self.BARCODE_LENGTH

        # When several checks fail, the last note written wins.
        ori_library_df['note'] = ''
        ori_library_df.loc[~numeric_mask, 'note'] = 'data_needed 列非数字'
        ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期'
        ori_library_df.loc[status_mask, 'note'] = '暂不排样'
        ori_library_df.loc[barcode_mask, 'note'] = '非16位barcode'

        valid_mask = numeric_mask & time_mask & ~status_mask & ~barcode_mask
        self.no_assign_data.extend(ori_library_df[~valid_mask].to_dict('records'))
        # Work on a copy so the column assignments below never target a view.
        ori_library_df = ori_library_df[valid_mask].copy()

        # Normalise types now that every remaining value is known to convert.
        ori_library_df['data_needed'] = pd.to_numeric(ori_library_df['data_needed'])
        ori_library_df['time'] = pd.to_datetime(ori_library_df['time'], errors='coerce')
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        # "极致" rows whose barcode repeats an earlier row are moved to level 19
        # so they sort ahead of the other level-20 rows instead of ending up
        # unplaced.
        ori_library_df.loc[
            ori_library_df.duplicated(subset='barcode') & (ori_library_df['level'] == 20),
            'level'] = 19

        ori_library_data = list()
        for library, library_df in ori_library_df.groupby('#library'):
            # A library whose own barcodes collide cannot be placed at all.
            if library_df['barcode'].duplicated().any():
                rejected = library_df.assign(note='文库内部有重复')
                self.no_assign_data.extend(rejected.to_dict('records'))
                continue

            # Libraries above half of the chip limit are split into two halves.
            needs_split = library_df['data_needed'].sum() > self.data_limit / 2
            if needs_split:
                library_df = library_df.assign(data_needed=library_df['data_needed'] / 2)

            ori_library_data.append(self._build_library_entry(library, library_df))
            if needs_split:
                self.return_log.append(f'文库{library} 已做拆分处理, 请注意!!! ')
                # Second half: same content, built again so the record dicts
                # are independent objects.
                ori_library_data.append(self._build_library_entry(library, library_df))

        self.ori_lib_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time']))

        while self.ori_lib_data:
            chipname = f'chip{self.loc_chip_num}'

            # An untouched chip takes the highest-priority library as is.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, self.ori_lib_data.pop(0))
                continue

            # Otherwise take the first library, in priority order, that fits.
            nothing_fits = True
            for idx, candidate in enumerate(self.ori_lib_data):
                if self.judge_data(chipname, candidate):
                    self.add_new_data(chipname, self.ori_lib_data.pop(idx), newer=False)
                    nothing_fits = False
                    break

            # Advance at most once per iteration; calling add_loc_num twice
            # would skip a chip that has not been created yet.
            if nothing_fits or self.chip_size[chipname] > self.data_limit:
                self.add_loc_num()

    def run(self):
        """Run the layout and write the result workbook.

        One sheet is written per accepted chip, plus a ``未测`` sheet with the
        unplaced records and a ``log`` sheet when there are messages. Any
        exception raised by :meth:`assign_samples` is recorded in the log and
        results in a workbook without chip sheets.

        :return: path of the written workbook
        """
        try:
            self.assign_samples()
        except Exception as e:
            self.return_log.append(f'T7排样出错, 请联系!{e}')
            self.index_assignments = defaultdict(list)

        outputname = 'assignments_%s_%s' % (
            datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        result_dir = os.path.join(self.output, 'result')
        os.makedirs(result_dir, exist_ok=True)
        outputpath = os.path.join(result_dir, outputname)

        # The context manager saves and closes the workbook even on error.
        with pd.ExcelWriter(outputpath) as writer:
            librarynum = 0
            for chip_idx, chip_assignments in self.index_assignments.items():
                if not chip_assignments:
                    continue
                df = pd.DataFrame(chip_assignments)
                df['time'] = df['time'].dt.strftime('%Y-%m-%d')

                # Chips carrying an "极致" library get an "X" name prefix and
                # are exempt from the minimum-size rule.
                has_extreme = any('极致' in method for method in df['拆分方式'].values)
                addname = 'X' if has_extreme else ''

                if df['data_needed'].sum() < self.MIN_CHIP_SIZE and not addname:
                    df['note'] = '排样数据量不足1600G'
                    self.no_assign_data.extend(df.to_dict('records'))
                    continue

                # NOTE(review): the limit is tested before this chip's
                # libraries are counted, so the running total can end up
                # above self.librarynum -- confirm this is intended.
                if librarynum > self.librarynum:
                    df['note'] = '排样管数超标'
                    self.no_assign_data.extend(df.to_dict('records'))
                    continue
                librarynum += len(set(df['#library'].values))

                self.dec_barcode_radio(chip_idx)
                chipname = addname + chip_idx

                # Per-library summary placed to the right of the records.
                sum_list = [
                    {
                        '二次拆分': library,
                        '客户': library_df['customer'].values[0],
                        '类型': library_df['classification'].values[0],
                        '打折前': library_df['data_needed'].sum(),
                    }
                    for library, library_df in df.groupby('#library')
                ]
                res_df = pd.concat([df, pd.DataFrame(sum_list)], axis=1)
                res_df.to_excel(writer, sheet_name=chipname, index=False)

            no_assign_df = pd.DataFrame(self.no_assign_data)
            # DataFrame.applymap is deprecated in favour of DataFrame.map in
            # newer pandas; use whichever this pandas version provides.
            if hasattr(pd.DataFrame, 'map'):
                elementwise = no_assign_df.map
            else:
                elementwise = no_assign_df.applymap
            no_assign_df = elementwise(
                lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x)
            if not no_assign_df.empty:
                no_assign_df = no_assign_df[self.need_cols]
            no_assign_df.to_excel(writer, sheet_name='未测', index=False)

            if self.return_log:
                pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)

        return outputpath


if __name__ == '__main__':
    start_time = time.time()
    filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'example', 'input排样表.xlsx')
    output_file = ''
    # NOTE(review): AutoLayout's second positional parameter is ``librarynum``
    # and goes through int(); the empty string passed here makes the
    # constructor raise ValueError. The intended tube limit is not visible in
    # this file, so the call is left as it was -- confirm the value and call
    # AutoLayout(filepath, librarynum=<limit>, output=output_file) instead.
    layout = AutoLayout(filepath, output_file)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time} 秒")