import os
import time
from collections import defaultdict
from datetime import datetime

import pandas as pd

from tools.common import basedir, log


class AutoLayout:
    """
    Automatic sample layout.

    Reads the pending libraries from the '未测' sheet of an Excel workbook,
    groups them by '#library' and places them on chips named ``chip1``,
    ``chip2``, ... subject to the size, barcode, exclusivity and
    special-library checks implemented in :meth:`judge_data`.
    """

    # Classifications that count toward the per-chip special-library quota.
    SPECIAL_CLASSIFICATIONS = ('扩增子', '不平衡文库', '单细胞文库', '甲基化')
    # Upper bound for the summed size of special libraries on one chip.
    # NOTE(review): the original comments mention 200G while the code used 250;
    # the code value is kept - confirm which one is intended.
    SPECIAL_LIB_LIMIT = 250
    # Chips whose total 'data_needed' is below this value are not exported.
    MIN_CHIP_SIZE = 1500
    # Number of leading barcode positions inspected for base balance.
    BARCODE_LENGTH = 16
    # Columns that must be present in the '未测' sheet.
    REQUIRED_COLUMNS = (
        '#library', 'sublibrary', 'i5', 'i7', 'data_needed', 'real_data',
        'customer', 'classification', 'priority', 'time', '拆分方式', 'barcode',
    )

    def __init__(self, path, chipnum, output=basedir, data_limit=1800):
        """
        :param path: path of the input Excel workbook
        :param chipnum: maximum number of chips to export (converted with ``int``)
        :param output: base output directory; results go to ``<output>/result``
        :param data_limit: maximum total data size allowed on one chip
        """
        self.path = path
        self.output = output
        self.chipnum = int(chipnum)
        self.data_limit = data_limit

        # chip name -> list of row records placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> total data size on the chip
        self.chip_size = dict()
        # chip name -> chip type flag (not used anywhere in this file)
        self.chip_type = dict()
        # chip name -> set of barcodes already on the chip
        self.chip_barcode_recode = defaultdict(set)
        # sheet name -> list of row records read from the workbook
        self.ori_data = self.read_excel()
        # number of the chip currently being filled
        self.loc_chip_num = 1
        # chip name -> set of customers on the chip
        self.chip_customer = defaultdict(set)
        # chip name -> set of library classifications on the chip
        self.chip_classification = defaultdict(set)
        # symmetric table of mutually exclusive classifications (columns c1, c2)
        self.rule = self.read_rule()
        # chip name -> summed size of special (see SPECIAL_CLASSIFICATIONS) libraries
        self.chip_speciallib_size = dict()

        self.logger = log(os.path.basename(f'{path}.txt'))
        # messages written to the 'log' sheet of the result workbook
        self.return_log = list()
        # row records that could not be placed; written to the '未测' sheet
        self.no_assign_data = list()

    def read_excel(self):
        """
        Load every sheet of the input workbook.

        Missing cells are replaced by '.'.

        :return: dict mapping sheet name to a list of row dicts
        """
        sheets = pd.read_excel(self.path, sheet_name=None)
        return {
            name: sheet.fillna('.').to_dict('records')
            for name, sheet in sheets.items()
        }

    def add_new_data(self, chipname, library_data, newer=True):
        """
        Place a library on a chip and update the per-chip bookkeeping.

        :param chipname: name of the target chip
        :param library_data: library dict as built by :meth:`assign_samples`
        :param newer: True when this is the first library on the chip
            (counters are initialised), False to accumulate
        :return: None
        """
        rows = library_data['data']
        size = library_data['size']
        classification = library_data['classification']

        self.index_assignments[chipname].extend(rows)
        self.chip_barcode_recode[chipname].update(item['barcode'] for item in rows)

        # The same category list is used for the first and for later libraries,
        # so the special-library quota is counted from the very first library.
        special_size = size if classification in self.SPECIAL_CLASSIFICATIONS else 0
        if newer:
            self.chip_size[chipname] = size
            self.chip_speciallib_size[chipname] = special_size
        else:
            self.chip_size[chipname] += size
            self.chip_speciallib_size[chipname] += special_size

        self.chip_customer[chipname].add(library_data['customer'])
        self.chip_classification[chipname].add(classification)

    def dec_barcode_radio(self, chipname):
        """
        Check the base balance of the first BARCODE_LENGTH barcode positions.

        For each position the 'data_needed'-weighted share of A/T/C/G is
        computed ('N' is excluded from both numerator and denominator).
        Unbalanced positions are appended to ``self.return_log`` and printed.

        :param chipname: name of the chip to inspect
        :return: None
        """
        records = self.index_assignments[chipname]
        if not records:
            return

        df = pd.DataFrame(records)
        barcodes = df['barcode'].astype(str).str.slice(0, self.BARCODE_LENGTH)
        total = df['data_needed'].sum()
        bases = ('A', 'T', 'C', 'G')
        unbalanced = []

        for position in range(self.BARCODE_LENGTH):
            # Barcodes shorter than `position` yield NaN here and are ignored
            # by groupby instead of breaking the column layout.
            per_base = df['data_needed'].groupby(barcodes.str.get(position)).sum()

            # Drop the 'N' count from the statistics.
            if 'N' in per_base.index:
                n_size = per_base['N']
                per_base = per_base.drop('N')
            else:
                n_size = 0

            ratio = (per_base / (total - n_size)).to_dict()
            for base in bases:
                ratio.setdefault(base, 0)

            middle = [b for b in bases if 0.2 <= ratio[b] < 0.6]
            low = [b for b in bases if 0.08 <= ratio[b] < 0.2]
            # NOTE(review): with the threshold `<= 0.8` this list can hold a
            # single base only if three bases exceed 0.8, which is impossible,
            # so the second alternative below never applies. The intended rule
            # may be "one base >= 0.6 and three bases in [0.08, 0.2)" - confirm
            # before changing; behaviour is kept as in the original.
            capped = [b for b in bases if ratio[b] <= 0.8]

            balanced = (len(middle) + len(low) == 4) or (len(capped) == 1 and len(low) == 3)
            if not balanced:
                unbalanced.append(
                    '%s 第%s位置,有碱基不平衡,算出结果为 %s' % (chipname, position, ratio)
                )

        if unbalanced:
            self.return_log.append('有碱基不平衡性!')
            self.return_log.extend(unbalanced)
            print('有碱基不平衡性!\n', '\n'.join(unbalanced))

    @staticmethod
    def level(row):
        """
        Return the scheduling priority of a row (smaller is placed earlier).

        2 - '拆分方式' contains '极致'
        3 - 'time' is earlier than now
        4 - 'priority' contains '加急'
        5 - 'priority' contains '补测'
        100 - everything else

        :param row: mapping with the keys '拆分方式', 'time' and 'priority'
        :return: int priority level
        """
        if '极致' in str(row['拆分方式']):
            return 2
        # Dates already in the past come before plain priority flags.
        if row['time'] < datetime.now():
            return 3
        priority = str(row['priority'])
        if '加急' in priority:
            return 4
        if '补测' in priority:
            return 5
        return 100

    @staticmethod
    def read_rule():
        """
        Load the exclusivity table and make it symmetric.

        Every (c1, c2) pair of the workbook is also added as (c2, c1).

        :return: DataFrame with the columns 'index', 'c1', 'c2' (plus any
            extra columns of the workbook)
        """
        rule_path = os.path.join(basedir, 'rule', 'exclusive_classfication.xlsx')
        pairs = pd.read_excel(rule_path)
        mirrored = pd.DataFrame({'c1': pairs['c2'], 'c2': pairs['c1']})
        return pd.concat([pairs, mirrored]).reset_index()

    def use_rule(self, chipname, classfication):
        """
        Tell whether a classification is excluded by what is already on a chip.

        :param chipname: name of the chip
        :param classfication: classification of the library to be added
        :return: True if the chip already holds a classification that the
            rule table lists as exclusive with ``classfication``
        """
        excluded = set(self.rule.loc[self.rule['c1'] == classfication, 'c2'])
        # The rule table is keyed by classification, so it has to be matched
        # against the classifications on the chip (not against its customers).
        return bool(self.chip_classification[chipname].intersection(excluded))

    def judge_data(self, chipname, library_data):
        """
        Check whether a library may be added to an existing chip.

        Every failing check is logged, so all reasons show up in the log.

        :param chipname: name of an already populated chip
        :param library_data: library dict as built by :meth:`assign_samples`
        :return: True if all checks pass
        """
        size = library_data['size']
        library = library_data['library']
        classification = library_data['classification']

        # The chip must not exceed the configured size limit.
        size_ok = self.chip_size[chipname] + size <= self.data_limit
        if not size_ok:
            self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

        # No barcode may occur twice on one chip.
        new_barcodes = {item['barcode'] for item in library_data['data']}
        barcode_ok = not self.chip_barcode_recode[chipname].intersection(new_barcodes)
        if not barcode_ok:
            self.logger.error(f'{library} {chipname} 文库有barcode重复')

        # Mutually exclusive library classifications.
        exclusive_ok = not self.use_rule(chipname, classification)
        if not exclusive_ok:
            self.logger.error(f'{library} {chipname} 有互斥单位')

        # Special libraries are capped per chip.
        special_ok = not (
            classification in self.SPECIAL_CLASSIFICATIONS
            and self.chip_speciallib_size[chipname] + size > self.SPECIAL_LIB_LIMIT
        )
        if not special_ok:
            self.logger.error(f'{library} {chipname} 不平衡文库相加大于设定限制')

        return size_ok and barcode_ok and exclusive_ok and special_ok

    def _collect_libraries(self):
        """Validate the '未测' sheet and return one dict per '#library' group."""
        if '未测' not in self.ori_data:
            raise UserWarning('提供excel没有 未测 sheet ,请核查!')

        frame = pd.DataFrame(self.ori_data['未测'])
        need_col = list(self.REQUIRED_COLUMNS)
        missing = [col for col in need_col if col not in frame.columns]
        if missing:
            unhave_fom = '; '.join(missing)
            raise UserWarning(f'未测表里没有{unhave_fom} 表头,请核查!')

        needed = pd.to_numeric(frame['data_needed'], errors='coerce')
        when = pd.to_datetime(frame['time'], errors='coerce')
        numeric_mask = needed.notna()
        time_mask = when.notna()

        frame['note'] = ''
        frame.loc[~numeric_mask, 'note'] = 'data_needed 列非数字'
        frame.loc[~time_mask, 'note'] = 'time 列非日期'
        need_col.append('note')

        # Rows with a non-numeric size or a non-date time are reported, not placed.
        valid = numeric_mask & time_mask
        self.no_assign_data.extend(frame[~valid].to_dict('records'))

        usable = frame[valid].copy()
        if usable.empty:
            return []
        # Store the converted values so that sums, date comparisons and the
        # later `.dt` formatting work even when the cells were read as text.
        usable['data_needed'] = needed[valid]
        usable['time'] = when[valid]
        usable['level'] = usable.apply(self.level, axis=1)

        libraries = []
        for library, library_df in usable.groupby('#library'):
            libraries.append(dict(
                library=library,
                size=library_df['data_needed'].sum(),
                split_method=library_df['拆分方式'].values[0],
                time=library_df['time'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df[need_col].to_dict('records'),
            ))
        return libraries

    def assign_samples(self):
        """
        Distribute the pending libraries over chips.

        Libraries are ordered by (level, time). The current chip takes the
        first library in that order that passes :meth:`judge_data`; an empty
        chip takes the head of the queue unconditionally. When nothing fits,
        the next chip is opened.

        :raises UserWarning: if the '未测' sheet or a required column is missing
        :return: None
        """
        queue = sorted(self._collect_libraries(), key=lambda lib: (lib['level'], lib['time']))

        while queue:
            chipname = f'chip{self.loc_chip_num}'

            # An empty chip takes the next library directly.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, queue.pop(0))
                continue

            for position, candidate in enumerate(queue):
                if self.judge_data(chipname, candidate):
                    self.add_new_data(chipname, queue.pop(position), newer=False)
                    break
            else:
                # Nothing left fits on this chip: move on to the next one.
                self.loc_chip_num += 1

    def assign_again(self):
        """Placeholder; does nothing."""
        pass

    def run(self):
        """
        Run the layout and write the result workbook.

        One sheet is written per exported chip (prefixed with 'X' when any row
        has '极致' in '拆分方式'), followed by a '未测' sheet with everything
        that was not exported and, if there are messages, a 'log' sheet.

        :return: path of the written workbook
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Top-level boundary: report the failure in the result workbook
            # and export no chips.
            self.return_log.append(f'T7排样出错, 请联系!{e}')
            self.logger.error(f'T7排样出错, 请联系!{e}')
            self.index_assignments = {}

        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        result_dir = os.path.join(self.output, 'result')
        os.makedirs(result_dir, exist_ok=True)
        outputpath = os.path.join(result_dir, outputname)

        # The context manager saves and closes the workbook even on errors.
        with pd.ExcelWriter(outputpath) as writer:
            chip_loc = 1
            for chip_idx, chip_assignments in self.index_assignments.items():
                df = pd.DataFrame(chip_assignments)
                if df['data_needed'].sum() < self.MIN_CHIP_SIZE or chip_loc > self.chipnum:
                    self.no_assign_data.extend(chip_assignments)
                    continue

                df['time'] = pd.to_datetime(df['time']).dt.strftime('%Y-%m-%d')
                addname = 'X' if any('极致' in str(method) for method in df['拆分方式']) else ''
                self.dec_barcode_radio(chip_idx)
                df.to_excel(writer, sheet_name=addname + chip_idx, index=False)
                chip_loc += 1

            pd.DataFrame(self.no_assign_data).to_excel(writer, sheet_name='未测', index=False)
            if self.return_log:
                pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)

        return outputpath


if __name__ == '__main__':
    start_time = time.time()
    filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'example', 'input排样表.xlsx')
    # The second positional argument of AutoLayout is the chip count; passing
    # an empty string there fails in int(). Adjust the count as needed.
    chip_count = 1
    layout = AutoLayout(filepath, chip_count)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time} 秒")