import os
import socket
import pandas as pd
from collections import defaultdict
from datetime import datetime
import time
import logging

from tools.common import basedir, log


class AutoLayout:
    """Automatically distribute sequencing libraries across chips (lanes).

    Libraries are read from the Excel workbook at ``path``, grouped by the
    ``#library`` column of the ``未测`` sheet, and greedily packed onto lanes
    named ``lane1``, ``lane2``, ... subject to the checks in ``judge_data``.
    """

    # Number of single-base columns the barcode is split into.
    BARCODE_LENGTH = 16

    def __init__(self, path, output=basedir, data_limit=1600):
        """Load the workbook and initialise the per-chip bookkeeping.

        :param path: path of the input Excel workbook.
        :param output: base directory; results go to ``<output>/result``.
        :param data_limit: maximum summed ``data_needed`` allowed on one chip.
        """
        self.path = path
        self.output = output
        self.data_limit = data_limit

        # chip name -> list of sample records placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> summed size of the libraries placed on that chip
        self.chip_size = dict()
        # chip name -> chip type flag (not written or read by this class)
        self.chip_type = dict()
        # chip name -> set of barcodes already present on that chip
        self.chip_barcode_recode = defaultdict(set)
        # sheet name -> list of row dicts read from the input workbook
        self.ori_data = self.read_excel()
        # index of the chip currently being filled
        self.loc_chip_num = 1
        # chip name -> {lib_type: summed size of that type on the chip}
        self.chip_lib_type = defaultdict(dict)
        self.logger = log(os.path.basename(f'{path}.txt'))
        # messages that end up in the 'log' sheet of the result workbook
        self.return_log = list()
        # lazily loaded cache of read_rule(); see _get_rules()
        self._rules = None

    def read_excel(self):
        """Read every sheet of the input workbook.

        Missing cells are replaced with ``'.'``.

        :return: dict mapping sheet name to a list of row dicts.
        """
        sheets = pd.read_excel(self.path, sheet_name=None)
        ori_data = dict()
        for name, sheet in sheets.items():
            sheet.fillna('.', inplace=True)
            ori_data[name] = sheet.to_dict('records')
        return ori_data

    def add_new_data(self, chipname, library_data, newer=True):
        """Place a library on a chip and update the chip's bookkeeping.

        :param chipname: name of the target chip.
        :param library_data: library dict as built by ``assign_samples``
            (uses its ``data``, ``size`` and ``lib_type`` entries).
        :param newer: ``True`` when this is the first library on the chip
            (the chip size is set); ``False`` to add to the existing size.
        :return: None
        """
        size = library_data['size']
        lib_type = library_data['lib_type']

        self.index_assignments[chipname].extend(library_data['data'])
        self.chip_barcode_recode[chipname].update(
            {item['barcode'] for item in library_data['data']}
        )

        if newer:
            self.chip_size[chipname] = size
        else:
            self.chip_size[chipname] += size

        per_type = self.chip_lib_type[chipname]
        if lib_type in per_type:
            per_type[lib_type] += size
        else:
            per_type[lib_type] = size

    def dec_barcode_radio(self, chipname):
        """Check the per-position base balance of the barcodes on a chip.

        Each barcode is split into single characters and, for every position,
        the ``data_needed``-weighted share of each base is computed with ``N``
        excluded. Findings are appended to ``self.return_log`` and printed.

        :param chipname: name of the chip to check.
        :return: None
        """
        df = pd.DataFrame(self.index_assignments[chipname])
        columns = ['T' + str(x) for x in range(self.BARCODE_LENGTH)]
        # Splitting on the empty pattern yields an empty first and last
        # column, hence the [1:-1] slice.
        barcode_df = pd.DataFrame(
            df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values,
            columns=columns,
        ).join(df['data_needed'])
        total = barcode_df['data_needed'].sum()

        is_not_balance_list = []
        for i, column in enumerate(columns):
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})

            # Exclude 'N' from both the numerator and the denominator.
            if 'N' in col_df.index:
                base_N_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_N_size = 0
            col_df['ratio'] = (col_df['data_needed']) / (total - base_N_size)

            ratio = col_df['ratio'].to_dict()
            need_base_list = list()
            for decbase in ['A', 'T', 'C']:
                # A base that never occurs at this position counts as 0.
                ratio.setdefault(decbase, 0)
                if ratio[decbase] < 0.1:
                    need_base_list.append(decbase)

            # Reported only when all three of A/T/C are below 10%.
            # NOTE(review): the original comment suggested any under-represented
            # base should be reported (i.e. a non-empty list) - confirm intent.
            if len(need_base_list) > 2:
                is_not_balance_list.append(
                    '[%s] 第%s位置, %s 有碱基不平衡,算出结果为 %s' % (chipname, i, need_base_list, ratio)
                )

            # G must not exceed 70% at any position.
            ratio.setdefault('G', 0)
            if ratio['G'] > 0.7:
                is_not_balance_list.append(
                    '[%s] 第%s位置, G 含量超过70%%,算出结果为 %s' % (chipname, i, ratio['G'])
                )

        if is_not_balance_list:
            self.return_log.extend(is_not_balance_list)
            print('有碱基不平衡性!\n', '\n'.join(is_not_balance_list))

    @staticmethod
    def read_rule():
        """Read the library-type limit table.

        :return: dict mapping row index to a row dict; ``judge_data`` reads
            the ``lib_type`` and ``limit`` entries of each row.
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'lib_type_limit.xlsx'))
        return df.to_dict('index')

    def _get_rules(self):
        """Return the rule table, reading it from disk only on first use."""
        rules = getattr(self, '_rules', None)
        if rules is None:
            rules = self.read_rule()
            self._rules = rules
        return rules

    @staticmethod
    def level(row):
        """Return the sort priority of a sample row (lower sorts first).

        :param row: mapping with ``customer`` and ``lib_type`` entries.
        :return: 1 or 2 for the prioritised customer's 3'/5' libraries,
            100 for everything else.
        """
        if row['customer'] == '百奥益康' and '3\'' in row['lib_type']:
            return 1
        elif row['customer'] == '百奥益康' and '5\'' in row['lib_type']:
            return 2
        else:
            return 100

    def judge_data(self, chipname, library_data):
        """Decide whether a library may be added to an existing chip.

        All three checks are always evaluated so that every violated
        constraint is logged.

        :param chipname: name of a chip that already holds data.
        :param library_data: candidate library dict.
        :return: ``True`` when the library fits, ``False`` otherwise.
        """
        size = library_data['size']
        library = library_data['library']

        # The chip total must stay within the configured limit.
        sizelimit = True
        if self.chip_size[chipname] + size > self.data_limit:
            sizelimit = False
            self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

        # No barcode may already be present on the chip.
        notrepeatbarcode = True
        new_barcodes = {item['barcode'] for item in library_data['data']}
        if self.chip_barcode_recode[chipname].intersection(new_barcodes):
            notrepeatbarcode = False
            self.logger.error(f'{library} {chipname} 文库有barcode重复')

        # Library types listed in the rule table are capped at a fraction
        # of the chip limit.
        # NOTE(review): the candidate's size is added whatever its own
        # lib_type is, and a restricted type not yet on the chip is not
        # checked at all - confirm this is the intended rule.
        sp_lib1 = True
        for myrule in self._get_rules().values():
            lib_type = myrule['lib_type']
            limit = myrule['limit']
            if lib_type in self.chip_lib_type[chipname]:
                if self.chip_lib_type[chipname][lib_type] + size > self.data_limit * limit:
                    sp_lib1 = False
                    self.logger.error(f'{library} {chipname} 文库有大于设定限制')
                    break

        return bool(sizelimit and notrepeatbarcode and sp_lib1)

    def assign_samples(self):
        """Greedily pack the libraries of the ``未测`` sheet onto chips.

        Libraries are sorted by (level, customer, descending size, time).
        The head of the queue goes onto the current chip when it fits;
        otherwise the first later library that fits is taken. When nothing
        fits, the next chip is started.

        :return: None
        """
        ori_library_data = list()
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
        for library, library_df in ori_library_df.groupby('#library'):
            ori_library_data.append(dict(
                library=library,
                size=library_df['data_needed'].sum(),
                time=library_df['time'].values[0],
                customer=library_df['customer'].values[0],
                level=library_df['level'].values[0],
                status=library_df['status'].values[0],
                lib_type=library_df['lib_type'].values[0],
                data=library_df.to_dict('records')
            ))
        ori_sort_data = sorted(
            ori_library_data,
            key=lambda x: (x['level'], x['customer'], -x['size'], x['time'])
        )

        while ori_sort_data:
            library_data = ori_sort_data[0]
            chipname = f'lane{self.loc_chip_num}'

            # An empty chip takes the head of the queue unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                ori_sort_data.pop(0)
                continue

            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                ori_sort_data.pop(0)
            else:
                # The head was just rejected, so only later entries are tried.
                for idx in range(1, len(ori_sort_data)):
                    candidate = ori_sort_data[idx]
                    if self.judge_data(chipname, candidate):
                        ori_sort_data.pop(idx)
                        self.add_new_data(chipname, candidate, newer=False)
                        break
                else:
                    # Nothing left fits this chip: move to the next one and
                    # skip the fullness check below, which would otherwise
                    # advance the counter a second time and skip a lane.
                    self.loc_chip_num += 1
                    continue

            # The chip may now be effectively full; if so, start a new one.
            if self.chip_size[chipname] > self.data_limit * 0.99:
                self.loc_chip_num += 1

    def run(self):
        """Run the layout and write the result workbook.

        Chips filled to less than 80% of ``data_limit`` are not written as
        their own sheet; their samples go to the ``未测`` sheet instead, and
        log entries tagged with those chips are dropped.

        :return: path of the written workbook.
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Top-level boundary: record the failure and still emit a workbook.
            self.logger.error(f'nova_xplus排样出错, 请联系!{e}')
            self.return_log.append(f'nova_xplus排样出错, 请联系!{e}')
            self.index_assignments = {}

        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        result_dir = os.path.join(self.output, 'result')
        os.makedirs(result_dir, exist_ok=True)
        outputpath = os.path.join(result_dir, outputname)

        no_assign_data = list()
        no_assign_chip = list()
        # The context manager saves and closes the workbook even if a sheet
        # write raises.
        with pd.ExcelWriter(outputpath) as writer:
            for chip_idx, chip_assignments in self.index_assignments.items():
                self.dec_barcode_radio(chip_idx)
                df = pd.DataFrame(chip_assignments)
                if df['data_needed'].sum() < self.data_limit * 0.8:
                    no_assign_chip.append(chip_idx)
                    no_assign_data.extend(chip_assignments)
                    continue
                df.to_excel(writer, sheet_name=chip_idx, index=False)

            pd.DataFrame(no_assign_data).to_excel(writer, sheet_name='未测', index=False)

            if self.return_log:
                log_res = [
                    splog for splog in self.return_log
                    if not any(f'[{chip}]' in str(splog) for chip in no_assign_chip)
                ]
                pd.DataFrame(log_res).to_excel(writer, sheet_name='log', index=False)
        return outputpath


if __name__ == '__main__':
    start_time = time.time()
    excel_file = '../example/t1(1).xlsx'
    output_file = ''
    layout = AutoLayout(excel_file, output_file, data_limit=800)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time} 秒")