layout/tools/t7.py

304 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pandas as pd
from collections import defaultdict
from datetime import datetime
import time
import os
from .common import basedir, log
class AutoLayout:
    """
    Automated sample layout: distribute sequencing libraries over chips.

    The input workbook is read sheet by sheet; the libraries found in the
    '未测' sheet are grouped by '#library', prioritised with :meth:`level`
    and packed greedily onto chips named ``chip1``, ``chip2``, ... subject to
    the checks in :meth:`judge_data` (total size, duplicate barcodes,
    mutually exclusive customers, special-library quota).
    """

    # Classifications whose size counts against the per-chip special quota.
    _SPECIAL_CLASSIFICATIONS = ('扩增子', '不平衡文库', '单细胞文库', '甲基化')
    # Special libraries on one chip may not exceed this combined size.
    _SPECIAL_LIB_LIMIT = 200
    # Once a chip holds more than this, the layout moves on to the next chip.
    _CHIP_FULL_SIZE = 1500
    # Chips holding less than this are written back to the '未测' sheet.
    _CHIP_MIN_SIZE = 1400
    # A base whose share at a barcode position is below this is "unbalanced".
    _MIN_BASE_RATIO = 0.088

    def __init__(self, path, output=None, data_limit=1520):
        """
        :param path: path of the input Excel workbook.
        :param output: directory under which ``result/<name>`` is written;
            ``None`` (the default) means the package ``basedir``.
        :param data_limit: maximum total ``data_needed`` allowed on one chip.
        """
        self.path = path
        # Resolved at call time; passing nothing still yields ``basedir``.
        self.output = basedir if output is None else output
        self.data_limit = data_limit
        # chip name -> list of sample records placed on that chip
        self.index_assignments = defaultdict(list)
        # chip name -> total data size on the chip
        self.chip_size = dict()
        # chip name -> chip type (not used by the code in this class)
        self.chip_type = dict()
        # chip name -> set of barcodes already on the chip
        self.chip_barcode_recode = defaultdict(set)
        # raw workbook content: sheet name -> list of row dicts
        self.ori_data = self.read_excel()
        # number of the chip currently being filled
        self.loc_chip_num = 1
        # chip name -> set of customers present on the chip
        self.chip_customer = defaultdict(set)
        # mutually exclusive customer pairs (both directions)
        self.rule = self.read_rule()
        # chip name -> size of special (methylation, amplicon, ...) libraries
        self.chip_speciallib_size = dict()
        self.logger = log(os.path.basename(f'{path}.txt'))
        # messages written to the 'log' sheet of the result workbook
        self.return_log = list()

    def read_excel(self):
        """
        Load every sheet of the input workbook.

        :return: dict mapping sheet name to a list of row dicts; missing
            cells are replaced by '.'.
        """
        workbook = pd.read_excel(self.path, sheet_name=None)
        return {
            name: sheet.fillna('.').to_dict('records')
            for name, sheet in workbook.items()
        }

    def add_new_data(self, chipname, library_data, newer=True):
        """
        Place a library on a chip and update the per-chip bookkeeping.

        :param chipname: target chip name.
        :param library_data: library dict as built by :meth:`assign_samples`
            (keys 'data', 'size', 'classification', 'customer').
        :param newer: True when the chip is being opened by this library
            (counters are initialised), False to add to existing counters.
        :return: None
        """
        records = library_data['data']
        size = library_data['size']
        # The same classification list is used for new and existing chips.
        if library_data['classification'] in self._SPECIAL_CLASSIFICATIONS:
            special_size = size
        else:
            special_size = 0

        self.index_assignments[chipname].extend(records)
        self.chip_barcode_recode[chipname].update(item['barcode'] for item in records)
        if newer:
            self.chip_size[chipname] = size
            self.chip_speciallib_size[chipname] = special_size
        else:
            self.chip_size[chipname] += size
            self.chip_speciallib_size[chipname] = (
                self.chip_speciallib_size.get(chipname, 0) + special_size
            )
        self.chip_customer[chipname].add(library_data['customer'])

    def add_new_chip(self, library_data):
        """
        Place a library on the first chip after the current one that accepts it.

        Chips are probed starting at ``loc_chip_num + 1``; an unused chip name
        always accepts the library, an existing chip accepts it only when
        :meth:`judge_data` passes.

        :param library_data: library dict to place.
        :return: None
        """
        library = library_data['library']
        chip_num = self.loc_chip_num
        while True:
            chip_num += 1
            chipname = f'chip{chip_num}'
            if chipname not in self.index_assignments:
                self.logger.error(f'{library} {chipname} 常规添加')
                self.add_new_data(chipname, library_data)
                return
            # Same acceptance rules (and log messages) as the main layout loop.
            if self.judge_data(chipname, library_data):
                self.add_new_data(chipname, library_data, newer=False)
                return

    def dec_barcode_radio(self, chipname):
        """
        Check the base balance of every barcode position on a chip.

        For each position the data-weighted share of each base is computed
        ('N' is excluded from numerator and denominator). A position is
        unbalanced when any of A/T/C/G is missing or below the minimum ratio.
        When more than two positions are unbalanced, the findings are appended
        to ``return_log`` and printed.

        :param chipname: chip to inspect.
        :return: None
        """
        records = self.index_assignments.get(chipname)
        if not records:
            return
        df = pd.DataFrame(records)
        # One column per barcode position; the width follows the data.
        bases = pd.DataFrame(df['barcode'].astype(str).map(list).tolist(), index=df.index)
        bases.columns = [f'T{pos}' for pos in range(bases.shape[1])]
        barcode_df = bases.join(df['data_needed'])
        total = barcode_df['data_needed'].sum()

        is_not_balance_list = []
        for pos, column in enumerate(bases.columns):
            col_df = barcode_df.groupby(column).agg({'data_needed': 'sum'})
            # Leave 'N' out of the balance computation.
            if 'N' in col_df.index:
                base_n_size = col_df.loc['N', 'data_needed']
                col_df = col_df.drop('N')
            else:
                base_n_size = 0
            shares = col_df['data_needed'] / (total - base_n_size)
            # Plain floats keep the logged dict readable.
            ratio = {base: float(share) for base, share in shares.items()}
            need_base_list = [
                base for base, share in ratio.items() if share < self._MIN_BASE_RATIO
            ]
            for decbase in ('A', 'T', 'C', 'G'):
                if decbase not in ratio:
                    ratio[decbase] = 0
                    need_base_list.append(decbase)
            # An empty list means every base meets the minimum share.
            if need_base_list:
                is_not_balance_list.append(
                    '%s%s位置, %s 有碱基不平衡,算出结果为 %s' % (chipname, pos, need_base_list, ratio)
                )
        if len(is_not_balance_list) > 2:
            self.return_log.append('有碱基不平衡性!')
            self.return_log.extend(is_not_balance_list)
            print('有碱基不平衡性!\n', '\n'.join(is_not_balance_list))

    @staticmethod
    def level(row):
        """
        Return the layout priority of a sample row (lower is placed first).

        :param row: mapping with 'customer', 'split_method', 'classification'.
        :return: int priority; 100 when no rule matches.
        """
        if row['customer'] == '贞固':
            return 1
        if row['split_method'] == '极致周期':
            return 2
        # Original note: these customers are also treated as 极致周期.
        if row['customer'] in ('医沐', '清港泉'):
            return 3
        # Original note: these should preferably run together with 极致周期
        # samples. NOTE(review): 7 sorts after 5 and 6 - confirm intended.
        if row['customer'] in ('赛福', '桐树基因'):
            return 7
        if row['classification'] == 'Nextera':
            return 5
        # str() keeps a non-text classification cell from raising TypeError.
        if '华大' in str(row['classification']):
            return 6
        return 100

    @staticmethod
    def _symmetric_rules(df):
        """Return the exclusion pairs of *df* plus their mirrored pairs."""
        mirrored = pd.DataFrame({
            'customer1': df['customer2'],
            'customer2': df['customer1'],
        })
        return pd.concat([df, mirrored], ignore_index=True)

    @staticmethod
    def read_rule():
        """
        Load the mutually exclusive customer pairs from rule/exclusive.xlsx.

        :return: DataFrame with 'customer1'/'customer2' columns holding every
            pair in both directions, so a lookup by either customer works.
        """
        df = pd.read_excel(os.path.join(basedir, 'rule', 'exclusive.xlsx'))
        return AutoLayout._symmetric_rules(df)

    def use_rule(self, chipname, customer):
        """
        Tell whether *customer* is excluded by a customer already on the chip.

        :return: True when an exclusive customer is present, else False.
        """
        rule = self.rule
        excluded = set(rule.loc[rule['customer1'] == customer, 'customer2'])
        return bool(self.chip_customer[chipname].intersection(excluded))

    def judge_data(self, chipname, library_data):
        """
        Decide whether a library may be added to an existing chip.

        Every failed check is logged; all checks are evaluated so the log
        lists each reason.

        :param chipname: existing chip name.
        :param library_data: library dict to test.
        :return: True when the library fits, else False.
        """
        size = library_data['size']
        library = library_data['library']
        accepted = True

        # Total chip size may not exceed the configured limit.
        if self.chip_size[chipname] + size > self.data_limit:
            accepted = False
            self.logger.error(f'{library} {chipname} 文库相加大于设定限制')

        # No barcode may appear twice on one chip.
        new_barcodes = {item['barcode'] for item in library_data['data']}
        if self.chip_barcode_recode[chipname].intersection(new_barcodes):
            accepted = False
            self.logger.error(f'{library} {chipname} 文库有barcode重复')

        # Mutually exclusive customers may not share a chip.
        if self.use_rule(chipname, library_data['customer']):
            accepted = False
            self.logger.error(f'{library} {chipname} 有互斥单位')

        # Special libraries may not exceed their combined quota.
        if library_data['classification'] in self._SPECIAL_CLASSIFICATIONS \
                and self.chip_speciallib_size[chipname] + size > self._SPECIAL_LIB_LIMIT:
            accepted = False
            self.logger.error(f'{library} {chipname} 不平衡文库相加大于设定限制')

        return accepted

    def assign_samples(self):
        """
        Pack the libraries of the '未测' sheet onto chips.

        Libraries are ordered by (level, size descending, time). The current
        chip takes the first pending library that passes :meth:`judge_data`;
        when none fits, or the chip has grown past the "full" size, the
        layout advances to the next chip. The first library on a blank chip
        is placed unconditionally.

        :return: None
        """
        ori_library_df = pd.DataFrame(self.ori_data['未测'])
        ori_library_df['level'] = ori_library_df.apply(self.level, axis=1)

        pending = []
        for library, library_df in ori_library_df.groupby('#library'):
            pending.append(dict(
                library=library,
                size=library_df['data_needed'].sum(),
                split_method=library_df['split_method'].values[0],
                time=library_df['time'].values[0],
                level=library_df['level'].values[0],
                customer=library_df['customer'].values[0],
                classification=library_df['classification'].values[0],
                data=library_df.to_dict('records'),
            ))
        pending.sort(key=lambda x: (x['level'], -x['size'], x['time']))

        while pending:
            chipname = f'chip{self.loc_chip_num}'
            # A blank chip takes the highest-priority library as-is.
            if chipname not in self.index_assignments:
                self.add_new_data(chipname, pending.pop(0))
                continue

            # Each pending library is judged at most once per pass.
            placed = False
            for idx, candidate in enumerate(pending):
                if self.judge_data(chipname, candidate):
                    self.add_new_data(chipname, pending.pop(idx), newer=False)
                    placed = True
                    break

            # Advance exactly once so chip numbering stays contiguous.
            if not placed or self.chip_size[chipname] > self._CHIP_FULL_SIZE:
                self.loc_chip_num += 1

    def assign_again(self):
        """Placeholder; currently does nothing."""
        pass

    def run(self):
        """
        Run the layout and write the result workbook.

        Chips reaching the minimum size get their own sheet (prefixed with
        'X' when they contain a '极致周期' sample); samples of smaller chips
        go to the '未测' sheet; collected messages go to the 'log' sheet.
        If the layout raises, the error is recorded and no chip is written.

        :return: path of the written workbook.
        """
        try:
            self.assign_samples()
        except Exception as e:
            # Top-level boundary: report the failure in the log and workbook.
            self.logger.error(f'排样出错, 请联系!{e}')
            self.return_log.append(f'排样出错, 请联系!{e}')
            self.index_assignments = {}

        outputname = 'assignments_%s_%s' % (datetime.now().strftime("%m%d%H%M"), os.path.basename(self.path))
        outputpath = os.path.join(self.output, 'result', outputname)
        os.makedirs(os.path.dirname(outputpath), exist_ok=True)

        no_assign_data = list()
        # The context manager closes the workbook even if a sheet write fails.
        with pd.ExcelWriter(outputpath) as writer:
            for chip_idx, chip_assignments in self.index_assignments.items():
                self.dec_barcode_radio(chip_idx)
                df = pd.DataFrame(chip_assignments)
                if df['data_needed'].sum() < self._CHIP_MIN_SIZE:
                    no_assign_data.extend(chip_assignments)
                    continue
                addname = 'X' if '极致周期' in df['split_method'].values else ''
                df.to_excel(writer, sheet_name=addname + chip_idx, index=False)
            pd.DataFrame(no_assign_data).to_excel(writer, sheet_name='未测', index=False)
            if self.return_log:
                pd.DataFrame(self.return_log).to_excel(writer, sheet_name='log', index=False)
        return outputpath
if __name__ == '__main__':
    # Manual entry point: lay out the bundled example workbook and report
    # how long the whole run took.
    began_at = time.time()
    example_workbook = 'example/07031754_20230703.xlsx'
    result_root = ''
    AutoLayout(example_workbook, result_root).run()
    execution_time = time.time() - began_at
    print(f"代码执行时间为:{execution_time}")
    # server()