"""Client for a remote batch-prediction (BPT) service.

The script registers/logs in against ``server_url``, obtains a child-server
passport (CSP, ``idcode``), and then runs two self-rescheduling timers: one
keeps the CSP alive, the other polls for sentence batches, stores them in a
local SQLite table, runs ``run_classifier.py`` on them and uploads the
resulting scores.

NOTE(review): the original file had lost all line breaks and indentation; the
block nesting below was reconstructed from syntax. Places where the original
nesting was genuinely ambiguous are marked with NOTE(review).
"""
import os
import shutil
import datetime
import time
import hashlib
import sqlite3
import threading
import logging as log

import pandas
import requests

# NOTE(review): endpoint and credentials are hard-coded in source; consider
# loading them from the environment or a config file.
server_url = "http://39.100.94.111:8083"
openid = "gpu-server-test1"
password = "1e327b070ab43fd071768a4d474f016adbbf3ea475577fe66a505d9e33b24f2f"
token = None
# Client code
client_code = "dc9fbb4f4f0b84fa903058991af60e73556494af8a02ef69fb6a93217729f04b"
# Passport (CSP) identity code
idcode = None
# Timestamp
timestamp = ""
# Maximum number of sentences requested per batch
max_stn_num = 20000
# Id of the BPT currently being processed
bpt_id = 0
# Sentences (STNS) of the BPT currently being processed
stn_list = []
# Table that stores the model input data
predict_table = "predict_data"
# Directory the model writes its results to
result_out_dir = "./tmp/eppredict"
# Set to True once init() has completed
base_init = False

# SQLite file holding the model input table.
# NOTE(review): the Windows-style relative path is kept byte-for-byte because
# an external consumer (presumably run_classifier.py) may open the same path
# -- confirm before normalising it.
bpt_db_path = r".\bptdata.db"

# Number of attempts for the "report BPT outcome" requests.
_MAX_REPORT_ATTEMPTS = 3
# File the classifier is expected to leave in result_out_dir.
_RESULT_FILE_NAME = "test_results.tsv"

log.basicConfig(filename=None, format="%(asctime)s %(levelname)s [%(funcName)s] : %(message)s", level=log.INFO)


def get_timestamp():
    """Return the current Unix time, truncated to whole seconds and scaled to
    milliseconds, as a decimal string."""
    return str(int(time.time()) * 1000)


base_headers = {"timestamp": get_timestamp(), "X-Requested-With": ""}
token_headers = {"timestamp": get_timestamp(), "X-Requested-With": "", "signed": "", "openid": openid}


def url_parser(url):
    """Return the absolute URL for the server-relative path ``url``."""
    return server_url + "/" + url


def calculate_random_code(ts=None):
    """Return the SHA-1 hex digest of openid, a timestamp and the client code.

    :param ts: timestamp string to embed; defaults to ``get_timestamp()``.
    """
    if ts is None:
        ts = get_timestamp()
    raw = "RandomCode [{0}][{1}][{2}]".format(openid, ts, client_code)
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()


def calculate_signed(ts=None):
    """Return the client signature: SHA-1 hex digest of openid, the random
    code and the current token.

    :param ts: timestamp string forwarded to ``calculate_random_code``.
    """
    raw = "SIGN [{0}][{1}][{2}]".format(openid, calculate_random_code(ts), token)
    return hashlib.sha1(raw.encode("utf-8")).hexdigest()


def _refresh_token_headers():
    """Stamp ``token_headers`` with a fresh timestamp and a signature derived
    from that same timestamp and the current token.

    A single clock reading is used for both values so they cannot straddle a
    second boundary (presumably the server recomputes the signature from the
    timestamp header -- TODO confirm).
    """
    ts = get_timestamp()
    token_headers["timestamp"] = ts
    token_headers["signed"] = calculate_signed(ts)


def user_checker():
    """Ask the server whether ``openid`` exists.

    :return: False when the server answers 404, True for any other status.
    """
    log.info("Check User Existence: openid%s", openid)
    base_headers["timestamp"] = get_timestamp()
    res = requests.get(url=url_parser("user"), headers=base_headers, params={"openid": openid})
    if res.status_code == 404:
        log.warning("User Not Exist: openid%s", openid)
        return False
    log.info("User Exist: openid %s", openid)
    return True


def user_register():
    """Create the user on the server if it does not exist yet.

    :return: None when the user already exists; otherwise False when the
        creation succeeded and True when it failed (this inverted convention
        is the original one and is kept for compatibility).
    """
    if user_checker():
        return None
    log.info("Try Creating New User: openid %s", openid)
    base_headers["timestamp"] = get_timestamp()
    res = requests.post(url=url_parser("user/cs"), headers=base_headers,
                        json={"openid": openid, "password": password},
                        params={"clientCode": client_code})
    respond_json = res.json()
    if res.status_code == 201 and respond_json["openid"] == openid:
        log.info("User Creation Success: openid %s", openid)
        return False
    log.error("User Creation Failed: openid %s", openid)
    return True


def get_token():
    """Log in and store the returned token in the module-level ``token``.

    NOTE(review): the nesting of the original ``else`` branch was ambiguous
    (it could belong to the user-existence check or to the login-response
    check). Both failure paths are handled here: a missing user is
    re-registered once ``init()`` has completed, and a rejected login is
    logged.
    """
    global token
    if not user_checker():
        # init() registers the user itself, so only re-register afterwards.
        if base_init is True:
            user_register()
        log.error("Fail To Get New Token")
        return
    log.info("Try Getting New Token")
    login_json = {"openid": openid, "password": password, "clientCode": client_code}
    res = requests.post(url=url_parser("user/login"), headers=base_headers, json=login_json)
    respond_json = res.json()
    if res.status_code == 200 and respond_json["info"] == "Authentication Success":
        token = respond_json["data"]["token"]
        log.info("Succeed In Getting New Token%s", token)
    else:
        log.error("Fail To Get New Token")


def get_csp():
    """Request a new child-server passport and store it in ``idcode``.

    Falls back to ``get_token()`` when there is no token yet or the server
    reports the token as unauthorized.
    """
    global idcode
    if token is None:
        get_token()
        return
    log.info("Try Getting New CSP")
    _refresh_token_headers()
    res = requests.post(url=url_parser("cs"), headers=token_headers)
    respond_json = res.json()
    log.debug(respond_json)
    if res.status_code == 200:
        # A 200 body without "identityCode" may carry an embedded 401 status.
        try:
            idcode = respond_json["identityCode"]
            log.info("Succeed In Getting CSP: idcode %s", idcode)
        except KeyError:
            if respond_json["status"] == 401:
                log.warning("Token OUT OF DATE: token %s", token)
                get_token()
    elif res.status_code == 401:
        log.warning("Token Maybe OUT OF DATE: token %s", token)
        log.info("Try to Get New Token")
        get_token()
    else:
        log.error("Failed to get New CSP")


def update_csp():
    """Renew the current passport; does nothing while ``idcode`` is None.

    On 401 a new token is requested; on any other non-success answer a new
    passport is requested.
    """
    if idcode is None:
        return
    _refresh_token_headers()
    res = requests.put(url=url_parser("cs"), headers=token_headers, params={"idcode": idcode})
    respond_json = res.json()
    log.debug(respond_json)
    if res.status_code == 200 and respond_json["expired"] is False:
        log.info("Succeed IN Updating CSP: idcode %s", idcode)
        log.info("CSP Last Update Time: %s", respond_json["lastUpdateTime"])
    elif res.status_code == 401:
        log.warning("Unauthorized Status Code: Try to Get New Token")
        get_token()
    else:
        log.warning("CSP Maybe OUT OF DATE: idcode %s", idcode)
        get_csp()


def giving_up_bpt():
    """Report the current BPT as failed, retrying up to three times.

    The module-level ``bpt_id`` and ``stn_list`` are cleared on every exit
    path.

    :return: True when the server acknowledged with 201, False otherwise.
    """
    global bpt_id
    global stn_list
    try:
        _refresh_token_headers()
        for try_count in range(1, _MAX_REPORT_ATTEMPTS + 1):
            res = requests.put(url=url_parser("cs/bpt"), headers=token_headers,
                               params={"idcode": idcode, "bptId": bpt_id, "status": False}, json=[])
            if res.status_code == 201:
                log.info("Marking Task Failed Successful: bertId %s", bpt_id)
                return True
            if res.status_code == 401:
                log.warning("Unauthorized Status Code: Try to Get New Token")
                get_token()
                # The signature embeds the token, so re-sign before retrying.
                _refresh_token_headers()
                continue
            if try_count >= _MAX_REPORT_ATTEMPTS:
                log.error("Marking Task Failed Eventually Failed: bertId %s", bpt_id)
                log.warning("Connection Maybe Unstable")
                return False
            log.warning("Failed and Try: count %s", try_count)
        return False
    finally:
        # Clear the per-task state
        bpt_id = None
        stn_list = []


def _store_predict_data(stns):
    """Replace the contents of ``predict_table`` with the sentences in ``stns``."""
    conn = sqlite3.connect(bpt_db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM {0}".format(predict_table))
        log.info("Processing Bert Predict Data...")
        cursor.executemany("INSERT INTO {0} (id, text) values (?, ?)".format(predict_table),
                           [(stn["stnId"], stn["text"]) for stn in stns])
        conn.commit()
    finally:
        conn.close()
    log.info("Finished in Processing Bert Predict Data")


def get_bpt_from_server():
    """Poll the server for a BPT and, when one is returned, run it end to end.

    Without a passport this only calls ``get_csp()``. A returned task is
    stored in SQLite, fed to the model and its result uploaded; when the model
    fails the task is reported as failed.
    """
    global bpt_id
    global stn_list
    if idcode is None:
        # Try to obtain a passport first
        get_csp()
        return
    log.info("Try Getting BPT From Server...")
    _refresh_token_headers()
    res = requests.get(url=url_parser("cs/bpt"), headers=token_headers,
                       params={"idcode": idcode, "maxStnNum": int(max_stn_num)})
    respond_json = res.json()
    log.debug(respond_json)
    if res.status_code == 200:
        if "id" not in respond_json:
            # A 200 body without "id" may carry an embedded 401 status. Never
            # continue with a stale bpt_id from a previous task.
            if respond_json.get("status") == 401:
                get_token()
            return
        bpt_id = respond_json["id"]
        # No batch task available
        if bpt_id is None:
            log.info("No BPT Task For Now")
            return
        stns = respond_json["stns"]
        if not stns:
            log.info("STNS IS EMPTY, Giving UP")
            giving_up_bpt()
            return
        log.info("Get BPT Task: bptId %s", bpt_id)
        stn_list = stns
        try:
            _store_predict_data(stns)
            if execute_bert_predict() is True:
                if processing_bert_result() is True:
                    log.info("BPT Execution Success: bptId %s", bpt_id)
                else:
                    log.info("BPT Execution Eventually Failed: bptId %s", bpt_id)
            else:
                log.error("Bert Model Execution Failed")
                log.info("Try Giving Up BPT Task: bptId %s", bpt_id)
                giving_up_bpt()
            log.info("Get Status Code: %s", res.status_code)
        finally:
            # Clear the per-task state
            bpt_id = None
            stn_list = []
    elif res.status_code == 400:
        if respond_json["data"]["exception"] == "org.codedream.epaper.exception.badrequest.AuthExpiredException":
            log.warning("Auth Expired Exception: Try to Get New CSP")
            get_csp()
        else:
            log.warning("Unknown Exception")
    elif res.status_code == 401:
        log.warning("Unauthorized Status Code: Try to Get New Token")
        get_token()
    elif res.status_code == 500:
        log.warning("Remote Server Error: Inner Server Error")
        log.warning(respond_json)


def sqlite_create_table():
    """Create ``predict_table`` if it is missing and delete all of its rows.

    The ``text`` column is declared TEXT: with the former INT declaration
    SQLite's integer affinity converts numeric-looking sentences to numbers.
    An already existing database file keeps whatever schema it was created
    with.
    """
    conn = sqlite3.connect(bpt_db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS {0}"
                       "(id INT PRIMARY KEY,"
                       "text TEXT)".format(predict_table))
        cursor.execute("DELETE FROM {0}".format(predict_table))
        conn.commit()
    finally:
        conn.close()


def execute_bert_predict():
    """Run ``run_classifier.py`` in predict mode via the shell.

    Any previous ``result_out_dir`` is removed first so a stale result file
    cannot be mistaken for a fresh one.

    :return: True when ``test_results.tsv`` exists in ``result_out_dir``
        afterwards, False otherwise (including when the directory was never
        created).
    """
    if os.path.exists(result_out_dir):
        shutil.rmtree(result_out_dir)
    log.info("BERT Model Executing...")
    os.system("python run_classifier.py "
              "--task_name=eppdt "
              "--do_predict=true "
              "--data_dir=./tmp "
              "--vocab_file=./chinese_wwm_ext_L-12_H-768_A-12/vocab.txt "
              "--bert_config_file=./chinese_wwm_ext_L-12_H-768_A-12/bert_config.json "
              "--init_checkpoint=./tmp/epout/model.ckpt-14062 "
              "--max_seq_length=64 "
              "--output_dir=./tmp/eppredict/ > bert_out.log 2>&1")
    log.info("BERT Model Execution Finished.")
    return os.path.isfile(os.path.join(result_out_dir, _RESULT_FILE_NAME))


def processing_bert_result():
    """Read the model output and upload it as the result of the current BPT.

    Row ``k`` of the TSV is paired with ``stn_list[k]``; the first three
    columns of each row are sent as ``tagPossible``. The upload is attempted
    up to three times.

    :return: True when the server acknowledged with 201, False otherwise.
    """
    result = pandas.read_csv(result_out_dir + '/test_results.tsv', sep='\t', header=None)
    _refresh_token_headers()
    bpt_result_json = [
        {"stnid": stn_list[idx]["stnId"], "tagPossible": [row[0], row[1], row[2]]}
        for idx, (_, row) in enumerate(result.iterrows())
    ]
    log.debug("Bert Result Json")
    log.debug(bpt_result_json)
    log.info("Processing BERT Model Result Successful")
    for try_count in range(1, _MAX_REPORT_ATTEMPTS + 1):
        log.info("Uploading BERT Model Result...")
        res = requests.put(url=url_parser("cs/bpt"), headers=token_headers,
                           params={"idcode": idcode, "bptId": bpt_id, "status": True},
                           json=bpt_result_json)
        if res.status_code == 201:
            log.info("Uploading Successful: bertId %s", bpt_id)
            return True
        if res.status_code == 401:
            log.warning("Unauthorized Status Code: Try to Get New Token")
            get_token()
            # The signature embeds the token, so re-sign before retrying.
            _refresh_token_headers()
            continue
        if try_count >= _MAX_REPORT_ATTEMPTS:
            log.error("Uploading Eventually Failed: bertId %s", bpt_id)
            log.warning("Connection Maybe Unstable")
            return False
        log.warning("Failed and Try: count %s", try_count)
    return False


def update_csp_timer():
    """Renew the passport now and schedule the next renewal in 60 seconds.

    Exceptions from ``update_csp`` are logged and never stop the rescheduling.
    """
    log.info("UPDATE CSP TIMER STARTED")
    try:
        update_csp()
    except Exception:
        log.exception("Exception Thrown, Restarting Timer...")
    finally:
        threading.Timer(60, update_csp_timer).start()


def get_bpt_timer():
    """Poll for a BPT now and schedule the next poll in 15 seconds.

    Exceptions from ``get_bpt_from_server`` are logged and never stop the
    rescheduling.
    """
    log.info("GET BPT TIMER STARTED")
    try:
        get_bpt_from_server()
    except Exception:
        log.exception("Exception Thrown, Restarting Timer...")
    finally:
        threading.Timer(15, get_bpt_timer).start()


def init():
    """Prepare the SQLite table, register the user, fetch token and passport,
    then mark initialisation as done."""
    global base_init
    sqlite_create_table()
    user_register()
    get_token()
    get_csp()
    base_init = True


def init_timer():
    """Start both self-rescheduling timers (the first run happens inline)."""
    update_csp_timer()
    get_bpt_timer()


if __name__ == "__main__":
    # Try the initialisation up to three times, five seconds apart; the timers
    # are started regardless and recover token/passport on their own.
    for _attempt in range(3):
        try:
            init()
            break
        except Exception:
            log.exception("Initialisation attempt failed")
            time.sleep(5)
    init_timer()
    # Keep the main thread alive while the timer threads do the work.
    while True:
        time.sleep(5)