pipeline/script/sample_post.py

152 lines
3.8 KiB
Python
Raw Normal View History

2023-08-25 10:06:31 +08:00
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Created on: 2021-09-07
@author: cjs
Purpose: interface with the LIMS system and fetch sample information via POST requests.
Version: 0.0.1
Last edited: 2021-09-07
"""
from cjs_test.cjs_logger import Logger
import datetime
#import traceback
import os
import sys
import json
import requests
# Run mode: selects which entry of the tables below is used ('test' or 'formal').
RUN_MODE = 'formal'

# Service base address for each run mode.
_BASE_URL = {
    'test': 'https://gene.jmdna.wiki361.com',
    'formal': 'https://dna.jmdna.com',
}
# Endpoint paths, appended to the base address of a run mode.
_ENDPOINT = {
    'login': '/user/login/auth',
    'sample': '/sample/sample/search?report=true',
}
# Full request URLs, indexed as URL_DD[endpoint][run_mode].
URL_DD = {
    endpoint: {mode: base + path for mode, base in _BASE_URL.items()}
    for endpoint, path in _ENDPOINT.items()
}

# Login credentials per run mode.
# NOTE(review): the passwords are stored in plain text in this file; consider
# loading them from the environment or a protected configuration file.
USER_DD = {
    'test': {'USER_NAME': 'zlseq', 'USER_PASS': 'seq345'},
    'formal': {'USER_NAME': 'zlseq', 'USER_PASS': 'seq345@JM'},
}

# Fixed parameters: the credentials of the active run mode.
_ACTIVE_USER = USER_DD[RUN_MODE]
USER_NAME = _ACTIVE_USER['USER_NAME']
USER_PASS = _ACTIVE_USER['USER_PASS']

# Module-wide state. SampleID and Output_dir are filled in by Get_Argvs();
# GLog and StartTime are only initialised here.
GLog = None
StartTime = None
SampleID = ''
Output_dir = ''
def usage(df_exe):
    """Print the command-line synopsis and terminate the process.

    Args:
        df_exe: program name shown at the start of the synopsis line.

    Raises:
        SystemExit: always, with exit status 0.
    """
    help_lines = ("Usage:", "%s -s sample_id -o outdir" % df_exe)
    for help_line in help_lines:
        print(help_line)
    sys.exit(0)
def Get_Argvs():
    """Parse ``-s <sample_id>`` and ``-o <outdir>`` from ``sys.argv``.

    The results are stored in the module globals ``SampleID`` and
    ``Output_dir`` (the latter resolved with ``os.path.realpath``), and the
    output directory is created when it does not exist yet. Option flags are
    matched case-insensitively.

    When fewer than five argv entries are given, an option has no value after
    it, or the sample ID or output directory is missing, a message is printed
    and ``usage`` is called, which terminates the process. Both values are
    validated before the output directory is created.
    """
    global SampleID
    global Output_dir
    argvs = sys.argv
    bin_name = os.path.basename(os.path.realpath(argvs[0]))
    if len(argvs) < 5:
        print('参数列表数量不对, %s' % argvs)
        usage(bin_name)
        return
    # Argument parsing: every position is inspected, and the entry that
    # follows a recognised flag is taken as that flag's value.
    for pos, arg in enumerate(argvs[1:], start=1):
        flag = arg.upper()
        if flag not in ('-S', '-O'):
            continue
        if pos + 1 >= len(argvs):
            # A trailing flag has no value; report it instead of letting the
            # lookup below fail with IndexError.
            print('参数 %s 缺少对应的值' % arg)
            usage(bin_name)
            return
        value = argvs[pos + 1]
        if flag == '-S':
            SampleID = value
        else:
            Output_dir = os.path.realpath(value)
    # Check the parsing result before touching the file system.
    if SampleID == '':
        print('参数中未能解析出样本ID')
        usage(bin_name)
        return
    if Output_dir == '':
        # Without this check os.makedirs('') would raise FileNotFoundError.
        print('参数中未能解析出输出目录')
        usage(bin_name)
        return
    # exist_ok avoids the race between an existence test and the creation.
    os.makedirs(Output_dir, exist_ok=True)
def Get_Token():
    """Log in to the LIMS service and return its access token.

    Posts the configured user name and password, JSON-encoded, to the login
    URL of the active run mode. The token is read from ``data.accessToken``
    of the reply, and only when the reply's ``code`` field equals -1. When no
    token is obtained the decoded reply is printed once for diagnosis.

    Returns:
        The access token, or ``''`` when none could be obtained.

    Raises:
        requests.exceptions.RequestException: the request failed or timed out.
        ValueError: the reply body is not valid JSON.
    """
    post_dict = {"userName": USER_NAME, "password": USER_PASS}
    url = URL_DD['login'][RUN_MODE]
    # A timeout keeps the pipeline from hanging forever on a silent server.
    response = requests.post(url, data=json.dumps(post_dict), timeout=60)
    req_dd = json.loads(response.text)
    res_token = ''
    # NOTE(review): -1 is the only ``code`` value treated as success here;
    # confirm against the LIMS API documentation.
    if isinstance(req_dd, dict) and req_dd.get('code') == -1:
        payload = req_dd.get('data')
        # ``data`` may be null or absent; guard before reading the token.
        if isinstance(payload, dict):
            # A missing or null token is normalised to '' so callers see a
            # single "no token" value.
            res_token = payload.get('accessToken') or ''
    if res_token == '':
        print(req_dd)
    return res_token
def Post_SampleID(df_token):
    """Query the LIMS sample search endpoint and save the reply as JSON.

    Posts a search for the barcode held in the global ``SampleID`` together
    with the access token, then writes the decoded reply to
    ``<Output_dir>/qc/<SampleID>_post.json`` (UTF-8, indented by 4 spaces,
    non-ASCII characters kept as-is). The ``qc`` directory is created when
    missing.

    Args:
        df_token: access token sent as ``accessToken`` in the request body.

    Raises:
        requests.exceptions.RequestException: the request failed or timed out.
        ValueError: the reply body is not valid JSON.
        OSError: the output directory or file could not be written.
    """
    post_dict = {"search": {"barcode": SampleID}, "accessToken": df_token}
    url = URL_DD['sample'][RUN_MODE]
    # A timeout keeps the pipeline from hanging forever on a silent server.
    response = requests.post(url, data=json.dumps(post_dict), timeout=60)
    req_dd = json.loads(response.text)
    # Write the file. Only Output_dir itself is created during argument
    # parsing, so make sure the qc sub-directory exists before opening it.
    qc_dir = os.path.join(Output_dir, 'qc')
    os.makedirs(qc_dir, exist_ok=True)
    json_out = os.path.join(qc_dir, '%s_post.json' % SampleID)
    with open(json_out, 'w', encoding='utf8') as ff:
        json.dump(req_dd, ff, ensure_ascii=False, indent=4)
if __name__ == '__main__':
    # Script entry point: parse the command line, log in to obtain an access
    # token, then query the sample and write the reply to disk.
    Get_Argvs()
    Post_SampleID(Get_Token())