"""Automated sample layout: distribute sequencing libraries across chips."""
import os
import time
from collections import defaultdict
from datetime import datetime

import pandas as pd

from .common import basedir, log


class AutoLayout:
    """
    Automated sample layout.

    Reads the libraries listed in the '未测' sheet of an Excel workbook and
    packs them onto chips named ``chip1``, ``chip2``, ... subject to a total
    size limit per chip, barcode uniqueness within a chip, customer exclusion
    rules and a separate cap for a few special library classifications.
    """

    # Classifications whose summed size on a single chip is capped separately.
    SPECIAL_CLASSIFICATIONS = ('扩增子', '不平衡文库', '单细胞文库', '甲基化')
    # Cap applied to the summed size of the classifications above
    # (the original comment mentions "200G" — presumably gigabases; confirm).
    SPECIAL_LIB_LIMIT = 200

    def __init__(self, path, output=basedir, data_limit=1520):
        """
        :param path: path of the input Excel workbook
        :param output: base directory; results go to ``<output>/result``
        :param data_limit: maximum summed ``data_needed`` allowed on one chip
        """
        self.path = path
        self.output = output
        self.data_limit = data_limit

        # chip name -> list of sample records placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> summed size of the libraries on the chip
        self.chip_size = dict()
        # chip name -> chip type flag (never written in this file)
        self.chip_type = dict()
        # chip name -> set of barcodes already present on the chip
        self.chip_barcode_recode = defaultdict(set)
        # sheet name -> list of row dicts, as read from the workbook
        self.ori_data = self.read_excel()
        # number of the chip currently being filled
        self.loc_chip_num = 1
        # chip name -> set of customers present on the chip
        self.chip_customer = defaultdict(set)
        # customer exclusion table (both directions)
        self.rule = self.read_rule()
        # chip name -> summed size of special-classification libraries
        self.chip_speciallib_size = dict()
        self.logger = log(os.path.basename(f'{path}.txt'))
        # messages written to the 'log' sheet of the result workbook
        self.return_log = list()

    def read_excel(self) -> dict:
        """
        Load every sheet of the input workbook.

        Missing cells are replaced by ``'.'``.

        :return: dict mapping sheet name to a list of row dicts
        """
        sheets = pd.read_excel(self.path, sheet_name=None)
        return {
            name: sheet.fillna('.').to_dict('records')
            for name, sheet in sheets.items()
        }

    def add_new_data(self, chipname, library_data, newer=True):
        """
        Put a library on a chip and update the per-chip bookkeeping.

        :param chipname: name of the target chip
        :param library_data: library dict as built by ``assign_samples``
        :param newer: True when this is the first library on the chip
            (counters are initialised), False to accumulate onto them
        :return: None
        """
        records = library_data['data']
        size = library_data['size']
        # Only special classifications count toward the special-library cap.
        # The same classification list is used for both the first and the
        # following libraries of a chip.
        if library_data['classification'] in self.SPECIAL_CLASSIFICATIONS:
            special_size = size
        else:
            special_size = 0

        self.index_assignments[chipname].extend(records)
        self.chip_barcode_recode[chipname].update(
            item['barcode'] for item in records)
        if newer:
            self.chip_size[chipname] = size
            self.chip_speciallib_size[chipname] = special_size
        else:
            self.chip_size[chipname] += size
            self.chip_speciallib_size[chipname] += special_size
        self.chip_customer[chipname].add(library_data['customer'])

    def add_new_chip(self, library_data):
        """
        Place a library on the first chip after the current one that accepts it.

        Chips are probed in increasing order starting at ``loc_chip_num + 1``;
        a chip name that has no assignments yet always accepts the library.
        Every reason for rejecting an existing chip is logged.

        :param library_data: library dict as built by ``assign_samples``
        :return: None
        """
        library = library_data['library']
        size = library_data['size']
        customer = library_data['customer']
        barcodes = {item['barcode'] for item in library_data['data']}

        chip_num = self.loc_chip_num
        while True:
            chip_num += 1
            chipname = f'chip{chip_num}'
            if chipname not in self.index_assignments:
                self.logger.error(f'{library} {chipname} 常规添加')
                self.add_new_data(chipname, library_data)
                return

            over_size = self.chip_size[chipname] + size > self.data_limit
            # NOTE(review): unlike judge_data, this check ignores the library's
            # classification and rejects at exactly the limit — confirm intent.
            over_special = (self.chip_speciallib_size[chipname] + size
                            >= self.SPECIAL_LIB_LIMIT)
            duplicated = bool(
                self.chip_barcode_recode[chipname].intersection(barcodes))
            excluded = self.use_rule(chipname, customer)

            if over_size:
                self.logger.error(f'{library} {chipname} 文库相加大于设定限制')
            if over_special:
                self.logger.error(f'{library} {chipname} 不平衡文库相加大于设定限制')
            if duplicated:
                self.logger.error(f'{library} {chipname} 文库有barcode重复')
            if excluded:
                self.logger.error(f'{library} {chipname} 有互斥单位')

            if not (over_size or over_special or duplicated or excluded):
                self.add_new_data(chipname, library_data, newer=False)
                return

    def dec_barcode_radio(self, chipname):
        """
        Check the per-position base balance of the barcodes on a chip.

        For each of the 16 barcode positions the share of ``data_needed``
        carried by every base is computed ('N' is excluded from both the
        counts and the denominator). A position is reported when a base is
        below 0.088 or when one of A/T/C/G is absent. If more than two
        positions are reported, the report is appended to ``return_log`` and
        printed.

        :param chipname: name of the chip to check
        :return: None
        """
        df = pd.DataFrame(self.index_assignments[chipname])
        columns = ['T' + str(pos) for pos in range(16)]
        # NOTE(review): splitting on '' and dropping the first/last column is
        # presumably meant to yield one column per barcode character; this
        # requires every barcode to have exactly 16 characters — confirm.
        bases = df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values
        barcode_df = pd.DataFrame(bases, columns=columns).join(df['data_needed'])
        total = barcode_df['data_needed'].sum()

        is_not_balance_list = []
        for pos, column in enumerate(columns):
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
            # Leave 'N' out of the ratio computation.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            col_df['ratio'] = col_df['data_needed'] / (total - base_n_size)
            need_base_list = list(col_df.index[col_df['ratio'] < 0.088])
            ratio = col_df['ratio'].to_dict()
            # A base that never occurs at this position counts as ratio 0.
            for decbase in ['A', 'T', 'C', 'G']:
                if decbase not in ratio:
                    ratio[decbase] = 0
                    need_base_list.append(decbase)
            # An empty list means every base meets the threshold here.
            if need_base_list:
                is_not_balance_list.append(
                    '%s 第%s位置, %s 有碱基不平衡,算出结果为 %s'
                    % (chipname, pos, need_base_list, ratio)
                )

        if len(is_not_balance_list) > 2:
            self.return_log.append('有碱基不平衡性!')
            self.return_log.extend(is_not_balance_list)
            print('有碱基不平衡性!\n', '\n'.join(is_not_balance_list))

    @staticmethod
    def level(row):
        """
        Return the sort priority of a sample row (lower sorts first).

        :param row: mapping with 'customer', 'split_method', 'classification'
        :return: integer priority; 100 when no rule matches
        """
        customer = row['customer']
        if customer == '贞固':
            return 1
        if row['split_method'] == '极致周期':
            return 2
        # Original note: 医沐, 清港泉 and WES (in-house library prep) are also
        # treated as 极致周期.
        if customer in ('医沐', '清港泉'):
            return 3
        # Original note: libraries of 赛福 / 桐树基因 should preferably be run
        # together with the 极致周期 human samples.
        if customer in ('赛福', '桐树基因'):
            return 7
        if row['classification'] == 'Nextera':
            return 5
        if '华大' in row['classification']:
            return 6
        return 100

    @staticmethod
    def read_rule():
        """
        Load the customer exclusion table from ``rule/exclusive.xlsx``.

        Each pair is also added in swapped order so that a lookup on
        ``customer1`` finds the exclusion whichever side the customer was
        listed on.

        :return: DataFrame with at least 'customer1' and 'customer2' columns
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive.xlsx'))
        mirrored = pd.DataFrame()
        mirrored['customer1'] = df['customer2']
        mirrored['customer2'] = df['customer1']
        return pd.concat([df, mirrored])

    def use_rule(self, chipname, customer) -> bool:
        """
        Tell whether a customer is excluded by a customer already on the chip.

        :param chipname: name of the chip
        :param customer: customer of the library to be added
        :return: True when an exclusion applies
        """
        partners = set(
            self.rule[self.rule['customer1'] == customer]['customer2'])
        return bool(self.chip_customer[chipname].intersection(partners))

    def judge_data(self, chipname, library_data) -> bool:
        """
        Decide whether a library may be added to an existing chip.

        All four conditions are always evaluated so that every violated one
        is logged.

        :param chipname: name of a chip that already holds data
        :param library_data: library dict as built by ``assign_samples``
        :return: True when the library fits
        """
        size = library_data['size']
        customer = library_data['customer']
        library = library_data['library']
        barcodes = {item['barcode'] for item in library_data['data']}

        # Total size on the chip must stay within the configured limit.
        sizelimit = self.chip_size[chipname] + size <= self.data_limit
        if not sizelimit:
            self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

        # No barcode may occur twice on a chip.
        notrepeatbarcode = not self.chip_barcode_recode[chipname].intersection(barcodes)
        if not notrepeatbarcode:
            self.logger.error(f'{library} {chipname} 文库有barcode重复')

        # Mutually exclusive customers may not share a chip.
        exclusivecostom = not self.use_rule(chipname, customer)
        if not exclusivecostom:
            self.logger.error(f'{library} {chipname} 有互斥单位')

        # Special classifications are capped per chip.
        splibrary = not (
            library_data['classification'] in self.SPECIAL_CLASSIFICATIONS
            and self.chip_speciallib_size[chipname] + size > self.SPECIAL_LIB_LIMIT
        )
        if not splibrary:
            self.logger.error(f'{library} {chipname} 不平衡文库相加大于设定限制')

        return sizelimit and notrepeatbarcode and exclusivecostom and splibrary

    def assign_samples(self):
        """
        Distribute every library of the '未测' sheet over chips.

        Rows are grouped by '#library'; libraries are ordered by level,
        then by decreasing size, then by time, and packed greedily onto the
        current chip. When the head of the queue does not fit, the first
        later library that fits is taken instead; when none fits, the next
        chip is started.

        :return: None
        """
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)

        ori_library_data = [
            dict(
                library=library,
                size=library_df['data_needed'].sum(),
                split_method=library_df['split_method'].values[0],
                time=library_df['time'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df.to_dict('records'),
            )
            for library, library_df in ori_library_df.groupby('#library')
        ]
        pending = sorted(ori_library_data,
                         key=lambda x: (x['level'], -x['size'], x['time']))

        while pending:
            library_data = pending[0]
            chipname = f'chip{self.loc_chip_num}'

            # An empty chip takes the head of the queue unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, library_data)
                pending.pop(0)
                continue

            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                pending.pop(0)
            else:
                # The head was just rejected, so start scanning after it.
                for idx in range(1, len(pending)):
                    candidate = pending[idx]
                    if self.judge_data(chipname, candidate):
                        pending.pop(idx)
                        self.add_new_data(chipname, candidate, newer=False)
                        break
                else:
                    # Nothing fits: move on to the next chip. ``continue``
                    # keeps the fullness check below from advancing a second
                    # time and skipping a chip number.
                    self.loc_chip_num += 1
                    continue

            # A chip above 1500 is treated as full.
            if self.chip_size[chipname] > 1500:
                self.loc_chip_num += 1

    def assign_again(self):
        """Placeholder; not implemented."""
        pass

    def run(self) -> str:
        """
        Run the layout and write the result workbook.

        Each chip whose summed ``data_needed`` reaches 1400 gets its own
        sheet (prefixed with 'X' when it contains a '极致周期' sample); the
        samples of smaller chips are collected in the '未测' sheet. Messages
        gathered in ``return_log`` go to a 'log' sheet. If the assignment
        itself fails, the error is recorded and no chip sheet is written.

        :return: path of the written workbook
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Top-level boundary: report the failure instead of crashing.
            message = f'T7排样出错, 请联系!{e}'
            self.logger.error(message)
            self.return_log.append(message)
            self.index_assignments = {}

        outputname = 'assignments_%s_%s' % (
            datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        outputpath = os.path.join(self.output, 'result', outputname)
        os.makedirs(os.path.dirname(outputpath), exist_ok=True)

        no_assign_data = list()
        # The context manager closes the workbook even if a sheet fails.
        with pd.ExcelWriter(outputpath) as writer:
            for chip_idx, chip_assignments in self.index_assignments.items():
                self.dec_barcode_radio(chip_idx)
                df = pd.DataFrame(chip_assignments)
                if df['data_needed'].sum() < 1400:
                    no_assign_data.extend(chip_assignments)
                    continue
                addname = 'X' if '极致周期' in df['split_method'].values else ''
                df.to_excel(writer, sheet_name=addname + chip_idx, index=False)
            pd.DataFrame(no_assign_data).to_excel(
                writer, sheet_name='未测', index=False)
            if self.return_log:
                pd.DataFrame(self.return_log).to_excel(
                    writer, sheet_name='log', index=False)
        return outputpath


if __name__ == '__main__':
    start_time = time.time()
    excel_file = 'example/07031754_20230703.xlsx'
    output_file = ''
    layout = AutoLayout(excel_file, output_file)
    layout.run()
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"代码执行时间为:{execution_time} 秒")

    # server()