245 lines
9.6 KiB
Python
245 lines
9.6 KiB
Python
import os
|
||
import socket
|
||
|
||
import pandas as pd
|
||
from collections import defaultdict
|
||
from datetime import datetime
|
||
import time
|
||
import logging
|
||
import os
|
||
from tools.common import basedir, log
|
||
|
||
|
||
class AutoLayout:
|
||
"""
|
||
自动化派样
|
||
"""
|
||
|
||
def __init__(self, path, output=basedir, data_limit=1600):
|
||
self.path = path
|
||
self.output = output
|
||
self.data_limit = data_limit
|
||
|
||
self.index_assignments = defaultdict(list)
|
||
# 芯片数量量大小
|
||
self.chip_size = dict()
|
||
# 芯片是否极致
|
||
self.chip_type = dict()
|
||
# 芯片barcode
|
||
self.chip_barcode_recode = defaultdict(set)
|
||
# 芯片原始数据读取
|
||
self.ori_data = self.read_excel()
|
||
# 当前锚芯片
|
||
self.loc_chip_num = 1
|
||
# 芯片 文库计数
|
||
self.chip_lib_type = defaultdict(dict)
|
||
|
||
self.logger = log(os.path.basename(f'{path}.txt'))
|
||
self.return_log = list()
|
||
|
||
def read_excel(self):
|
||
"""
|
||
原始数据处理
|
||
:return:
|
||
"""
|
||
merge = pd.read_excel(self.path, None)
|
||
ori_data = dict()
|
||
for name, sheet in merge.items():
|
||
sheet.fillna('.', inplace=True)
|
||
ori_data[name] = sheet.to_dict('records')
|
||
return ori_data
|
||
|
||
def add_new_data(self, chipname, library_data, newer=True):
|
||
"""
|
||
增加新数据到已知芯片上
|
||
:param chipname:
|
||
:param library_data:
|
||
:param newer:
|
||
:return:
|
||
"""
|
||
self.index_assignments[chipname].extend(library_data['data'])
|
||
self.chip_barcode_recode[chipname].update({item['barcode'] for item in library_data['data']})
|
||
if newer:
|
||
self.chip_size[chipname] = library_data['size']
|
||
|
||
else:
|
||
self.chip_size[chipname] += library_data['size']
|
||
if library_data['lib_type'] in self.chip_lib_type[chipname]:
|
||
self.chip_lib_type[chipname][library_data['lib_type']] += library_data['size']
|
||
else:
|
||
self.chip_lib_type[chipname][library_data['lib_type']] = library_data['size']
|
||
|
||
def dec_barcode_radio(self, chipname):
|
||
data = self.index_assignments[chipname]
|
||
df = pd.DataFrame(data)
|
||
barcode_df = pd.DataFrame(df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values,
|
||
columns=['T' + str(x) for x in range(16)]).join(df['data_needed'])
|
||
total = barcode_df['data_needed'].sum()
|
||
is_not_balance_list = []
|
||
for i in range(16):
|
||
column = 'T' + str(i)
|
||
col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
|
||
# 去掉N计数
|
||
if 'N' in col_df.index:
|
||
base_N_size = col_df.loc['N', 'data_needed']
|
||
col_df = col_df.drop('N')
|
||
else:
|
||
base_N_size = 0
|
||
col_df['ratio'] = (col_df['data_needed']) / (total - base_N_size)
|
||
|
||
need_base_list = list()
|
||
ratio = col_df['ratio'].to_dict()
|
||
|
||
for decbase in ['A', 'T', 'C']:
|
||
if decbase not in ratio:
|
||
ratio[decbase] = 0
|
||
need_base_list.append(decbase)
|
||
continue
|
||
if ratio[decbase] < 0.1:
|
||
need_base_list.append(decbase)
|
||
# 小于标准的base 是不是空的,空的说明都满足
|
||
if len(need_base_list) > 2:
|
||
is_not_balance_list.append(
|
||
'[%s] 第%s位置, %s 有碱基不平衡,算出结果为 %s' % (chipname, i, need_base_list, ratio)
|
||
)
|
||
# 对于G不能超过10%
|
||
if 'G' not in ratio:
|
||
ratio['G'] = 0
|
||
if ratio['G'] > 0.7:
|
||
is_not_balance_list.append(
|
||
'[%s] 第%s位置, G 含量超过70%%,算出结果为 %s' % (chipname, i, ratio['G'])
|
||
)
|
||
if is_not_balance_list:
|
||
self.return_log.extend(is_not_balance_list)
|
||
print('有碱基不平衡性!\n', '\n'.join(is_not_balance_list))
|
||
|
||
@staticmethod
|
||
def read_rule():
|
||
df = pd.read_excel(os.path.join(basedir, 'rule', 'lib_type_limit.xlsx'))
|
||
return df.to_dict('index')
|
||
|
||
@staticmethod
|
||
def level(row):
|
||
if row['customer'] == '百奥益康' and '3\'' in row['lib_type']:
|
||
return 1
|
||
elif row['customer'] == '百奥益康' and '5\'' in row['lib_type']:
|
||
return 2
|
||
else:
|
||
return 100
|
||
|
||
def judge_data(self, chipname, library_data):
|
||
size = library_data['size']
|
||
library = library_data['library']
|
||
|
||
# 芯片大小不能超过设定限制
|
||
sizelimit = True
|
||
if self.chip_size[chipname] + size > self.data_limit:
|
||
sizelimit = False
|
||
self.logger.error(f'{library} {chipname} 文库相加大于设定限制')
|
||
|
||
# barcode有重复
|
||
notrepeatbarcode = True
|
||
if self.chip_barcode_recode[chipname].intersection({item['barcode'] for item in library_data['data']}):
|
||
notrepeatbarcode = False
|
||
self.logger.error(f'{library} {chipname} 文库有barcode重复')
|
||
|
||
# 特定文库不能超过限制
|
||
sp_lib1 = True
|
||
for _, myrule in self.read_rule().items():
|
||
lib_type = myrule['lib_type']
|
||
limit = myrule['limit']
|
||
if lib_type in self.chip_lib_type[chipname]:
|
||
if self.chip_lib_type[chipname][lib_type] + size > self.data_limit * limit:
|
||
sp_lib1 = False
|
||
self.logger.error(f'{library} {chipname} 文库有大于设定限制')
|
||
break
|
||
|
||
if sizelimit and notrepeatbarcode and sp_lib1:
|
||
return True
|
||
return False
|
||
|
||
def assign_samples(self):
|
||
ori_library_data = list()
|
||
ori_library_df = pd.DataFrame(self.ori_data['未测'])
|
||
ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)
|
||
for library, library_df in ori_library_df.groupby('#library'):
|
||
ori_library_data.append(dict(
|
||
library=library,
|
||
size=library_df['data_needed'].sum(),
|
||
time=library_df['time'].values[0],
|
||
customer=library_df['customer'].values[0],
|
||
level=library_df['level'].values[0],
|
||
status=library_df['status'].values[0],
|
||
lib_type=library_df['lib_type'].values[0],
|
||
data=library_df.to_dict('records')
|
||
))
|
||
ori_sort_data = sorted(ori_library_data, key=lambda x: (x['level'], x['customer'], -x['size'], x['time']))
|
||
|
||
while ori_sort_data:
|
||
library_data = ori_sort_data[0]
|
||
chipname = f'lane{self.loc_chip_num}'
|
||
|
||
# 空白芯片直接添加
|
||
if chipname not in self.index_assignments:
|
||
self.add_new_data(chipname, library_data)
|
||
ori_sort_data.remove(library_data)
|
||
continue
|
||
|
||
# 判断条件
|
||
if self.judge_data(chipname, library_data):
|
||
self.add_new_data(chipname, library_data, newer=False)
|
||
ori_sort_data.remove(library_data)
|
||
else:
|
||
for j in range(len(ori_sort_data)):
|
||
newlibrary_data = ori_sort_data[j]
|
||
if self.judge_data(chipname, newlibrary_data):
|
||
ori_sort_data.remove(newlibrary_data)
|
||
self.add_new_data(chipname, newlibrary_data, newer=False)
|
||
break
|
||
j += 1
|
||
else:
|
||
# 代表接下来的数据放到这个chip当中都不行,只有换chip了
|
||
self.loc_chip_num += 1
|
||
# 加完之后下面的数据可能加上去就慢了就换chip
|
||
if self.chip_size[chipname] > self.data_limit * 0.99:
|
||
self.loc_chip_num += 1
|
||
|
||
def run(self):
|
||
try:
|
||
self.assign_samples()
|
||
except Exception as e:
|
||
self.return_log.append(f'nova_xplus排样出错, 请联系!{e}')
|
||
self.index_assignments = {}
|
||
outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
|
||
outputpath = os.path.join(self.output, 'result', outputname)
|
||
writer = pd.ExcelWriter(outputpath)
|
||
|
||
no_assign_data = list()
|
||
no_assign_chip = list()
|
||
for chip_idx, chip_assignments in self.index_assignments.items():
|
||
self.dec_barcode_radio(chip_idx)
|
||
df = pd.DataFrame(chip_assignments)
|
||
if df['data_needed'].sum() < self.data_limit * 0.8:
|
||
no_assign_chip.append(chip_idx)
|
||
no_assign_data.extend(chip_assignments)
|
||
continue
|
||
df.to_excel(writer, sheet_name=chip_idx, index=False)
|
||
pd.DataFrame(no_assign_data).to_excel(writer, sheet_name='未测', index=False)
|
||
if self.return_log:
|
||
log_res = [splog for splog in self.return_log if
|
||
not any(f'[{chip}]' in str(splog) for chip in no_assign_chip)]
|
||
pd.DataFrame(log_res).to_excel(writer, sheet_name='log', index=False)
|
||
writer.close()
|
||
return outputpath
|
||
|
||
|
||
if __name__ == '__main__':
|
||
start_time = time.time()
|
||
excel_file = '../example/t1(1).xlsx'
|
||
output_file = ''
|
||
layout = AutoLayout(excel_file, output_file, data_limit=800)
|
||
layout.run()
|
||
end_time = time.time()
|
||
execution_time = end_time - start_time
|
||
print(f"代码执行时间为:{execution_time} 秒")
|