Added and Fixed.

This commit is contained in:
Saturneic 2019-02-05 22:37:32 +08:00
parent 1d3b8ef25f
commit 38ed7cb2e2
8 changed files with 234 additions and 42 deletions

View File

@ -66,12 +66,12 @@ public :
}
}
~SocketServer(){
close(server_sfd);
//close(server_sfd);
}
// 接受储存简单字符串
virtual ssize_t Recv(string &str) = 0;
// 接受储存二进制串
virtual ssize_t RecvRAW(char **p_rdt) = 0;
virtual ssize_t RecvRAW(char **p_rdt, Addr &taddr) = 0;
};
//客户端套接字类
@ -174,7 +174,7 @@ public:
// 接受储存简单字符串信息的数据包
ssize_t Recv(string &str);
// 接受储存二进制信息的数据包
ssize_t RecvRAW(char **p_rdt);
ssize_t RecvRAW(char **p_rdt, Addr &taddr);
// 设置非阻塞模式
void UDPSetFCNTL(void);
};

View File

@ -13,7 +13,9 @@
#include "net.h"
#include "cpart.h"
#include "cthread.h"
#include "sqlite3.h"
#include "rsa.h"
#include "rng.h"
class Server;
@ -28,9 +30,18 @@ struct compute_result{
//请求数据包
struct request {
uint64_t r_id;
rng::rng64 r_id = 0;
string type;
string data;
Addr t_addr;
request();
};
struct respond {
rng::rng64 r_id;
string type;
string data;
Addr t_addr;
};
//通用数据包类
@ -38,8 +49,11 @@ class packet{
public:
// 数据包类型
unsigned int type;
struct sockaddr_in address;
// 记录块的大小及内容所在的内存地址
vector<pair<unsigned int, void *>> buffs;
void AddBuff(void *pbuff, uint32_t size);
~packet();
};
//带标签的二进制串管理结构
@ -54,6 +68,8 @@ public:
// 信息串
char *msg = NULL;
unsigned long msg_size = 0;
// 来源ip地址
struct sockaddr_in address;
// 用简单字符串直接出适合
void setData(string str){
data = (char *)malloc(str.size());
@ -83,7 +99,7 @@ public:
// 服务器类的接收套接字对象与发送套接字对象
SocketUDPServer socket;
SocketUDPClient send_socket;
int packet_max = 30;
int packet_max = 1024;
Server(int port = 9048, string send_ip = "127.0.0.1",int send_port = 9049);
// 重新设置服务器的发送端口
@ -93,9 +109,9 @@ public:
// 将结构数据包转换成二进制串
static void Packet2Rawdata(packet &tpkt, raw_data &rdt);
// 将通用二进制串转换为通用数据包
static packet Rawdata2Packet(raw_data trdta);
static void Rawdata2Packet(packet &tpkt, raw_data &trdt);
// 释放二进制串占用的空间
static void freeRawdataServer(struct raw_data trdt);
static void freeRawdataServer(struct raw_data &trdt);
// 释放通用数据包包占用
static void freePcaketServer(struct packet tpkt);
@ -130,17 +146,27 @@ public:
class SQEServer:public Server{
protected:
// 请求数据包
list<request> req_list;
list<request *> req_list;
// 服务器公私钥
public_key_class pkc;
private_key_class prc;
public:
SQEServer(void);
SQEServer(int port = 9048);
void ProcessPacket(void);
void ProcessRequset(void);
static void Packet2Request(packet &pkt, request &req);
static void Request2Packet(packet &pkt, request &req);
};
//设置服务器守护程序的时钟
void setServerClock(Server *psvr, int clicks);
//服务器守护线程
//设置广场服务器守护程序的时钟
void setServerClockForSquare(SQEServer *psvr, int clicks);
//服务器接收数据包守护线程
void *serverDeamon(void *psvr);
//服务器处理原始数据守护进程
void *dataProcessorDeamon(void *pvcti);
//广场服务器处理数据包守护进程
void *packetProcessorDeamonForSquare(void *pvcti);
#endif /* server_h */

View File

@ -9,6 +9,8 @@
#ifndef sql_h
#define sql_h
#include "type.h"
struct SQLTable{
string name;
vector<pair<string, string>> colnums;

View File

@ -52,4 +52,7 @@ using std::stringstream;
typedef char Byte;
#define REQUSET_TYPE 100
#define RESPOND_TYPE 101
#endif /* type_h */

View File

@ -6,22 +6,32 @@
// Copyright © 2019年 Bakantu. All rights reserved.
//
#include "type.h"
#include "sql.h"
#include "net.h"
#include "server.h"
#include "rng.h"
int main(int argc, char *argv[])
{
try {
Server BServer(1081,"127.0.0.1",9048);
vector<int> fargs = {1,0,0,1};
vector<void *>args;
raw_data rwd;
rwd.setData("Hello");
BServer.SignedRawdata(&rwd, "TEXT");
while (1) {
BServer.SentRawdata(&rwd);
usleep(1000);
request nreq;
nreq.type = "client-square request";
nreq.data = "request for public key";
packet *pnpkt = new packet();
SQEServer::Request2Packet(*pnpkt, nreq);
raw_data *pnrwd = new raw_data();
Server::Packet2Rawdata(*pnpkt, *pnrwd);
BServer.SignedRawdata(pnrwd, "SPKT");
BServer.SentRawdata(pnrwd);
Server::freeRawdataServer(*pnrwd);
delete pnrwd;
Server::freePcaketServer(*pnpkt);
delete pnpkt;
}
} catch (char const *str) {

View File

@ -306,11 +306,18 @@ int server(string instruct, vector<string> &configs, vector<string> &lconfigs, v
sqlite3 *psql;
sqlite3_stmt *psqlsmt;
const char *pzTail;
initClock();
setThreadsClock();
if(targets.size() == 0){
Server nsvr;
setServerClock(&nsvr, 3);
}
else{
if(targets[0] == "square"){
SQEServer nsvr;
setServerClockForSquare(&nsvr, 3);
}
}
while(1) usleep(10000);
return 0;
}

View File

@ -10,8 +10,12 @@
#include "server.h"
extern list<clock_register> clocks_list;
pthread_mutex_t mutex,mutex_rp,mutex_pktreq;
void setServerClock(Server *psvr, int clicks){
pthread_mutex_init(&mutex, NULL);
pthread_mutex_init(&mutex_rp, NULL);
// 注册数据接收守护时钟
clock_register *pncr = new clock_register();
pncr->if_thread = true;
pncr->if_reset = true;
@ -20,11 +24,34 @@ void setServerClock(Server *psvr, int clicks){
pncr->func = serverDeamon;
pncr->arg = (void *)psvr;
newClock(pncr);
// 注册数据处理守护时钟
pncr = new clock_register();
pncr->if_thread = true;
pncr->if_reset = true;
pncr->click = clicks*2;
pncr->rawclick = clicks*2;
pncr->func = dataProcessorDeamon;
pncr->arg = (void *)psvr;
newClock(pncr);
}
void setServerClockForSquare(SQEServer *psvr, int clicks){
setServerClock(psvr, clicks);
pthread_mutex_init(&mutex_pktreq, NULL);
// 注册标准数据包处理守护时钟
clock_register *pncr = new clock_register();
pncr->if_thread = true;
pncr->if_reset = true;
pncr->click = clicks*2+3;
pncr->rawclick = clicks*2;
pncr->func = packetProcessorDeamonForSquare;
pncr->arg = (void *)psvr;
newClock(pncr);
}
Server::Server(int port, string send_ip,int send_port):socket(port),send_socket(send_ip,send_port){
socket.UDPSetFCNTL();
}
void Server::SetSendPort(int port){
@ -103,7 +130,7 @@ void Server::Packet2Rawdata(packet &tpkt, raw_data &rdt){
rdt.size = idx - data;
}
Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){
void Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){
char *idx = trdt.data;
// 数据包ID
uint32_t uint;
@ -116,7 +143,8 @@ Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){
void *data = malloc(uint);
memcpy(data, idx, uint);
idx += uint;
tpkt.buffs.push_back({uint,data});
tpkt.AddBuff(data, uint);
free(data);
}
}
@ -141,7 +169,7 @@ compute_result CNodeServer::Packet2CPUR(packet *tpkt){
}
void Server::freeRawdataServer(struct raw_data trdt){
void Server::freeRawdataServer(struct raw_data &trdt){
free(trdt.data);
if(trdt.msg != NULL) free(trdt.msg);
}
@ -149,7 +177,6 @@ void Server::freeRawdataServer(struct raw_data trdt){
void Server::freePcaketServer(struct packet tpkt){
for(auto i = tpkt.buffs.begin(); i != tpkt.buffs.end(); i++)
free(i->second);
delete &tpkt.buffs;
}
void Server::freeCPURServer(struct compute_result tcpur){
@ -202,7 +229,7 @@ bool Server::CheckRawMsg(char *p_rdt, ssize_t size){
else return false;
}
void ProcessSignedRawMsg(char *p_rdt, ssize_t size, raw_data &rdt){
void Server::ProcessSignedRawMsg(char *p_rdt, ssize_t size, raw_data &rdt){
rdt.data = (char *)malloc(size-3*sizeof(uint32_t));
memcpy(&rdt.info, p_rdt+sizeof(uint32_t), sizeof(uint32_t));
memcpy(rdt.data, p_rdt+sizeof(uint32_t)*2, size-3*sizeof(uint32_t));
@ -218,58 +245,171 @@ void *serverDeamon(void *pvcti){
int prm = psvr->packet_max;
ssize_t tlen;
char *str = nullptr;
printf("Checking Packet.\n");
Addr taddr;
do{
tlen = psvr->socket.RecvRAW(&str);
// 加锁
if (pthread_mutex_lock(&mutex) != 0) throw "lock error";
tlen = psvr->socket.RecvRAW(&str,taddr);
if(tlen > 0){
// 记录有效数据包
if(Server::CheckRawMsg(str, tlen)){
printf("Get\n");
raw_data *ptrdt = new raw_data();
Server::ProcessSignedRawMsg(str, tlen, *ptrdt);
ptrdt->address = *(struct sockaddr_in *)taddr.RawObj();
psvr->rawdata_in.push_back(ptrdt);
}
}
free(str);
// 解锁
pthread_mutex_unlock(&mutex);
}while (tlen && prm-- > 0);
clockThreadFinish(pcti->tid);
pthread_exit(NULL);
}
void *dataProcessorDeamon(void *pvcti){
clock_thread_info *pcti = (clock_thread_info *) pvcti;
Server *psvr = (Server *) pcti->args;
psvr->ProcessRawData();
clockThreadFinish(pcti->tid);
pthread_exit(NULL);
}
void *packetProcessorDeamonForSquare(void *pvcti){
clock_thread_info *pcti = (clock_thread_info *) pvcti;
SQEServer *psvr = (SQEServer *) pcti->args;
psvr->ProcessPacket();
clockThreadFinish(pcti->tid);
pthread_exit(NULL);
}
void *requsetProcessorDeamonForSquare(void *pvcti){
clock_thread_info *pcti = (clock_thread_info *) pvcti;
SQEServer *psvr = (SQEServer *) pcti->args;
psvr->ProcessPacket();
clockThreadFinish(pcti->tid);
pthread_exit(NULL);
}
void Server::ProcessRawData(void){
for(auto prdt : rawdata_in){
if(memcmp(prdt->info, "SPKT", sizeof(uint32_t))){
// 一次性最大处理个数
int prm = 2048;
// 加锁
if (pthread_mutex_lock(&mutex) != 0) throw "lock error";
for(auto &prdt : rawdata_in){
if(prdt == nullptr) continue;
if(prm-- == 0) break;
if(!memcmp(&prdt->info, "SPKT", sizeof(uint32_t))){
// 加锁
if (pthread_mutex_lock(&mutex_rp) != 0) throw "lock error";
packet *pnpkt = new packet();
Rawdata2Packet(pnpkt,prdt);
Rawdata2Packet(*pnpkt,*prdt);
pnpkt->address = prdt->address;
packets_in.push_back(pnpkt);
delete prdt;
// 解锁
pthread_mutex_unlock(&mutex_rp);
}
else{
}
freeRawdataServer(*prdt);
delete prdt;
prdt = nullptr;
}
}
rawdata_in.clear();
// 解锁
pthread_mutex_unlock(&mutex);
if (pthread_mutex_lock(&mutex) != 0) throw "lock error";
rawdata_in.remove_if([](auto prdt){return prdt == nullptr;});
pthread_mutex_unlock(&mutex);
}
SQEServer::SQEServer(void){
void SQEServer::ProcessPacket(void){
printf("RW: %lu PKT: %lu REQ: %lu\n",rawdata_in.size(),packets_in.size(),req_list.size());
// 一次性最大处理个数
int prm = 2048;
// 加锁
if (pthread_mutex_lock(&mutex_rp) != 0) throw "lock error";
for(auto &ppkt : packets_in){
if(ppkt == nullptr) continue;
if(prm-- == 0) break;
if(ppkt->type == REQUSET_TYPE){
request *pnreq = new request();
Packet2Request(*ppkt, *pnreq);
req_list.push_back(pnreq);
}
freePcaketServer(*ppkt);
delete ppkt;
ppkt = nullptr;
}
// 解锁
pthread_mutex_unlock(&mutex_rp);
packets_in.remove_if([](auto &ppkt){return ppkt == nullptr;});
}
SQEServer::SQEServer(int port):Server(port){
if(sqlite3_open("info.db", &psql) == SQLITE_ERROR){
sql::printError(psql);
throw "database is abnormal";
}
sqlite3_stmt psqlsmt;
sqlite3_stmt *psqlsmt;
const char *pzTail;
// 从数据库获得服务器的公私钥
string sql_quote = "select sqes_public,sqes_private from server_info where rowid = 1;";
sqlite3_prepare(psql, sql_quote.data(), -1, &psqlsmt, pzTail);
sqlite3_prepare(psql, sql_quote.data(), -1, &psqlsmt, &pzTail);
if(sqlite3_step(psqlsmt) != SQLITE_ROW){
sql::printError(psql);
throw "database is abnormal";
}
Byte *tbyt = sqlite3_column_blob(psqlsmt, 0);
Byte *tbyt = (Byte *)sqlite3_column_blob(psqlsmt, 0);
memcpy(&pkc, tbyt, sizeof(public_key_class));
tbyt = sqlite3_column_blob(psqlsmt, 1);
tbyt = (Byte *)sqlite3_column_blob(psqlsmt, 1);
memcpy(&prc, tbyt, sizeof(private_key_class));
sqlite3_finalize(psqlsmt);
}
void SQEServer::Packet2Request(packet &pkt, request &req){
if(pkt.type == REQUSET_TYPE){
req.r_id = *(uint32_t *)pkt.buffs[0].second;
req.type = (const char *)pkt.buffs[1].second;
req.data = (const char *)pkt.buffs[2].second;
req.t_addr = Addr(*(struct sockaddr_in *)pkt.buffs[3].second);
}
}
void SQEServer::Request2Packet(packet &pkt, request &req){
pkt.address = *req.t_addr.Obj();
// 请求的类型标识号
pkt.type = REQUSET_TYPE;
pkt.AddBuff((void *)&req.r_id, sizeof(req.r_id));
pkt.AddBuff((void *)req.type.data(), (uint32_t)req.type.size());
pkt.AddBuff((void *)req.data.data(), (uint32_t)req.data.size());
pkt.AddBuff((void *)req.t_addr.Obj(), sizeof(struct sockaddr_in));
}
void packet::AddBuff(void *pbuff, uint32_t size){
void *pnbuff = malloc(size);
memcpy(pnbuff, pbuff, size);
buffs.push_back({size,pnbuff});
}
void Server::ProcessRequset(void){
}
request::request(){
r_id = rng::tsc_seed{}();
}
packet::~packet(){
}

View File

@ -65,14 +65,17 @@ ssize_t SocketUDPServer::Recv(string &str){
}
}
ssize_t SocketUDPServer::RecvRAW(char **p_rdt){
ssize_t SocketUDPServer::RecvRAW(char **p_rdt, Addr &taddr){
ssize_t tlen;
// 非阻塞输入
if(set_fcntl){
tlen = recvfrom(server_sfd, buff, BUFSIZ, 0, server_addr.RawObj(), server_addr.SizeP());
// 读取错误
if(tlen == -1 && errno != EAGAIN){
*p_rdt = nullptr;
printf("%d",errno);
perror("recv");
return -1;
}
// 缓冲区没有信息
@ -84,6 +87,7 @@ ssize_t SocketUDPServer::RecvRAW(char **p_rdt){
else{
*p_rdt = (char *)malloc(tlen);
memcpy(*p_rdt, buff, tlen);
taddr = server_addr;
return tlen;
}
}