# Bases scored at every barcode position; 'N' (no call) is excluded from ratios.
_BARCODE_BASES = ('A', 'T', 'C', 'G')
# Only the first 16 barcode characters take part in the balance check.
_BARCODE_LENGTH = 16


def format_date(date):
    """Format a date-like value as ``YYYY-MM-DD``.

    Values exposing ``strftime`` (``pd.Timestamp``, ``datetime``, ``date``)
    are formatted; anything else, including ``pd.NaT``, falls back to
    ``str(date)`` instead of raising.
    """
    if date is pd.NaT or not hasattr(date, 'strftime'):
        return str(date)
    return date.strftime('%Y-%m-%d')


def _grade_site(ratio):
    """Count how many of the four bases fall into each ratio grade A-F."""
    grades = dict.fromkeys('ABCDEF', 0)
    for letter in _BARCODE_BASES:
        value = ratio[letter]
        # Explicit bounds on every branch: a NaN ratio matches no grade.
        if value >= 0.6:
            grades['A'] += 1
        elif 0.2 <= value < 0.6:
            grades['B'] += 1
        elif 0.15 <= value < 0.2:
            grades['C'] += 1
        elif 0.1 <= value < 0.15:
            grades['D'] += 1
        elif 0.08 <= value < 0.1:
            grades['E'] += 1
        elif value < 0.08:
            grades['F'] += 1
    return grades


def _site_is_balanced(ratio):
    """Return True when the grade counts of one position match an accepted pattern."""
    g = _grade_site(ratio)
    return (
        g['B'] + g['C'] + g['D'] == 4
        or (g['F'] == 1 and g['A'] + g['B'] == 3)
        or (g['E'] == 1 and g['D'] == 1 and g['A'] + g['B'] + g['C'] == 2)
        or (g['E'] == 1 and g['A'] + g['B'] + g['C'] == 3)
    )


def count_barcode_radio(data):
    """Compute the data-weighted base ratio of each of the 16 barcode positions.

    Args:
        data: anything ``pd.DataFrame`` accepts (records or a frame) holding
            a string ``barcode`` column and a numeric ``data_needed`` column.

    Returns:
        ``(ratio_sites, is_not_balance_list)``. ``ratio_sites`` maps position
        ``0..15`` to ``{base: ratio}``, where each ratio is that base's share
        of ``data_needed`` at the position; A/T/C/G absent from a position are
        filled in as ``0``. ``is_not_balance_list`` holds one message per
        position whose ratios fit none of the accepted balance patterns.
        Empty input yields ``({}, [])``.

    Rows whose base at a position is ``'N'``, or that have no base there
    (barcode shorter than 16, missing barcode), are left out of that
    position's denominator. The input is never modified, and rows are matched
    by position, so a frame with a non-default index is handled correctly.
    """
    df = pd.DataFrame(data)
    ratio_sites = {}
    is_not_balance_list = []
    if df.empty:
        return ratio_sites, is_not_balance_list

    # Work on positionally indexed copies so the caller's frame stays untouched
    # and barcode/weight rows cannot misalign.
    barcodes = df['barcode'].str.slice(0, _BARCODE_LENGTH).reset_index(drop=True)
    needed = df['data_needed'].reset_index(drop=True)

    for i in range(_BARCODE_LENGTH):
        site_bases = barcodes.str.get(i)  # NaN where the barcode has no i-th base
        called = site_bases.notna() & (site_bases != 'N')
        site_needed = needed[called]
        per_base = site_needed.groupby(site_bases[called]).sum()
        ratio = (per_base / site_needed.sum()).to_dict()
        for letter in _BARCODE_BASES:
            ratio.setdefault(letter, 0)
        ratio_sites[i] = ratio

        if not _site_is_balanced(ratio):
            is_not_balance_list.append(
                '第%s位置,算出结果为 %s' % (i, ratio)
            )
    return ratio_sites, is_not_balance_list


class Ga:
    """Genetic-algorithm search (DEAP) over a chip -> library-records layout.

    ``sheets`` is a mapping of sheet name to the records placed on that sheet.
    """

    # Hard limits applied per sheet by ``evaluate``.
    MAX_CHIP_SIZE = 1700
    MAX_UNBALANCED_SIZE = 250
    MIN_NEXTERA_SIZE = 50
    # Fitness returned for any infeasible or unreadable individual.
    PENALTY = (0, 100000, 100000)

    def __init__(self, sheets):
        self.sheets = sheets

    def generate_individual(self):
        """Return a one-element list wrapping a deep copy of the initial layout."""
        import copy  # local import keeps this block self-contained

        return [copy.deepcopy(self.sheets)]

    @staticmethod
    def evaluate(individual):
        """Score an individual as ``(total data_needed, sheet count, 极致 sheet count)``.

        The layout is read from ``individual[0][0]``. ``Ga.PENALTY`` is
        returned as soon as any sheet breaks a constraint: total size above
        ``MAX_CHIP_SIZE``, duplicated barcodes, unbalanced libraries above
        ``MAX_UNBALANCED_SIZE``, an unbalanced barcode position, or a Nextera
        total not above ``MIN_NEXTERA_SIZE``. It is also returned when the
        individual does not have the expected structure or columns.
        """
        total_data_needed = 0
        xchip = 0
        try:
            sheets = individual[0][0]
            for data in sheets.values():
                library_data = pd.DataFrame(data)
                size = library_data['data_needed'].sum()

                # Sheet size must stay within the chip capacity.
                if size > Ga.MAX_CHIP_SIZE:
                    return Ga.PENALTY

                # A barcode may appear only once per sheet.
                if library_data['barcode'].duplicated().any():
                    return Ga.PENALTY

                # Unbalanced libraries are capped per sheet.
                unbalanced = library_data['is_balance_lib'] == '否'
                if library_data.loc[unbalanced, 'data_needed'].sum() > Ga.MAX_UNBALANCED_SIZE:
                    return Ga.PENALTY

                # Every barcode position must pass the base-balance check.
                _, is_not_balance_list = count_barcode_radio(library_data)
                if is_not_balance_list:
                    return Ga.PENALTY

                # NOTE(review): this rejects every sheet whose Nextera total is
                # <= 50, including sheets with no Nextera library at all --
                # confirm that is the intended rule.
                nextera = library_data['classification'].str.lower() == 'nextera'
                if library_data.loc[nextera, 'data_needed'].sum() <= Ga.MIN_NEXTERA_SIZE:
                    return Ga.PENALTY

                total_data_needed += size

                # Count sheets with at least one '极致' split entry.
                if any('极致' in value for value in library_data['split']):
                    xchip += 1
            num_sheets = len(sheets)
        except (KeyError, IndexError, TypeError, ValueError, AttributeError):
            # Malformed individuals (e.g. after mutation replaced the layout
            # with an int) are infeasible rather than fatal.
            return Ga.PENALTY

        # Objectives: maximise total data, minimise sheets, minimise 极致 sheets.
        return total_data_needed, num_sheets, xchip

    def run(self):
        """Run a (mu + lambda) evolution and return the best individual's first gene.

        Returns ``best[0]``, i.e. the one-element list produced by
        ``generate_individual`` (or whatever mutation put in that slot).
        NOTE(review): ``mutUniformInt`` overwrites genes with random integers,
        which ``evaluate`` then scores as ``PENALTY``; the operators probably
        need to be replaced by layout-aware ones before this is enabled.
        """
        # Imported here so the module still loads when DEAP is not installed.
        from deap import algorithms, base, creator, tools

        pop_size = 50
        cxpb = 0.7  # crossover probability
        mutpb = 0.2  # mutation probability
        ngen = 100  # number of generations

        # creator.create registers classes globally; only do it once so that
        # repeated runs do not re-create (and warn about) existing classes.
        if not hasattr(creator, "FitnessMax"):
            # Three objectives: one maximised, two minimised.
            creator.create("FitnessMax", base.Fitness, weights=(1.0, -1.0, -1.0,))
        if not hasattr(creator, "Individual"):
            creator.create("Individual", list, fitness=creator.FitnessMax)

        toolbox = base.Toolbox()
        toolbox.register("individual", tools.initRepeat, creator.Individual, self.generate_individual, n=3)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("evaluate", self.evaluate)
        toolbox.register("mate", tools.cxTwoPoint)
        toolbox.register("mutate", tools.mutUniformInt, low=1, up=100, indpb=0.2)
        toolbox.register("select", tools.selTournament, tournsize=3)

        population = toolbox.population(n=pop_size)
        population, _ = algorithms.eaMuPlusLambda(
            population, toolbox, mu=pop_size, lambda_=pop_size * 2,
            cxpb=cxpb, mutpb=mutpb, ngen=ngen, stats=None, halloffame=None)

        best_individual = tools.selBest(population, k=1)
        print(best_individual)
        optimized_sheets = best_individual[0]
        return optimized_sheets
@staticmethod
def read_cols():
    """Load the required column names from ``rule/columns.xlsx``.

    Reads the ``cols`` column of the workbook found under ``basedir`` and
    returns its values as a plain list. Blank cells are dropped so that NaN
    never ends up among the column names, where it would break the later
    missing-column check and ``'; '.join(...)`` message.

    Raises:
        KeyError: if the workbook has no ``cols`` column.
    """
    df = pd.read_excel(os.path.join(basedir, 'rule', 'columns.xlsx'))
    return df['cols'].dropna().tolist()
status_mask] - no_ori_data.loc[:, 'time'] = no_ori_data['time'].apply(format_date) + self.no_assign_data.extend(no_ori_data.to_dict('records')) # 使用布尔索引筛选出不是数字和非日期的行 @@ -332,7 +488,7 @@ class AutoLayout: level=library_df['level'].values[0], customer=library_df['customer'].values[0], classification=library_df['classification'].values[0], - data=library_df[need_col].to_dict('records') + data=library_df[self.need_cols].to_dict('records') )) if flag: @@ -346,9 +502,9 @@ class AutoLayout: level=library_df['level'].values[0], customer=library_df['customer'].values[0], classification=library_df['classification'].values[0], - data=library_df[need_col].to_dict('records') + data=library_df[self.need_cols].to_dict('records') )) - ori_sort_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time'])) + ori_sort_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time'], -x['size'])) i = 0 while ori_sort_data: @@ -397,6 +553,9 @@ class AutoLayout: outputpath = os.path.join(self.output, 'result', outputname) writer = pd.ExcelWriter(outputpath) + # ga = Ga(sheets=self.index_assignments) + # self.index_assignments = ga.run() + chip_loc = 1 librarynum = 0 for chip_idx, chip_assignments in self.index_assignments.items(): @@ -415,9 +574,23 @@ class AutoLayout: else: addname = '' self.dec_barcode_radio(chip_idx) - df.to_excel(writer, sheet_name=addname + chip_idx, index=False) + chipname = addname + chip_idx + + sum_list = list() + for library, library_df in df.groupby('#library'): + sum_list.append(dict( + 二次拆分=library, + 客户=library_df['customer'].values[0], + 类型=library_df['classification'].values[0], + 打折前=library_df['data_needed'].sum() + )) + df_sum = pd.DataFrame(sum_list) + res_df = pd.concat([df, df_sum], axis=1) + res_df.to_excel(writer, sheet_name=chipname, index=False) chip_loc += 1 no_assign_df = pd.DataFrame(self.no_assign_data) + no_assign_df = no_assign_df.applymap(lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x) + no_assign_df = 
no_assign_df[self.need_cols] no_assign_df.to_excel(writer, sheet_name='未测', index=False) if self.return_log: pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)