layout/tools/novaplus.py

245 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import socket
import pandas as pd
from collections import defaultdict
from datetime import datetime
import time
import logging
import os
from tools.common import basedir, log
class AutoLayout:
    """Automated sample layout: pack sequencing libraries onto chips (lanes).

    Libraries are read from the ``未测`` sheet of the input workbook, grouped
    by ``#library`` and placed greedily on chips named ``lane1``, ``lane2``,
    ... subject to a per-chip data limit, barcode uniqueness per chip and the
    per-library-type caps loaded by :meth:`read_rule`.  :meth:`run` writes the
    resulting layout to a new workbook.
    """

    # Fraction of ``data_limit`` above which a chip is considered full.
    _FULL_RATIO = 0.99
    # Chips filled below this fraction are not emitted; their rows go back to
    # the '未测' sheet of the output workbook.
    _MIN_FILL_RATIO = 0.8
    # An A/T/C share below this value at a barcode position counts as "low".
    _MIN_BASE_RATIO = 0.1
    # A G share above this value is reported.  The log message hard-codes
    # "70%", so keep both in sync if this is ever changed.
    _MAX_G_RATIO = 0.7

    def __init__(self, path, output=basedir, data_limit=1600):
        """Load the input workbook and initialise the bookkeeping tables.

        :param path: path of the input Excel workbook
        :param output: directory under which ``result/`` is written
        :param data_limit: maximum summed ``data_needed`` allowed on one chip
        """
        self.path = path
        self.output = output
        self.data_limit = data_limit
        # chip name -> list of sample rows placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> summed data amount placed on that chip
        self.chip_size = dict()
        # Per-chip type flag; never read or written elsewhere in this class.
        self.chip_type = dict()
        # chip name -> set of barcodes already present on that chip
        self.chip_barcode_recode = defaultdict(set)
        # Raw workbook content: sheet name -> list of row dicts
        self.ori_data = self.read_excel()
        # Number of the chip currently being filled (1-based)
        self.loc_chip_num = 1
        # chip name -> {lib_type: summed data amount of that type}
        self.chip_lib_type = defaultdict(dict)
        self.logger = log(os.path.basename(f'{path}.txt'))
        # Messages that end up in the 'log' sheet of the output workbook
        self.return_log = list()
        # Lazily filled by _load_rules() so the rule workbook is read once.
        self._rule_cache = None

    def read_excel(self):
        """Read every sheet of the input workbook.

        Missing cells are replaced by ``'.'``.

        :return: dict mapping sheet name to a list of row dicts
        """
        sheets = pd.read_excel(self.path, sheet_name=None)
        return {
            name: sheet.fillna('.').to_dict('records')
            for name, sheet in sheets.items()
        }

    def add_new_data(self, chipname, library_data, newer=True):
        """Place one library on a chip and update all bookkeeping tables.

        :param chipname: name of the target chip, e.g. ``lane1``
        :param library_data: library dict with keys ``data`` (row dicts),
            ``size`` and ``lib_type``
        :param newer: ``True`` sets the chip size to the library size (first
            library on the chip); ``False`` adds the library size to it
        :return: None
        """
        rows = library_data['data']
        size = library_data['size']
        lib_type = library_data['lib_type']

        self.index_assignments[chipname].extend(rows)
        self.chip_barcode_recode[chipname].update(row['barcode'] for row in rows)

        if newer:
            self.chip_size[chipname] = size
        else:
            self.chip_size[chipname] += size

        type_totals = self.chip_lib_type[chipname]
        type_totals[lib_type] = type_totals.get(lib_type, 0) + size

    def dec_barcode_radio(self, chipname):
        """Check the per-position base balance of the barcodes on a chip.

        For every barcode position the ``data_needed``-weighted share of each
        base is computed; the weight of ``N`` is removed from the denominator.
        Findings are appended to ``self.return_log`` and printed.

        The number of positions is taken from the longest barcode on the chip
        instead of assuming a fixed length of 16.

        :param chipname: name of the chip to inspect
        :return: None
        """
        records = self.index_assignments.get(chipname)
        if not records:
            return

        df = pd.DataFrame(records)
        barcodes = df['barcode'].astype(str)
        weights = df['data_needed']
        total = weights.sum()

        is_not_balance_list = []
        for i in range(int(barcodes.str.len().max())):
            # Positions past the end of a shorter barcode yield NaN, which
            # groupby drops from the keys.
            per_base = weights.groupby(barcodes.str[i]).sum()

            # Exclude N from the counts and from the denominator.
            base_N_size = per_base.get('N', 0)
            per_base = per_base.drop('N', errors='ignore')
            ratio = (per_base / (total - base_N_size)).to_dict()

            need_base_list = []
            for decbase in ('A', 'T', 'C'):
                if decbase not in ratio:
                    ratio[decbase] = 0
                if ratio[decbase] < self._MIN_BASE_RATIO:
                    need_base_list.append(decbase)

            # Reported only when all three of A/T/C are low.
            # NOTE(review): the original comment suggested that any low base
            # should be reported (non-empty list); the `> 2` threshold is kept
            # as found -- confirm the intended rule.
            if len(need_base_list) > 2:
                is_not_balance_list.append(
                    '[%s] 第%s位置, %s 有碱基不平衡,算出结果为 %s' % (chipname, i, need_base_list, ratio)
                )

            # G must not exceed 70% at any position.
            if 'G' not in ratio:
                ratio['G'] = 0
            if ratio['G'] > self._MAX_G_RATIO:
                is_not_balance_list.append(
                    '[%s] 第%s位置, G 含量超过70%%,算出结果为 %s' % (chipname, i, ratio['G'])
                )

        if is_not_balance_list:
            self.return_log.extend(is_not_balance_list)
            print('有碱基不平衡性!\n', '\n'.join(is_not_balance_list))

    @staticmethod
    def read_rule():
        """Read the per-library-type limit table.

        :return: dict mapping row index to a row dict; ``judge_data`` reads
            the ``lib_type`` and ``limit`` keys of each row
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'lib_type_limit.xlsx'))
        return df.to_dict('index')

    def _load_rules(self):
        """Return the rule rows, reading the rule workbook only once."""
        if self._rule_cache is None:
            self._rule_cache = list(self.read_rule().values())
        return self._rule_cache

    @staticmethod
    def level(row):
        """Return the sort priority of a sample row (lower sorts first).

        :param row: mapping with ``customer`` and ``lib_type`` entries
        :return: 1 or 2 for the prioritised customer's 3' / 5' libraries,
            100 for everything else
        """
        if row['customer'] == '百奥益康':
            if '3\'' in row['lib_type']:
                return 1
            if '5\'' in row['lib_type']:
                return 2
        return 100

    def judge_data(self, chipname, library_data):
        """Decide whether a library may be added to an already used chip.

        All three checks are always evaluated so that every violated
        constraint is logged, not just the first one.

        :param chipname: name of the chip, which must already hold data
        :param library_data: candidate library dict (``size``, ``library``,
            ``data``)
        :return: ``True`` when the library fits, ``False`` otherwise
        """
        size = library_data['size']
        library = library_data['library']

        # The chip total must stay within the configured limit.
        sizelimit = self.chip_size[chipname] + size <= self.data_limit
        if not sizelimit:
            self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

        # No barcode of the candidate may already be on the chip.
        candidate_barcodes = {item['barcode'] for item in library_data['data']}
        notrepeatbarcode = not self.chip_barcode_recode[chipname].intersection(candidate_barcodes)
        if not notrepeatbarcode:
            self.logger.error(f'{library} {chipname} 文库有barcode重复')

        # Restricted library types must stay below their share of the limit.
        # NOTE(review): the candidate's size is added to the restricted type's
        # total even when the candidate is of a different lib_type; behaviour
        # kept as found -- confirm whether the check should apply only to
        # candidates of the restricted type.
        sp_lib1 = True
        type_totals = self.chip_lib_type[chipname]
        for myrule in self._load_rules():
            lib_type = myrule['lib_type']
            if lib_type not in type_totals:
                continue
            if type_totals[lib_type] + size > self.data_limit * myrule['limit']:
                sp_lib1 = False
                self.logger.error(f'{library} {chipname} 文库有大于设定限制')
                break

        return sizelimit and notrepeatbarcode and sp_lib1

    def assign_samples(self):
        """Greedily distribute the libraries of the ``未测`` sheet over chips.

        Libraries are sorted by ``(level, customer, -size, time)``.  A blank
        chip always takes the head of the queue; a used chip takes the first
        queued library accepted by :meth:`judge_data`.  When nothing fits, or
        the chip exceeds 99% of the limit after an addition, the next chip is
        started.

        :return: None; results are stored in ``self.index_assignments``
        """
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        if ori_library_df.empty:
            # Nothing to lay out.
            return
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)

        ori_library_data = [
            dict(
                library=library,
                size=library_df['data_needed'].sum(),
                time=library_df['time'].values[0],
                customer=library_df['customer'].values[0],
                level=library_df['level'].values[0],
                status=library_df['status'].values[0],
                lib_type=library_df['lib_type'].values[0],
                data=library_df.to_dict('records'),
            )
            for library, library_df in ori_library_df.groupby('#library')
        ]
        queue = sorted(
            ori_library_data,
            key=lambda x: (x['level'], x['customer'], -x['size'], x['time']),
        )

        while queue:
            chipname = f'lane{self.loc_chip_num}'

            # A blank chip takes the head of the queue unconditionally.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, queue.pop(0))
                continue

            # Take the first queued library that fits (the head is index 0).
            for idx, candidate in enumerate(queue):
                if self.judge_data(chipname, candidate):
                    self.add_new_data(chipname, queue.pop(idx), newer=False)
                    break
            else:
                # Nothing left fits on this chip: move to the next one.  Skip
                # the fullness check below, otherwise the chip counter would
                # be advanced twice and a lane number would be left unused.
                self.loc_chip_num += 1
                continue

            # Practically full after the addition: start the next chip.
            if self.chip_size[chipname] > self.data_limit * self._FULL_RATIO:
                self.loc_chip_num += 1

    def run(self):
        """Run the layout and write the result workbook.

        Chips filled below 80% of ``data_limit`` are not written as their own
        sheet; their rows are collected in the ``未测`` sheet.  Log messages
        that mention such a chip are dropped from the ``log`` sheet.

        :return: path of the written workbook
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Top-level boundary: report the failure in the output workbook
            # and the logger instead of aborting without any output.
            message = f'nova_xplus排样出错 请联系!{e}'
            self.logger.error(message)
            self.return_log.append(message)
            self.index_assignments = {}

        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        result_dir = os.path.join(self.output, 'result')
        os.makedirs(result_dir, exist_ok=True)
        outputpath = os.path.join(result_dir, outputname)

        # Prepare all frames before opening the writer so that a failure here
        # cannot leave a half-written workbook behind.
        chip_sheets = []
        no_assign_data = []
        no_assign_chip = []
        for chip_idx, chip_assignments in self.index_assignments.items():
            self.dec_barcode_radio(chip_idx)
            df = pd.DataFrame(chip_assignments)
            if df['data_needed'].sum() < self.data_limit * self._MIN_FILL_RATIO:
                no_assign_chip.append(chip_idx)
                no_assign_data.extend(chip_assignments)
                continue
            chip_sheets.append((chip_idx, df))

        with pd.ExcelWriter(outputpath) as writer:
            for chip_idx, df in chip_sheets:
                df.to_excel(writer, sheet_name=chip_idx, index=False)
            pd.DataFrame(no_assign_data).to_excel(writer, sheet_name='未测', index=False)
            if self.return_log:
                log_res = [
                    splog for splog in self.return_log
                    if not any(f'[{chip}]' in str(splog) for chip in no_assign_chip)
                ]
                pd.DataFrame(log_res).to_excel(writer, sheet_name='log', index=False)
        return outputpath
if __name__ == '__main__':
    # Manual run against the bundled example workbook; prints the wall-clock
    # duration of the whole layout.
    started_at = time.time()
    layout = AutoLayout('../example/t1(1).xlsx', '', data_limit=800)
    layout.run()
    elapsed = time.time() - started_at
    print(f"代码执行时间为:{elapsed}")