layout/tools/t7.py

304 lines
12 KiB
Python
Raw Normal View History

2023-06-27 13:01:44 +08:00
import pandas as pd
from collections import defaultdict
from datetime import datetime
import time
import os
2023-07-05 17:15:46 +08:00
from .common import basedir, log
2023-06-27 13:01:44 +08:00
class AutoLayout:
"""
自动化派样
"""
def __init__(self, path, output=basedir, data_limit=1520):
    """Load the input workbook and reset all per-chip bookkeeping.

    :param path: path of the Excel workbook to lay out
    :param output: directory under which ``run`` writes ``result/<file>``
    :param data_limit: maximum total data size accepted on one chip
    """
    self.path, self.output, self.data_limit = path, output, data_limit
    # chip name -> sample records placed on that chip
    self.index_assignments = defaultdict(list)
    # chip name -> accumulated data size
    self.chip_size = {}
    # chip name -> chip type flag (not read by the code visible in this file)
    self.chip_type = {}
    # chip name -> barcodes already present on the chip
    self.chip_barcode_recode = defaultdict(set)
    # raw workbook content: sheet name -> list of row dicts
    self.ori_data = self.read_excel()
    # number of the chip currently being filled
    self.loc_chip_num = 1
    # chip name -> customers already present on the chip
    self.chip_customer = defaultdict(set)
    # customer exclusion table
    self.rule = self.read_rule()
    # chip name -> accumulated size of special libraries (capped at 200 by judge_data)
    self.chip_speciallib_size = {}
    self.logger = log(os.path.basename(f'{path}.txt'))
    # messages that ``run`` writes to the "log" sheet of the result workbook
    self.return_log = []
def read_excel(self):
    """Read every sheet of the workbook at ``self.path`` into plain records.

    Empty cells are replaced by the string ``'.'``.

    :return: dict mapping each sheet name to a list of row dicts
    """
    sheets = pd.read_excel(self.path, sheet_name=None)
    return {
        sheet_name: frame.fillna('.').to_dict(orient='records')
        for sheet_name, frame in sheets.items()
    }
def add_new_data(self, chipname, library_data, newer=True):
    """Put a library on a chip and update that chip's running totals.

    :param chipname: name of the target chip
    :param library_data: dict with the keys ``data`` (list of sample records,
        each carrying a ``barcode``), ``size``, ``classification`` and
        ``customer``
    :param newer: True when this is the first library on the chip, in which
        case the size totals are initialised instead of incremented
    :return: None
    """
    # Same label set as judge_data. The first-library branch used to test
    # against a single fused label, so the first single-cell or methylation
    # library on a chip was not counted toward the special-library total.
    special_classes = ('扩增子', '不平衡文库', '单细胞文库', '甲基化')
    size = library_data['size']
    special_size = size if library_data['classification'] in special_classes else 0
    samples = library_data['data']

    self.index_assignments[chipname].extend(samples)
    self.chip_barcode_recode[chipname].update(item['barcode'] for item in samples)
    if newer:
        self.chip_size[chipname] = size
        self.chip_speciallib_size[chipname] = special_size
    else:
        self.chip_size[chipname] += size
        self.chip_speciallib_size[chipname] += special_size
    self.chip_customer[chipname].add(library_data['customer'])
def add_new_chip(self, library_data):
    """Place a library on the first chip after the current one that accepts it.

    Chips are probed in order starting at ``loc_chip_num + 1``. A chip name
    that has no assignments yet is opened with this library as its first
    content. An existing chip is used only when ``judge_data`` accepts the
    library; ``judge_data`` logs the reason for every rejection.

    :param library_data: library dict as consumed by ``add_new_data``; the
        ``library`` key is used in the log message
    :return: None
    """
    library = library_data['library']
    chip_num = self.loc_chip_num
    while True:
        chip_num += 1
        chipname = f'chip{chip_num}'
        if chipname not in self.index_assignments:
            self.logger.error(f'{library} {chipname} 常规添加')
            self.add_new_data(chipname, library_data)
            return
        # Use the shared acceptance rules. The former inline copy applied the
        # 200 special-library cap to every library regardless of its
        # classification (and with >= instead of >), so it disagreed with
        # judge_data about which libraries fit.
        if self.judge_data(chipname, library_data):
            self.add_new_data(chipname, library_data, newer=False)
            return
def dec_barcode_radio(self, chipname):
    """Check the per-position base balance of the barcodes on ``chipname``.

    For each of the 16 barcode positions the ``data_needed`` share of every
    base is computed, with data carried by ``N`` excluded from the
    denominator. A position is reported when a present base has a share
    below 0.088 or when one of A/T/C/G is absent. The reports are appended
    to ``self.return_log`` and printed only when more than two positions
    are reported.

    :param chipname: name of the chip to inspect
    :return: None
    """
    chip_df = pd.DataFrame(self.index_assignments[chipname])
    position_cols = ['T' + str(pos) for pos in range(16)]
    # The 1:-1 slice drops the first and last columns produced by the split
    # (presumably the empty strings around the characters - confirm against
    # the pandas version in use).
    bases = chip_df['barcode'].str.split('', expand=True).iloc[:, 1:-1].values
    barcode_df = pd.DataFrame(bases, columns=position_cols).join(chip_df['data_needed'])
    total = barcode_df['data_needed'].sum()

    unbalanced = []
    for pos, column in enumerate(position_cols):
        per_base = barcode_df.groupby(column).agg({'data_needed': 'sum'})
        # Data carried by 'N' is left out of the ratio denominator
        n_size = 0
        if 'N' in per_base.index:
            n_size = per_base.loc['N', 'data_needed']
            per_base = per_base.drop('N')
        per_base['ratio'] = per_base['data_needed'] / (total - n_size)
        low_bases = list(per_base.index[per_base['ratio'] < 0.088])
        ratio = per_base['ratio'].to_dict()
        # A base that never occurs at this position counts as unbalanced too
        for base in ('A', 'T', 'C', 'G'):
            if base not in ratio:
                ratio[base] = 0
                low_bases.append(base)
        if low_bases:
            unbalanced.append(
                '%s%s位置, %s 有碱基不平衡,算出结果为 %s' % (chipname, pos, low_bases, ratio)
            )

    if len(unbalanced) > 2:
        self.return_log.append('有碱基不平衡性!')
        self.return_log.extend(unbalanced)
        print('有碱基不平衡性!\n', '\n'.join(unbalanced))
@staticmethod
def level(row):
    """Return the sort priority of a sample row (lower sorts first).

    The rules are tried in order and the first match wins:
    customer '贞固' -> 1; split_method '极致周期' -> 2;
    customer '医沐' or '清港泉' -> 3; customer '赛福' or '桐树基因' -> 7;
    classification 'Nextera' -> 5; classification containing '华大' -> 6;
    anything else -> 100.

    :param row: mapping with the keys ``customer``, ``split_method`` and
        ``classification``; later keys are only read when earlier rules miss
    :return: integer priority
    """
    customer = row['customer']
    if customer == '贞固':
        return 1
    if row['split_method'] == '极致周期':
        return 2
    if customer in ('医沐', '清港泉'):
        return 3
    if customer in ('赛福', '桐树基因'):
        return 7
    classification = row['classification']
    if classification == 'Nextera':
        return 5
    return 6 if '华大' in classification else 100
@staticmethod
def read_rule():
    """Load the customer exclusion table and make it symmetric.

    Reads ``rule/exclusive.xlsx`` under ``basedir`` and appends a mirrored
    copy of every row, so that a pair listed as (customer1, customer2) is
    also found when looked up by its second member.

    :return: DataFrame holding the original rows followed by the rows with
        ``customer1`` and ``customer2`` swapped
    """
    rules = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive.xlsx'))
    # The mirrored frame previously assigned 'customer1' twice and never
    # filled 'customer2', so the reverse direction of each rule was lost.
    mirrored = pd.DataFrame({
        'customer1': rules['customer2'],
        'customer2': rules['customer1'],
    })
    return pd.concat([rules, mirrored])
def use_rule(self, chipname, customer):
    """Tell whether ``customer`` is excluded by a customer already on the chip.

    :param chipname: name of the chip to check
    :param customer: customer of the library that is about to be added
    :return: True when some customer on the chip appears as ``customer2``
        in a rule row whose ``customer1`` equals ``customer``, else False
    """
    matching_rows = self.rule[self.rule['customer1'] == customer]
    excluded = set(matching_rows['customer2'])
    return not self.chip_customer[chipname].isdisjoint(excluded)
def judge_data(self, chipname, library_data):
    """Decide whether a library may be added to an existing chip.

    Every rule is evaluated and each violation is logged through
    ``self.logger.error``, so one call can emit several messages:

    * the chip total would exceed ``self.data_limit``;
    * the library shares a barcode with the chip;
    * ``use_rule`` reports an excluded customer on the chip;
    * the library has a special classification and the chip's
      special-library total would exceed 200.

    :param chipname: name of the chip to test against
    :param library_data: dict with the keys ``size``, ``customer``,
        ``library``, ``classification`` and ``data``
    :return: True when no rule is violated, otherwise False
    """
    library = library_data['library']
    customer = library_data['customer']
    size = library_data['size']

    # Total size on the chip must stay within the configured limit
    over_limit = self.chip_size[chipname] + size > self.data_limit
    if over_limit:
        self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

    # No barcode may appear twice on one chip
    incoming = {item['barcode'] for item in library_data['data']}
    duplicated = bool(self.chip_barcode_recode[chipname].intersection(incoming))
    if duplicated:
        self.logger.error(f'{library} {chipname} 文库有barcode重复')

    # Mutually exclusive customers may not share a chip
    conflicting = self.use_rule(chipname, customer)
    if conflicting:
        self.logger.error(f'{library} {chipname} 有互斥单位')

    # Special libraries are capped at 200 per chip
    special_over = (
        library_data['classification'] in ['扩增子', '不平衡文库', '单细胞文库', '甲基化']
        and self.chip_speciallib_size[chipname] + size > 200
    )
    if special_over:
        self.logger.error(f'{library} {chipname} 不平衡文库相加大于设定限制')

    return not (over_limit or duplicated or conflicting or special_over)
def assign_samples(self):
    """Distribute the libraries of the '未测' sheet over consecutive chips.

    Rows are ranked with ``self.level``, grouped by ``#library`` and sorted
    by (level, descending size, time). Chips are then filled one after the
    other: an unused chip takes the head of the queue unconditionally,
    otherwise the first queued library accepted by ``judge_data`` is added.
    The method moves on to the next chip when nothing fits or when the
    chip's size exceeds 1500.

    :return: None; results are stored through ``add_new_data``
    """
    unassigned = pd.DataFrame(self.ori_data['未测'])
    unassigned['level'] = unassigned.apply(self.level, axis=1)
    libraries = [
        dict(
            library=library,
            size=library_df['data_needed'].sum(),
            split_method=library_df['split_method'].values[0],
            time=library_df['time'].values[0],
            level=library_df['level'].values[0],
            customer=library_df['customer'].values[0],
            classification=library_df['classification'].values[0],
            data=library_df.to_dict('records'),
        )
        for library, library_df in unassigned.groupby('#library')
    ]
    pending = sorted(libraries, key=lambda lib: (lib['level'], -lib['size'], lib['time']))

    while pending:
        chipname = f'chip{self.loc_chip_num}'
        # An unused chip takes the head of the queue without any checks
        if chipname not in self.index_assignments:
            self.add_new_data(chipname, pending.pop(0))
            continue
        # Take the first queued library that fits. Popping by position avoids
        # the equality scan that list.remove performs on the library dicts.
        for position, candidate in enumerate(pending):
            if self.judge_data(chipname, candidate):
                self.add_new_data(chipname, pending.pop(position), newer=False)
                break
        else:
            # Nothing fits: advance exactly once. Falling through to the
            # size check below used to advance a second time and skip a
            # chip number.
            self.loc_chip_num += 1
            continue
        # Chip is close to full, stop trying to top it up
        if self.chip_size[chipname] > 1500:
            self.loc_chip_num += 1
def assign_again(self):
    """Placeholder with no behaviour yet; always returns None."""
    return None
def run(self):
    """Run the layout and write the result workbook.

    ``assign_samples`` is attempted first. If it raises, the error text is
    recorded in ``self.return_log`` and all assignments are discarded, so a
    workbook is still produced. Each chip is checked with
    ``dec_barcode_radio``. Chips whose ``data_needed`` total is below 1400
    are written back to the '未测' sheet instead of getting their own sheet.
    Sheets of chips containing a '极致周期' sample are prefixed with 'X'.
    Collected log messages go to the 'log' sheet.

    :return: path of the written workbook
    """
    try:
        self.assign_samples()
    except Exception as e:
        # Report the failure in the output workbook rather than crashing,
        # and drop any partial assignment.
        self.return_log.append(f'T7排样出错 请联系!{e}')
        self.index_assignments = {}

    outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
    outputpath = os.path.join(self.output, 'result', outputname)
    # The writer cannot create missing parent directories itself
    os.makedirs(os.path.dirname(outputpath), exist_ok=True)

    no_assign_data = []
    # The context manager closes the workbook even if a chip fails to be
    # written; the explicit close() was skipped on any exception.
    with pd.ExcelWriter(outputpath) as writer:
        for chip_idx, chip_assignments in self.index_assignments.items():
            self.dec_barcode_radio(chip_idx)
            df = pd.DataFrame(chip_assignments)
            # Under-filled chips are returned to the unassigned sheet
            if df['data_needed'].sum() < 1400:
                no_assign_data.extend(chip_assignments)
                continue
            addname = 'X' if '极致周期' in df['split_method'].values else ''
            df.to_excel(writer, sheet_name=addname + chip_idx, index=False)
        pd.DataFrame(no_assign_data).to_excel(writer, sheet_name='未测', index=False)
        if self.return_log:
            pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)
    return outputpath
if __name__ == '__main__':
    # Manual run against the bundled example workbook, timing the whole layout
    started = time.time()
    layout = AutoLayout('example/07031754_20230703.xlsx', '')
    layout.run()
    execution_time = time.time() - started
    print(f"代码执行时间为:{execution_time}")
    # server()