main
chaopower 2024-01-30 14:31:18 +08:00
parent 41d6e0cc7c
commit c203913bd4
2 changed files with 193 additions and 20 deletions

BIN
rule/columns.xlsx 100644

Binary file not shown.

View File

@ -5,16 +5,163 @@ from collections import defaultdict
from datetime import datetime from datetime import datetime
import pandas as pd import pandas as pd
from deap import base, creator, tools, algorithms
from tools.common import basedir, log from tools.common import basedir, log
# Date formatting helper
def format_date(date):
    """Return *date* as text.

    ``pd.Timestamp`` values are rendered as ``YYYY-MM-DD``; every other value
    is converted with ``str()``.
    """
    if isinstance(date, pd.Timestamp):
        return date.strftime('%Y-%m-%d')
    return str(date)


def count_barcode_radio(data):
    """Compute the per-position base composition of a set of barcodes.

    Only the first 16 characters of each barcode are considered. For each of
    the 16 positions, the ``data_needed`` of the libraries is summed per base
    and divided by the total ``data_needed`` minus the amount carried by ``N``
    at that position.

    :param data: anything ``pd.DataFrame`` accepts (records or a DataFrame)
        providing ``barcode`` and ``data_needed`` columns. The input is not
        modified.
    :return: ``(ratio_sites, is_not_balance_list)`` where ``ratio_sites`` maps
        position (0-15) to ``{base: ratio}`` and ``is_not_balance_list`` holds
        one message per position that fails the balance rule. Both are empty
        when *data* is empty.
    """
    df = pd.DataFrame(data)
    ratio_sites = dict()
    is_not_balance_list = []
    if df.empty:
        return ratio_sites, is_not_balance_list

    # Work on detached, positionally indexed Series: this avoids writing the
    # truncated barcode back into the caller's frame and keeps barcodes and
    # sizes aligned even when the input carries a non-default index.
    barcodes = df['barcode'].str.slice(0, 16).reset_index(drop=True)
    data_needed = df['data_needed'].reset_index(drop=True)
    total = data_needed.sum()

    for i in range(16):
        # Barcodes shorter than i + 1 yield NaN here and are left out of the
        # grouping for this position.
        site_sum = data_needed.groupby(barcodes.str.get(i)).sum()

        # Exclude N from the counts
        if 'N' in site_sum.index:
            base_n_size = site_sum.loc['N']
            site_sum = site_sum.drop('N')
        else:
            base_n_size = 0

        ratio = (site_sum / (total - base_n_size)).to_dict()
        ratio_sites[i] = ratio

        # Bucket the four bases by their share at this position:
        # A: >= 0.6, B: [0.2, 0.6), C: [0.15, 0.2), D: [0.1, 0.15),
        # E: [0.08, 0.1), F: < 0.08. A NaN ratio lands in no bucket.
        A, B, C, D, E, F = list(), list(), list(), list(), list(), list()
        for decbase in ['A', 'T', 'C', 'G']:
            # Missing bases are recorded as 0 (this is also visible through
            # ratio_sites, which holds the same dict object).
            if decbase not in ratio:
                ratio[decbase] = 0
            share = ratio[decbase]
            if share >= 0.6:
                A.append(decbase)
            elif 0.2 <= share < 0.6:
                B.append(decbase)
            elif 0.15 <= share < 0.2:
                C.append(decbase)
            elif 0.1 <= share < 0.15:
                D.append(decbase)
            elif 0.08 <= share < 0.1:
                E.append(decbase)
            elif share < 0.08:
                F.append(decbase)

        A_num, B_num, C_num, D_num, E_num, F_num = len(A), len(B), len(C), len(D), len(E), len(F)
        is_balanced = (
            (B_num + C_num + D_num == 4)
            or (F_num == 1 and (A_num + B_num) == 3)
            or (E_num == 1 and D_num == 1 and (A_num + B_num + C_num) == 2)
            or (E_num == 1 and (A_num + B_num + C_num) == 3)
        )
        if not is_balanced:
            is_not_balance_list.append(
                '%s位置,算出结果为 %s' % (i, ratio)
            )
    return ratio_sites, is_not_balance_list
# 定义遗传算法
class Ga:
    """Genetic-algorithm (DEAP) wrapper around a chip layout.

    ``sheets`` is the initial layout. ``evaluate`` iterates it as a mapping of
    sheet name -> library records (anything ``pd.DataFrame`` accepts) with the
    columns ``data_needed``, ``barcode``, ``is_balance_lib``,
    ``classification`` and ``split``.
    """

    # Fitness returned for any layout that breaks a hard constraint or cannot
    # be evaluated: no data, and a large value for both minimised objectives.
    _PENALTY = (0, 100000, 100000)

    def __init__(self, sheets):
        self.sheets = sheets

    # How a gene is generated
    def generate_individual(self):
        """Return a one-element list holding a deep copy of the initial layout."""
        individual = copy.deepcopy(self.sheets)  # the initial solution is the gene
        return [individual]

    # Fitness function
    @staticmethod
    def evaluate(individual):
        """Score a layout as ``(total data_needed, sheet count, '极致' sheet count)``.

        The layout is read from ``individual[0][0]``. ``Ga._PENALTY`` is
        returned as soon as any sheet violates a constraint, or when the
        individual cannot be evaluated at all (any exception).
        """
        penalty = Ga._PENALTY
        total_data_needed_sum = 0
        xchip = 0
        try:
            sheets = individual[0][0]
            # Count the sheets actually scored. The wrapper list individual[0]
            # always has length 1, so it must not be used for this objective.
            num_sheets = len(sheets)
            for data in sheets.values():
                library_data = pd.DataFrame(data)
                size = library_data['data_needed'].sum()

                # Chip size must not exceed the configured limit
                if size > 1700:
                    return penalty

                # Duplicate barcodes: more rows than distinct values
                barcodes = library_data['barcode'].values
                if len(barcodes) > len(set(barcodes)):
                    return penalty

                # Rows with an empty is_balance_lib flag (unbalanced
                # libraries) may not exceed 250 in total
                if library_data[library_data['is_balance_lib'] == '']['data_needed'].sum() > 250:
                    return penalty

                # Reject the sheet when any barcode position is unbalanced.
                # NOTE(review): the original comment says this check should
                # only start once the data reaches 1200G; no such threshold
                # is implemented here.
                _, is_not_balance_list = count_barcode_radio(library_data)
                if is_not_balance_list:
                    return penalty

                # NOTE(review): this also rejects sheets with no nextera
                # libraries at all (sum == 0) - confirm that is intended.
                if library_data[library_data['classification'].str.lower() == 'nextera']['data_needed'].sum() <= 50:
                    return penalty

                # Accumulate the data_needed of every sheet
                total_data_needed_sum += size

                # Count the sheets whose split column mentions '极致'
                if any('极致' in value for value in library_data['split']):
                    xchip += 1
        except Exception:
            # Deliberate best-effort: an individual that cannot be evaluated
            # is treated as the worst possible layout rather than aborting.
            return penalty

        # Objectives: maximise total data_needed, minimise the number of
        # sheets, minimise the number of '极致' chips.
        return total_data_needed_sum, num_sheets, xchip

    def run(self):
        """Run ``eaMuPlusLambda`` and return the best individual found."""
        # Genetic-algorithm parameters
        pop_size = 50
        cxpb = 0.7  # crossover probability
        mutpb = 0.2  # mutation probability
        ngen = 100  # number of generations

        # Three objectives: one maximised, two minimised.
        # NOTE(review): DEAP warns when a creator class of the same name is
        # created again, so calling run() more than once will emit warnings.
        creator.create("FitnessMax", base.Fitness, weights=(1.0, -1.0, -1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMax)

        toolbox = base.Toolbox()

        # Structure initialisers
        toolbox.register("individual", tools.initRepeat, creator.Individual, self.generate_individual, n=3)
        toolbox.register("population", tools.initRepeat, list, toolbox.individual)
        toolbox.register("evaluate", self.evaluate)

        # Genetic operators
        # NOTE(review): mutUniformInt replaces genes with integers in
        # [1, 100], whereas evaluate expects individual[0] to be a list
        # holding the layout; such mutants presumably end up in evaluate's
        # exception path - confirm this operator is the intended one.
        toolbox.register("mate", tools.cxTwoPoint)
        toolbox.register("mutate", tools.mutUniformInt, low=1, up=100, indpb=0.2)
        toolbox.register("select", tools.selTournament, tournsize=3)

        # Initial population
        population = toolbox.population(n=pop_size)

        # Run the algorithm
        algorithms.eaMuPlusLambda(population, toolbox, mu=pop_size, lambda_=pop_size * 2, cxpb=cxpb, mutpb=mutpb,
                                  ngen=ngen, stats=None, halloffame=None)

        # Report and return the best individual
        best_individual = tools.selBest(population, k=1)
        print(best_individual)
        optimized_sheets = best_individual[0]
        return optimized_sheets
class AutoLayout: class AutoLayout:
@ -53,6 +200,7 @@ class AutoLayout:
self.logger = log(os.path.basename(f'{path}.txt')) self.logger = log(os.path.basename(f'{path}.txt'))
self.return_log = list() self.return_log = list()
self.no_assign_data = list() self.no_assign_data = list()
self.need_cols = self.read_cols()
def read_excel(self): def read_excel(self):
""" """
@ -99,7 +247,8 @@ class AutoLayout:
if 'nextera' in library_data['classification'].lower(): if 'nextera' in library_data['classification'].lower():
self.chip_speciallib_nextera_size[chipname] += library_data['size'] self.chip_speciallib_nextera_size[chipname] += library_data['size']
def count_barcode_radio(self, data): @staticmethod
def count_barcode_radio(data):
df = pd.DataFrame(data) df = pd.DataFrame(data)
ratio_sites = dict() ratio_sites = dict()
is_not_balance_list = [] is_not_balance_list = []
@ -116,11 +265,11 @@ class AutoLayout:
col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'}) col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
# 去掉N计数 # 去掉N计数
if 'N' in col_df.index: if 'N' in col_df.index:
base_N_size = col_df.loc['N', 'data_needed'] base_n_size = col_df.loc['N', 'data_needed']
col_df = col_df.drop('N') col_df = col_df.drop('N')
else: else:
base_N_size = 0 base_n_size = 0
col_df['ratio'] = (col_df['data_needed']) / (total - base_N_size) col_df['ratio'] = (col_df['data_needed']) / (total - base_n_size)
ratio = col_df['ratio'].to_dict() ratio = col_df['ratio'].to_dict()
ratio_sites[i] = ratio ratio_sites[i] = ratio
A, B, C, D, E, F = list(), list(), list(), list(), list(), list() A, B, C, D, E, F = list(), list(), list(), list(), list(), list()
@ -195,6 +344,12 @@ class AutoLayout:
res = pd.concat([df, newdf]) res = pd.concat([df, newdf])
return res.reset_index() return res.reset_index()
@staticmethod
def read_cols():
    """Load the required column names from ``rule/columns.xlsx`` under ``basedir``.

    :return: the values of the workbook's ``cols`` column, as a list.
    """
    cols_path = os.path.join(basedir, 'rule', 'columns.xlsx')
    return list(pd.read_excel(cols_path)['cols'].values)
def use_rule(self, chipname, classfication): def use_rule(self, chipname, classfication):
may_classfic = set(self.rule[self.rule['c1'] == classfication]['c2']) may_classfic = set(self.rule[self.rule['c1'] == classfication]['c2'])
if self.chip_customer[chipname].intersection(may_classfic): if self.chip_customer[chipname].intersection(may_classfic):
@ -204,7 +359,7 @@ class AutoLayout:
def judge_data(self, chipname, library_data): def judge_data(self, chipname, library_data):
size = library_data['size'] size = library_data['size']
# customer = library_data['customer'] # customer = library_data['customer']
library = library_data['library'] # library = library_data['library']
classification = library_data['classification'] classification = library_data['classification']
is_balance_lib = library_data['is_balance_lib'] is_balance_lib = library_data['is_balance_lib']
@ -269,12 +424,13 @@ class AutoLayout:
raise UserWarning('提供excel没有 未测 sheet ,请核查!') raise UserWarning('提供excel没有 未测 sheet ,请核查!')
ori_library_df = pd.DataFrame(self.ori_data['未测']) ori_library_df = pd.DataFrame(self.ori_data['未测'])
need_col = ['status', '#library', 'sublibrary', 'i5', 'i7', 'data_needed', 'real_data', 'customer', # need_col = ['status', '#library', 'sublibrary', 'i5', 'i7', 'data_needed', 'real_data', 'customer',
'classification', 'priority', 'time', '拆分方式', 'barcode', 'is_balance_lib', '备注', # 'classification', 'priority', 'time', '拆分方式', 'barcode', 'is_balance_lib', '备注',
'TIPS1', 'TIPS2', 'TIPS3' # 'TIPS1', 'TIPS2', 'TIPS3'
] # ]
self.need_cols = self.read_cols()
get_col = set(ori_library_df.columns) get_col = set(ori_library_df.columns)
unhave_col = set(need_col) - get_col unhave_col = set(self.need_cols) - get_col
if unhave_col: if unhave_col:
unhave_fom = '; '.join(unhave_col) unhave_fom = '; '.join(unhave_col)
@ -291,10 +447,10 @@ class AutoLayout:
ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期' ori_library_df.loc[~time_mask, 'note'] = 'time 列非日期'
ori_library_df.loc[status_mask, 'note'] = '暂不排样' ori_library_df.loc[status_mask, 'note'] = '暂不排样'
need_col.append('note') # need_col.append('note')
no_ori_data = ori_library_df[~(numeric_mask & time_mask) | status_mask] no_ori_data = ori_library_df[~(numeric_mask & time_mask) | status_mask]
no_ori_data.loc[:, 'time'] = no_ori_data['time'].apply(format_date)
self.no_assign_data.extend(no_ori_data.to_dict('records')) self.no_assign_data.extend(no_ori_data.to_dict('records'))
# 使用布尔索引筛选出不是数字和非日期的行 # 使用布尔索引筛选出不是数字和非日期的行
@ -332,7 +488,7 @@ class AutoLayout:
level=library_df['level'].values[0], level=library_df['level'].values[0],
customer=library_df['customer'].values[0], customer=library_df['customer'].values[0],
classification=library_df['classification'].values[0], classification=library_df['classification'].values[0],
data=library_df[need_col].to_dict('records') data=library_df[self.need_cols].to_dict('records')
)) ))
if flag: if flag:
@ -346,9 +502,9 @@ class AutoLayout:
level=library_df['level'].values[0], level=library_df['level'].values[0],
customer=library_df['customer'].values[0], customer=library_df['customer'].values[0],
classification=library_df['classification'].values[0], classification=library_df['classification'].values[0],
data=library_df[need_col].to_dict('records') data=library_df[self.need_cols].to_dict('records')
)) ))
ori_sort_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time'])) ori_sort_data = sorted(ori_library_data, key=lambda x: (x['level'], x['time'], -x['size']))
i = 0 i = 0
while ori_sort_data: while ori_sort_data:
@ -397,6 +553,9 @@ class AutoLayout:
outputpath = os.path.join(self.output, 'result', outputname) outputpath = os.path.join(self.output, 'result', outputname)
writer = pd.ExcelWriter(outputpath) writer = pd.ExcelWriter(outputpath)
# ga = Ga(sheets=self.index_assignments)
# self.index_assignments = ga.run()
chip_loc = 1 chip_loc = 1
librarynum = 0 librarynum = 0
for chip_idx, chip_assignments in self.index_assignments.items(): for chip_idx, chip_assignments in self.index_assignments.items():
@ -415,9 +574,23 @@ class AutoLayout:
else: else:
addname = '' addname = ''
self.dec_barcode_radio(chip_idx) self.dec_barcode_radio(chip_idx)
df.to_excel(writer, sheet_name=addname + chip_idx, index=False) chipname = addname + chip_idx
sum_list = list()
for library, library_df in df.groupby('#library'):
sum_list.append(dict(
二次拆分=library,
客户=library_df['customer'].values[0],
类型=library_df['classification'].values[0],
打折前=library_df['data_needed'].sum()
))
df_sum = pd.DataFrame(sum_list)
res_df = pd.concat([df, df_sum], axis=1)
res_df.to_excel(writer, sheet_name=chipname, index=False)
chip_loc += 1 chip_loc += 1
no_assign_df = pd.DataFrame(self.no_assign_data) no_assign_df = pd.DataFrame(self.no_assign_data)
no_assign_df = no_assign_df.applymap(lambda x: format_date(x) if isinstance(x, pd.Timestamp) else x)
no_assign_df = no_assign_df[self.need_cols]
no_assign_df.to_excel(writer, sheet_name='未测', index=False) no_assign_df.to_excel(writer, sheet_name='未测', index=False)
if self.return_log: if self.return_log:
pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False) pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)