ePaper-Distrubuted-GPU-Calc.../server.py

418 lines
13 KiB
Python
Raw Normal View History

2020-08-31 17:13:29 +00:00
import os
import shutil
import requests
import datetime
import time
import hashlib
import sqlite3
import pandas
import threading
import logging as log
# Connection settings and credentials used for every request to the main server.
# NOTE(review): the endpoint, password and client code are hard-coded in source —
# consider loading them from configuration or the environment.
server_url = "http://39.100.94.111:8083"
openid = "gpu-server-test1"
password = "1e327b070ab43fd071768a4d474f016adbbf3ea475577fe66a505d9e33b24f2f"
# Login token; stays None until get_token() stores one
token = None
# Client code
client_code = "dc9fbb4f4f0b84fa903058991af60e73556494af8a02ef69fb6a93217729f04b"
# Passport identity code; stays None until get_csp() stores one
idcode = None
# Timestamp
timestamp = ""
# Maximum number of sentences requested per batch
max_stn_num = 20000
# Id of the BPT currently being processed
bpt_id = 0
# STNS (sentences of the current BPT)
stn_list = []
# Table that stores the input data
predict_table = "predict_data"
# Output directory for the model results
result_out_dir = "./tmp/eppredict"
# Initialization flag; set to True at the end of init()
base_init = False
# filename=None leaves file logging off; records at INFO and above are emitted
log.basicConfig(filename=None, format="%(asctime)s %(levelname)s [%(funcName)s] : %(message)s", level=log.INFO)
def get_timestamp():
    """Return the current local time as a millisecond epoch string.

    The value only has whole-second resolution: the epoch seconds are
    truncated to an integer first and then multiplied by 1000.
    """
    local_now = datetime.datetime.now()
    epoch_seconds = int(time.mktime(local_now.timetuple()))
    return str(epoch_seconds * 1000)
# Headers for unsigned requests. The timestamp below is taken once, when the
# module is loaded; user_checker() and user_register() overwrite it before sending.
base_headers = {"timestamp": get_timestamp(), "X-Requested-With": ""}
# Headers for signed requests. "signed" starts empty; get_csp(), update_csp(),
# get_bpt_from_server() and processing_bert_result() set "signed" and "timestamp".
token_headers = {"timestamp": get_timestamp(), "X-Requested-With": "", "signed": "", "openid": openid}
# Build the full URL of a server endpoint
def url_parser(url):
    """Return ``url`` appended to ``server_url`` with a single ``/`` between them."""
    return "/".join((server_url, url))
# Compute the random feature code
def calculate_random_code():
    """Return the SHA-1 hex digest built from openid, the current timestamp and the client code."""
    plain_text = "RandomCode [{0}][{1}][{2}]".format(openid, get_timestamp(), client_code)
    digest = hashlib.sha1(plain_text.encode("utf-8"))
    return digest.hexdigest()
# Compute the client signature
def calculate_signed():
    """Return the SHA-1 hex digest built from openid, a fresh random code and the current token."""
    plain_text = "SIGN [{0}][{1}][{2}]".format(openid, calculate_random_code(), token)
    digest = hashlib.sha1(plain_text.encode("utf-8"))
    return digest.hexdigest()
# Check whether the user exists
def user_checker():
    """Ask the server whether ``openid`` is known.

    Returns:
        False when the server answers 404, True for every other status code.
    """
    log.info("Check User Existence: openid" + str(openid))
    base_headers["timestamp"] = get_timestamp()
    res = requests.get(url=url_parser("user"), headers=base_headers, params={"openid": openid})
    user_missing = res.status_code == 404
    if user_missing:
        log.warning("User Not Exist: openid" + str(openid))
    else:
        log.info("User Exist: openid " + str(openid))
    return not user_missing
# Register the user
def user_register():
    """Create the ``openid`` account on the server when it does not exist yet.

    Returns:
        None when the user already exists, False when the server confirms the
        creation (status 201 and a matching openid), True when creation fails.
        NOTE(review): False-on-success looks inverted and the original
        indentation was lost — confirm the intended values; no caller in this
        file reads the result.
    """
    if user_checker():
        return None
    log.info("Try Creating New User: openid " + str(openid))
    base_headers["timestamp"] = get_timestamp()
    res = requests.post(
        url=url_parser("user/cs"),
        headers=base_headers,
        json={"openid": openid, "password": password},
        params={"clientCode": client_code},
    )
    respond_json = res.json()
    created = res.status_code == 201 and respond_json["openid"] == openid
    if not created:
        log.error("User Creation Failed: openid " + str(openid))
        return True
    log.info("User Creation Success: openid " + str(openid))
    return False
# Obtain a token
def get_token():
    """Log in with the configured credentials and store the token in ``token``.

    Two failure paths are handled explicitly (the original nesting was lost,
    and under either reading one of them was silent):

    * the user does not exist: once initialisation has finished
      (``base_init`` is True) the user is registered again so a later call
      can log in, and the failure is logged;
    * the login request is rejected: the failure is logged.

    Returns:
        None. On success the module-level ``token`` is replaced.
    """
    global token
    if not user_checker():
        if base_init is True:
            user_register()
        log.error("Fail To Get New Token")
        return
    log.info("Try Getting New Token")
    login_json = {"openid": openid, "password": password, "clientCode": client_code}
    res = requests.post(url=url_parser("user/login"), headers=base_headers, json=login_json)
    respond_json = res.json()
    if res.status_code == 200 and respond_json["info"] == "Authentication Success":
        token = respond_json["data"]["token"]
        log.info("Succeed In Getting New Token" + str(token))
    else:
        log.error("Fail To Get New Token")
# Obtain the child-server passport (CSP)
def get_csp():
    """Request a passport from the server and store its identity code in ``idcode``.

    Without a token the function only calls ``get_token``. A 401 answer, or a
    200 answer whose body carries ``status == 401`` instead of an identity
    code, also triggers ``get_token``. Any other status is logged as an error.
    """
    global idcode
    if token is None:
        get_token()
        return
    log.info("Try Getting New CSP")
    # Compute the client signature
    token_headers["signed"] = calculate_signed()
    token_headers["timestamp"] = get_timestamp()
    res = requests.post(url=url_parser("cs"), headers=token_headers)
    respond_json = res.json()
    log.debug(respond_json)
    status = res.status_code
    if status == 401:
        # Unauthorized: fetch a new token
        log.warning("Token Maybe OUT OF DATE: token " + str(token))
        log.info("Try to Get New Token")
        get_token()
        return
    if status != 200:
        log.error("Failed to get New CSP")
        return
    # Normal answer: it may still report a missing permission in the body
    try:
        passport = respond_json["identityCode"]
    except KeyError:
        if respond_json["status"] == 401:
            log.warning("Token OUT OF DATE: token " + str(token))
            get_token()
    else:
        idcode = passport
        log.info("Succeed In Getting CSP: idcode " + str(idcode))
# Renew the passport
def update_csp():
    """Renew the current passport on the server; does nothing while ``idcode`` is None.

    A 200 answer with ``expired`` being False is a success. A 401 answer
    triggers ``get_token``; every other outcome triggers ``get_csp``.
    """
    if idcode is None:
        return
    token_headers["signed"] = calculate_signed()
    token_headers["timestamp"] = get_timestamp()
    res = requests.put(url=url_parser("cs"), headers=token_headers, params={"idcode": idcode})
    respond_json = res.json()
    log.debug(respond_json)
    status = res.status_code
    if status == 200 and respond_json["expired"] is False:
        # Renewal accepted
        log.info("Succeed IN Updating CSP: idcode " + str(idcode))
        log.info("CSP Last Update Time: " + str(respond_json["lastUpdateTime"]))
        return
    if status == 401:
        # Try to obtain a new token
        log.warning("Unauthorized Status Code: Try to Get New Token")
        get_token()
        return
    # Obtain a new passport
    log.warning("CSP Maybe OUT OF DATE: idcode " + str(idcode))
    get_csp()
# Give up the batch processing task
def giving_up_bpt():
    """Mark the current BPT as failed on the server and drop the local task state.

    The request is attempted up to three times. (The original initialised the
    counter to 3, so the loop never ran and the server was never told.)
    Headers are re-signed for every attempt because the signature is derived
    from the token, which a 401 answer replaces.

    Returns:
        True when the server acknowledged the failure mark (201), False
        otherwise. ``bpt_id`` and ``stn_list`` are cleared in every case.
    """
    global bpt_id
    global stn_list
    max_tries = 3
    try:
        for try_count in range(1, max_tries + 1):
            token_headers["signed"] = calculate_signed()
            token_headers["timestamp"] = get_timestamp()
            # Mark the task as failed
            res = requests.put(url=url_parser("cs/bpt"),
                               headers=token_headers,
                               params={"idcode": idcode, "bptId": bpt_id, "status": False},
                               json=[])
            if res.status_code == 201:
                log.info("Marking Task Failed Successful: bertId " + str(bpt_id))
                return True
            if res.status_code == 401:
                # Try to obtain a new token
                log.warning("Unauthorized Status Code: Try to Get New Token")
                get_token()
            elif try_count >= max_tries:
                log.error("Marking Task Failed Eventually Failed: bertId " + str(bpt_id))
                log.warning("Connection Maybe Unstable")
            else:
                log.warning("Failed and Try: count " + str(try_count))
        return False
    finally:
        # Clear the task data
        bpt_id = None
        stn_list = []
# Store the sentences of a batch task in the local predict table
def _store_predict_data(stns):
    """Replace the contents of the predict table with ``stns``.

    Each item must provide ``stnId`` and ``text``. The connection is closed
    even when a statement fails.
    """
    conn = sqlite3.connect(r".\bptdata.db")
    try:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM {0}".format(predict_table))
        insert_sql = "INSERT INTO {0} (id, text) values (?, ?)".format(predict_table)
        for stn in stns:
            cursor.execute(insert_sql, [stn["stnId"], stn["text"]])
        conn.commit()
    finally:
        conn.close()


# Fetch a batch processing task from the main server
def get_bpt_from_server():
    """Fetch one BPT from the server, run the model on it and upload the result.

    Without a passport (``idcode`` is None) the function only calls
    ``get_csp``. For a 200 answer with a task, the sentences are written to
    the predict table, the model is executed, and the result is uploaded; if
    the model fails, the task is given up. Other status codes trigger the
    matching recovery call (``get_csp`` / ``get_token``) or are logged.

    ``bpt_id`` and ``stn_list`` are cleared once a task has been handled,
    including when handling raises.
    """
    global bpt_id
    global stn_list
    if idcode is None:
        # Try to obtain a passport
        get_csp()
        return
    log.info("Try Getting BPT From Server...")
    token_headers["signed"] = calculate_signed()
    token_headers["timestamp"] = get_timestamp()
    res = requests.get(url=url_parser("cs/bpt"),
                       headers=token_headers,
                       params={"idcode": idcode, "maxStnNum": int(max_stn_num)})
    respond_json = res.json()
    log.debug(respond_json)
    status = res.status_code
    if status == 200:
        try:
            bpt_id = respond_json["id"]
        except KeyError:
            if respond_json.get("status") == 401:
                get_token()
            else:
                log.warning("Unexpected BPT Response: " + str(respond_json))
            # Never continue with the bpt_id left over from an earlier poll
            return
        # No batch task available
        if bpt_id is None:
            log.info("No BPT Task For Now")
            return
        stns = respond_json["stns"]
        if not stns:
            log.info("STNS IS EMPTY, Giving UP")
            giving_up_bpt()
            return
        log.info("Get BPT Task: bptId " + str(bpt_id))
        stn_list = stns
        try:
            # Store the input data
            log.info("Processing Bert Predict Data...")
            _store_predict_data(stns)
            log.info("Finished in Processing Bert Predict Data")
            if execute_bert_predict() is True:
                if processing_bert_result() is True:
                    log.info("BPT Execution Success: bptId " + str(bpt_id))
                else:
                    log.info("BPT Execution Eventually Failed: bptId " + str(bpt_id))
            else:
                log.error("Bert Model Execution Failed")
                log.info("Try Giving Up BPT Task: bptId " + str(bpt_id))
                giving_up_bpt()
            log.info("Get Status Code: " + str(status))
        finally:
            # Clear the task data
            bpt_id = None
            stn_list = []
    elif status == 400:
        if respond_json["data"]["exception"] == "org.codedream.epaper.exception.badrequest.AuthExpiredException":
            log.warning("Auth Expired Exception: Try to Get New CSP")
            get_csp()
        else:
            log.error("Unknown Exception")
    elif status == 401:
        # Try to obtain a new token
        log.warning("Unauthorized Status Code: Try to Get New Token")
        get_token()
    elif status == 500:
        log.warning("Remote Server Error: Inner Server Error")
        log.warning(respond_json)
# Initialise the database environment
def sqlite_create_table():
    """Create the predict table if it is missing and remove all of its rows.

    The ``text`` column is declared ``TEXT``: with the former ``INT``
    declaration SQLite's column affinity converts numeric-looking sentences
    into numbers on insert. ``CREATE TABLE IF NOT EXISTS`` does not alter an
    existing table, so a database file created earlier keeps its old column
    type until the file is removed.
    """
    create_tb_cmd = "CREATE TABLE IF NOT EXISTS {0}" \
                    "(id INT PRIMARY KEY," \
                    "text TEXT)".format(predict_table)
    conn = sqlite3.connect(r".\bptdata.db")
    try:
        cursor = conn.cursor()
        cursor.execute(create_tb_cmd)
        cursor.execute("DELETE FROM {0}".format(predict_table))
        conn.commit()
    finally:
        # Release the connection even when a statement fails
        conn.close()
# Run the BERT neural network model
def execute_bert_predict():
    """Run ``run_classifier.py`` in predict mode and report whether it produced results.

    Any previous output directory is removed first, so a failed run cannot be
    mistaken for a fresh result. The model's console output is redirected to
    ``bert_out.log`` by the shell command.

    Returns:
        True when ``test_results.tsv`` exists in ``result_out_dir`` after the
        run, False otherwise — including when the output directory was never
        created (the original raised ``FileNotFoundError`` in that case).
    """
    if os.path.exists(result_out_dir):
        shutil.rmtree(result_out_dir)
    log.info("BERT Model Executing...")
    exit_status = os.system("python run_classifier.py "
                            "--task_name=eppdt "
                            "--do_predict=true "
                            "--data_dir=./tmp "
                            "--vocab_file=./chinese_wwm_ext_L-12_H-768_A-12/vocab.txt "
                            "--bert_config_file=./chinese_wwm_ext_L-12_H-768_A-12/bert_config.json "
                            "--init_checkpoint=./tmp/epout/model.ckpt-14062 "
                            "--max_seq_length=64 "
                            "--output_dir=./tmp/eppredict/ > bert_out.log 2>&1")
    log.info("BERT Model Execution Finished.")
    if exit_status != 0:
        log.warning("BERT Model Exit Status: " + str(exit_status))
    return os.path.isfile(os.path.join(result_out_dir, "test_results.tsv"))
# Process the model output
def processing_bert_result():
    """Upload the predictions in ``test_results.tsv`` for the current BPT.

    Row *k* of the result file is paired with ``stn_list[k]``; the first three
    columns of each row are sent as ``tagPossible``. The upload is attempted
    up to three times, and the headers are re-signed for every attempt because
    the signature is derived from the token, which a 401 answer replaces.

    Returns:
        True when the server answers 201, False when all attempts fail.

    Raises:
        IndexError: if the result file has more rows than ``stn_list``.
    """
    result = pandas.read_csv(result_out_dir + '/test_results.tsv', sep='\t', header=None)
    bpt_result_json = [
        {"stnid": stn_list[idx]["stnId"], "tagPossible": [row[0], row[1], row[2]]}
        for idx, (_, row) in enumerate(result.iterrows())
    ]
    log.debug("Bert Result Json")
    log.debug(bpt_result_json)
    log.info("Processing BERT Model Result Successful")
    # Try three times
    max_tries = 3
    for try_count in range(1, max_tries + 1):
        token_headers["timestamp"] = get_timestamp()
        token_headers["signed"] = calculate_signed()
        log.info("Uploading BERT Model Result...")
        res = requests.put(url=url_parser("cs/bpt"),
                           headers=token_headers,
                           params={"idcode": idcode, "bptId": bpt_id, "status": True},
                           json=bpt_result_json)
        if res.status_code == 201:
            log.info("Uploading Successful: bertId " + str(bpt_id))
            return True
        if res.status_code == 401:
            # Try to obtain a new token
            log.warning("Unauthorized Status Code: Try to Get New Token")
            get_token()
        elif try_count >= max_tries:
            log.error("Uploading Eventually Failed: bertId " + str(bpt_id))
            log.warning("Connection Maybe Unstable")
        else:
            log.warning("Failed and Try: count " + str(try_count))
    return False
# Recurring timer that renews the passport
def update_csp_timer():
    """Run ``update_csp`` now and schedule the next run 60 seconds later.

    A failure in ``update_csp`` is logged with its traceback and does not stop
    the schedule: the next timer is started in ``finally``.
    """
    log.info("UPDATE CSP TIMER STARTED")
    try:
        update_csp()
    except Exception:
        # log.exception records the traceback that a bare except discarded
        log.exception("Exception Thrown, Restarting Timer...")
    finally:
        t = threading.Timer(60, update_csp_timer)
        t.start()
# Recurring timer that polls for batch processing tasks
def get_bpt_timer():
    """Run ``get_bpt_from_server`` now and schedule the next run 15 seconds later.

    A failure in ``get_bpt_from_server`` is logged with its traceback and does
    not stop the schedule: the next timer is started in ``finally``.
    """
    log.info("GET BPT TIMER STARTED")
    try:
        get_bpt_from_server()
    except Exception:
        # log.exception records the traceback that a bare except discarded
        log.exception("Exception Thrown, Restarting Timer...")
    finally:
        t = threading.Timer(15, get_bpt_timer)
        t.start()
# Initialisation work
def init():
    """Prepare the database, the user, the token and the passport, then set ``base_init``.

    The flag is only set when every step returned without raising.
    """
    global base_init
    startup_steps = (sqlite_create_table, user_register, get_token, get_csp)
    for step in startup_steps:
        step()
    base_init = True
# Start the timers
def init_timer():
    """Start the passport-renewal timer first, then the task-polling timer."""
    for start_timer in (update_csp_timer, get_bpt_timer):
        start_timer()
if __name__ == "__main__":
    # Try the initialisation up to three times, pausing 5 seconds after each
    # failure. Only Exception is caught so that Ctrl+C still stops the start-up,
    # and the failure is logged instead of being silently dropped.
    for _attempt in range(3):
        try:
            init()
            break
        except Exception:
            log.exception("Initialisation Failed, Retrying...")
            time.sleep(5)
    # The timers are started even when every attempt failed; they retry the
    # token and passport requests on their own.
    init_timer()
    # Keep the main thread alive
    while True:
        time.sleep(5)