diff --git a/tools/t7.py b/tools/t7.py index 57ae01a..f881807 100644 --- a/tools/t7.py +++ b/tools/t7.py @@ -38,6 +38,10 @@ class AutoLayout: self.rule = self.read_rule() # 甲基化文库不大于200,WGBS文库不大于200G self.chip_speciallib_size = dict() + + # Nextera 文库大小 + self.chip_speciallib_nextera_size = dict() + self.logger = log(os.path.basename(f'{path}.txt')) self.return_log = list() self.no_assign_data = list() @@ -50,7 +54,7 @@ class AutoLayout: merge = pd.read_excel(self.path, None) ori_data = dict() for name, sheet in merge.items(): - sheet.fillna('.', inplace=True) + sheet.fillna('', inplace=True) ori_data[name] = sheet.to_dict('records') return ori_data @@ -65,6 +69,9 @@ class AutoLayout: self.index_assignments[chipname].extend(library_data['data']) self.chip_barcode_recode[chipname].update({item['barcode'] for item in library_data['data']}) + self.chip_customer[chipname].add(library_data['customer']) + self.chip_classification[chipname].add(library_data['classification']) + if newer: self.chip_size[chipname] = library_data['size'] # if library_data['classification'] in ['扩增子', '不平衡文库', '单细胞文库以及甲基化']: @@ -72,13 +79,17 @@ class AutoLayout: self.chip_speciallib_size[chipname] = library_data['size'] else: self.chip_speciallib_size[chipname] = 0 + if 'Nextera' in library_data['classification'].lower(): + self.chip_speciallib_nextera_size[chipname] = library_data['size'] + else: + self.chip_speciallib_nextera_size[chipname] = 0 + else: self.chip_size[chipname] += library_data['size'] if library_data['is_balance_lib'] == '否': self.chip_speciallib_size[chipname] += library_data['size'] - - self.chip_customer[chipname].add(library_data['customer']) - self.chip_classification[chipname].add(library_data['classification']) + if 'Nextera' in library_data['classification'].lower(): + self.chip_speciallib_nextera_size[chipname] += library_data['size'] def count_barcode_radio(self, data): df = pd.DataFrame(data) @@ -146,10 +157,14 @@ class AutoLayout: # 将时间字符串转换为 datetime 对象 # mytime = datetime.strptime(row['time'], "%Y-%m-%d") # mytime = row['time'].strftime("%Y-%m-%d") - mytime = row['time'] + + if 'nextera' in row['classification'].lower(): + return 1 + if row['拆分方式'] == '极致周期' or '极致' in row['拆分方式']: return 2 + mytime = row['time'] # 判断日期是之前的还是之后的 if mytime < today_date: return 3 @@ -219,6 +234,25 @@ class AutoLayout: return True return False + def add_loc_num(self): + # 有nextera文库 必须满足大于50G + nextera_size = self.chip_speciallib_nextera_size[f'chip{self.loc_chip_num}'] + if nextera_size > 50 or nextera_size == 0: + self.loc_chip_num += 1 + else: + # 有nextera文库,但是不满足50G 去除 + nextary_barcode = set() + no_nextary_data = list() + for libdata in self.index_assignments[self.loc_chip_num]: + if libdata['classification'].lower() != 'nextera': + no_nextary_data.append(libdata) + + else: + self.no_assign_data.extend(libdata['data']) + nextary_barcode.update(set([nextera_data['barcode'] for nextera_data in libdata['data']])) + + self.chip_barcode_recode[self.loc_chip_num] -= nextary_barcode + def assign_samples(self): ori_library_data = list() @@ -226,8 +260,8 @@ class AutoLayout: raise UserWarning('提供excel没有 未测 sheet ,请核查!') ori_library_df = pd.DataFrame(self.ori_data['未测']) - need_col = ['#library', 'sublibrary', 'i5', 'i7', 'data_needed', 'real_data', 'customer', - 'classification', 'priority', 'time', '拆分方式', 'barcode', 'is_balance_lib' + need_col = ['status', '#library', 'sublibrary', 'i5', 'i7', 'data_needed', 'real_data', 'customer', + 'classification', 'priority', 'time', '拆分方式', 'barcode', 'is_balance_lib', '备注' ] get_col = set(ori_library_df.columns) unhave_col = set(need_col) - get_col @@ -239,17 +273,22 @@ class AutoLayout: numeric_mask = pd.to_numeric(ori_library_df['data_needed'], errors='coerce').notna() time_mask = pd.to_datetime(ori_library_df['time'], errors='coerce').notna() + # 添加处理status列的逻辑 + status_mask = ori_library_df['status'] == '暂不排样' + ori_library_df['note'] = '' ori_library_df.loc[~numeric_mask, 'note'] = 'data_needed 列非数字' ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期' + ori_library_df.loc[status_mask, 'note'] = '暂不排样' need_col.append('note') - no_ori_data = ori_library_df[~(numeric_mask & time_mask)] + no_ori_data = ori_library_df[~(numeric_mask & time_mask) | status_mask] self.no_assign_data.extend(no_ori_data.to_dict('records')) # 使用布尔索引筛选出不是数字和非日期的行 - ori_library_df = ori_library_df[(numeric_mask & time_mask)] + ori_library_df = ori_library_df[(numeric_mask & time_mask) & ~status_mask] + ori_library_df['level'] = ori_library_df.apply(self.level, axis=1) # # 极致客户有重复的,把等级调到0,防止放到了最后,到了未测里 @@ -320,9 +359,11 @@ class AutoLayout: break j += 1 else: - self.loc_chip_num += 1 + # self.loc_chip_num += 1 + self.add_loc_num() if self.chip_size[chipname] > self.data_limit: - self.loc_chip_num += 1 + # self.loc_chip_num += 1 + self.add_loc_num() def assign_again(self): pass