From 38ed7cb2e2c77588c334ab4cc15c7dad006ce816 Mon Sep 17 00:00:00 2001 From: Saturneic Date: Tue, 5 Feb 2019 22:37:32 +0800 Subject: [PATCH] Added and Fixed. --- include/net.h | 6 +- include/server.h | 40 +++++++++-- include/sql.h | 2 + include/type.h | 3 + src/client.cpp | 26 ++++--- src/main.cpp | 13 +++- src/server.cpp | 180 +++++++++++++++++++++++++++++++++++++++++------ src/socket.cpp | 6 +- 8 files changed, 234 insertions(+), 42 deletions(-) diff --git a/include/net.h b/include/net.h index 5fb41fd..a04914d 100644 --- a/include/net.h +++ b/include/net.h @@ -66,12 +66,12 @@ public : } } ~SocketServer(){ - close(server_sfd); + //close(server_sfd); } // 接受储存简单字符串 virtual ssize_t Recv(string &str) = 0; // 接受储存二进制串 - virtual ssize_t RecvRAW(char **p_rdt) = 0; + virtual ssize_t RecvRAW(char **p_rdt, Addr &taddr) = 0; }; //客户端套接字类 @@ -174,7 +174,7 @@ public: // 接受储存简单字符串信息的数据包 ssize_t Recv(string &str); // 接受储存二进制信息的数据包 - ssize_t RecvRAW(char **p_rdt); + ssize_t RecvRAW(char **p_rdt, Addr &taddr); // 设置非阻塞模式 void UDPSetFCNTL(void); }; diff --git a/include/server.h b/include/server.h index 0d323e9..28d139f 100644 --- a/include/server.h +++ b/include/server.h @@ -13,7 +13,9 @@ #include "net.h" #include "cpart.h" #include "cthread.h" +#include "sqlite3.h" #include "rsa.h" +#include "rng.h" class Server; @@ -28,9 +30,18 @@ struct compute_result{ //请求数据包 struct request { - uint64_t r_id; + rng::rng64 r_id = 0; string type; string data; + Addr t_addr; + request(); +}; + +struct respond { + rng::rng64 r_id; + string type; + string data; + Addr t_addr; }; //通用数据包类 @@ -38,8 +49,11 @@ class packet{ public: // 数据包类型 unsigned int type; + struct sockaddr_in address; // 记录块的大小及内容所在的内存地址 vector> buffs; + void AddBuff(void *pbuff, uint32_t size); + ~packet(); }; //带标签的二进制串管理结构 @@ -54,6 +68,8 @@ public: // 信息串 char *msg = NULL; unsigned long msg_size = 0; +// 来源ip地址 + struct sockaddr_in address; // 用简单字符串直接出适合 void setData(string str){ data = (char *)malloc(str.size()); @@ -83,7 +99,7 @@ public: // 服务器类的接收套接字对象与发送套接字对象 SocketUDPServer socket; SocketUDPClient send_socket; - int packet_max = 30; + int packet_max = 1024; Server(int port = 9048, string send_ip = "127.0.0.1",int send_port = 9049); // 重新设置服务器的发送端口 @@ -93,9 +109,9 @@ public: // 将结构数据包转换成二进制串 static void Packet2Rawdata(packet &tpkt, raw_data &rdt); // 将通用二进制串转换为通用数据包 - static packet Rawdata2Packet(raw_data trdta); + static void Rawdata2Packet(packet &tpkt, raw_data &trdt); // 释放二进制串占用的空间 - static void freeRawdataServer(struct raw_data trdt); + static void freeRawdataServer(struct raw_data &trdt); // 释放通用数据包包占用 static void freePcaketServer(struct packet tpkt); @@ -130,17 +146,27 @@ public: class SQEServer:public Server{ protected: // 请求数据包 - list req_list; + list req_list; // 服务器公私钥 public_key_class pkc; private_key_class prc; public: - SQEServer(void); + SQEServer(int port = 9048); + void ProcessPacket(void); + void ProcessRequset(void); + static void Packet2Request(packet &pkt, request &req); + static void Request2Packet(packet &pkt, request &req); }; //设置服务器守护程序的时钟 void setServerClock(Server *psvr, int clicks); -//服务器守护线程 +//设置广场服务器守护程序的时钟 +void setServerClockForSquare(SQEServer *psvr, int clicks); +//服务器接收数据包守护线程 void *serverDeamon(void *psvr); +//服务器处理原始数据守护进程 +void *dataProcessorDeamon(void *pvcti); +//广场服务器处理数据包守护进程 +void *packetProcessorDeamonForSquare(void *pvcti); #endif /* server_h */ diff --git a/include/sql.h b/include/sql.h index 6ae4336..89000c3 100644 --- a/include/sql.h +++ b/include/sql.h @@ -9,6 +9,8 @@ #ifndef sql_h #define sql_h +#include "type.h" + struct SQLTable{ string name; vector> colnums; diff --git a/include/type.h b/include/type.h index 4320017..75e199e 100644 --- a/include/type.h +++ b/include/type.h @@ -52,4 +52,7 @@ using std::stringstream; typedef char Byte; +#define REQUSET_TYPE 100 +#define RESPOND_TYPE 101 + #endif /* type_h */ diff --git a/src/client.cpp b/src/client.cpp index 6fa5c3f..07d08e4 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -6,22 +6,32 @@ // Copyright © 2019年 Bakantu. All rights reserved. // - +#include "type.h" +#include "sql.h" #include "net.h" #include "server.h" +#include "rng.h" int main(int argc, char *argv[]) { try { Server BServer(1081,"127.0.0.1",9048); - vector fargs = {1,0,0,1}; - vectorargs; - raw_data rwd; - rwd.setData("Hello"); - BServer.SignedRawdata(&rwd, "TEXT"); + while (1) { - BServer.SentRawdata(&rwd); - usleep(1000); + request nreq; + nreq.type = "client-square request"; + nreq.data = "request for public key"; + packet *pnpkt = new packet(); + SQEServer::Request2Packet(*pnpkt, nreq); + raw_data *pnrwd = new raw_data(); + Server::Packet2Rawdata(*pnpkt, *pnrwd); + BServer.SignedRawdata(pnrwd, "SPKT"); + BServer.SentRawdata(pnrwd); + Server::freeRawdataServer(*pnrwd); + delete pnrwd; + Server::freePcaketServer(*pnpkt); + delete pnpkt; + } } catch (char const *str) { diff --git a/src/main.cpp b/src/main.cpp index 55b630b..24de971 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -306,11 +306,18 @@ int server(string instruct, vector &configs, vector &lconfigs, v sqlite3 *psql; sqlite3_stmt *psqlsmt; const char *pzTail; - initClock(); setThreadsClock(); - Server nsvr; - setServerClock(&nsvr, 3); + if(targets.size() == 0){ + Server nsvr; + setServerClock(&nsvr, 3); + } + else{ + if(targets[0] == "square"){ + SQEServer nsvr; + setServerClockForSquare(&nsvr, 3); + } + } while(1) usleep(10000); return 0; } diff --git a/src/server.cpp b/src/server.cpp index 9910820..ec59f23 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -10,8 +10,12 @@ #include "server.h" extern list clocks_list; +pthread_mutex_t mutex,mutex_rp,mutex_pktreq; void setServerClock(Server *psvr, int clicks){ + pthread_mutex_init(&mutex, NULL); + pthread_mutex_init(&mutex_rp, NULL); +// 注册数据接收守护时钟 clock_register *pncr = new clock_register(); pncr->if_thread = true; pncr->if_reset = true; @@ -20,11 +24,34 @@ void setServerClock(Server *psvr, int clicks){ pncr->func = serverDeamon; pncr->arg = (void *)psvr; newClock(pncr); + +// 注册数据处理守护时钟 + pncr = new clock_register(); + pncr->if_thread = true; + pncr->if_reset = true; + pncr->click = clicks*2; + pncr->rawclick = clicks*2; + pncr->func = dataProcessorDeamon; + pncr->arg = (void *)psvr; + newClock(pncr); +} + +void setServerClockForSquare(SQEServer *psvr, int clicks){ + setServerClock(psvr, clicks); + pthread_mutex_init(&mutex_pktreq, NULL); +// 注册标准数据包处理守护时钟 + clock_register *pncr = new clock_register(); + pncr->if_thread = true; + pncr->if_reset = true; + pncr->click = clicks*2+3; + pncr->rawclick = clicks*2; + pncr->func = packetProcessorDeamonForSquare; + pncr->arg = (void *)psvr; + newClock(pncr); } Server::Server(int port, string send_ip,int send_port):socket(port),send_socket(send_ip,send_port){ socket.UDPSetFCNTL(); - } void Server::SetSendPort(int port){ @@ -103,7 +130,7 @@ void Server::Packet2Rawdata(packet &tpkt, raw_data &rdt){ rdt.size = idx - data; } -Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){ +void Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){ char *idx = trdt.data; // 数据包ID uint32_t uint; @@ -116,7 +143,8 @@ Server::Rawdata2Packet(packet &tpkt, raw_data &trdt){ void *data = malloc(uint); memcpy(data, idx, uint); idx += uint; - tpkt.buffs.push_back({uint,data}); + tpkt.AddBuff(data, uint); + free(data); } } @@ -141,7 +169,7 @@ compute_result CNodeServer::Packet2CPUR(packet *tpkt){ } -void Server::freeRawdataServer(struct raw_data trdt){ +void Server::freeRawdataServer(struct raw_data &trdt){ free(trdt.data); if(trdt.msg != NULL) free(trdt.msg); } @@ -149,7 +177,6 @@ void Server::freeRawdataServer(struct raw_data trdt){ void Server::freePcaketServer(struct packet tpkt){ for(auto i = tpkt.buffs.begin(); i != tpkt.buffs.end(); i++) free(i->second); - delete &tpkt.buffs; } void Server::freeCPURServer(struct compute_result tcpur){ @@ -202,7 +229,7 @@ bool Server::CheckRawMsg(char *p_rdt, ssize_t size){ else return false; } -void ProcessSignedRawMsg(char *p_rdt, ssize_t size, raw_data &rdt){ +void Server::ProcessSignedRawMsg(char *p_rdt, ssize_t size, raw_data &rdt){ rdt.data = (char *)malloc(size-3*sizeof(uint32_t)); memcpy(&rdt.info, p_rdt+sizeof(uint32_t), sizeof(uint32_t)); memcpy(rdt.data, p_rdt+sizeof(uint32_t)*2, size-3*sizeof(uint32_t)); @@ -218,58 +245,171 @@ void *serverDeamon(void *pvcti){ int prm = psvr->packet_max; ssize_t tlen; char *str = nullptr; - printf("Checking Packet.\n"); + Addr taddr; do{ - tlen = psvr->socket.RecvRAW(&str); +// 加锁 + if (pthread_mutex_lock(&mutex) != 0) throw "lock error"; + tlen = psvr->socket.RecvRAW(&str,taddr); if(tlen > 0){ // 记录有效数据包 + if(Server::CheckRawMsg(str, tlen)){ - printf("Get\n"); raw_data *ptrdt = new raw_data(); Server::ProcessSignedRawMsg(str, tlen, *ptrdt); + ptrdt->address = *(struct sockaddr_in *)taddr.RawObj(); psvr->rawdata_in.push_back(ptrdt); + } } free(str); +// 解锁 + pthread_mutex_unlock(&mutex); }while (tlen && prm-- > 0); + + clockThreadFinish(pcti->tid); + pthread_exit(NULL); +} + +void *dataProcessorDeamon(void *pvcti){ + clock_thread_info *pcti = (clock_thread_info *) pvcti; + Server *psvr = (Server *) pcti->args; + psvr->ProcessRawData(); + clockThreadFinish(pcti->tid); + pthread_exit(NULL); +} + +void *packetProcessorDeamonForSquare(void *pvcti){ + clock_thread_info *pcti = (clock_thread_info *) pvcti; + SQEServer *psvr = (SQEServer *) pcti->args; + psvr->ProcessPacket(); + clockThreadFinish(pcti->tid); + pthread_exit(NULL); +} + +void *requsetProcessorDeamonForSquare(void *pvcti){ + clock_thread_info *pcti = (clock_thread_info *) pvcti; + SQEServer *psvr = (SQEServer *) pcti->args; + psvr->ProcessPacket(); clockThreadFinish(pcti->tid); pthread_exit(NULL); } void Server::ProcessRawData(void){ - for(auto prdt : rawdata_in){ - if(memcmp(prdt->info, "SPKT", sizeof(uint32_t))){ +// 一次性最大处理个数 + int prm = 2048; +// 加锁 + if (pthread_mutex_lock(&mutex) != 0) throw "lock error"; + for(auto &prdt : rawdata_in){ + if(prdt == nullptr) continue; + if(prm-- == 0) break; + + if(!memcmp(&prdt->info, "SPKT", sizeof(uint32_t))){ +// 加锁 + if (pthread_mutex_lock(&mutex_rp) != 0) throw "lock error"; packet *pnpkt = new packet(); - Rawdata2Packet(pnpkt,prdt); + Rawdata2Packet(*pnpkt,*prdt); + pnpkt->address = prdt->address; packets_in.push_back(pnpkt); - delete prdt; +// 解锁 + pthread_mutex_unlock(&mutex_rp); } else{ - delete prdt; + } + freeRawdataServer(*prdt); + delete prdt; + prdt = nullptr; + } - rawdata_in.clear(); +// 解锁 + pthread_mutex_unlock(&mutex); + + if (pthread_mutex_lock(&mutex) != 0) throw "lock error"; + rawdata_in.remove_if([](auto prdt){return prdt == nullptr;}); + pthread_mutex_unlock(&mutex); } -SQEServer::SQEServer(void){ +void SQEServer::ProcessPacket(void){ + printf("RW: %lu PKT: %lu REQ: %lu\n",rawdata_in.size(),packets_in.size(),req_list.size()); +// 一次性最大处理个数 + int prm = 2048; +// 加锁 + if (pthread_mutex_lock(&mutex_rp) != 0) throw "lock error"; + for(auto &ppkt : packets_in){ + if(ppkt == nullptr) continue; + if(prm-- == 0) break; + if(ppkt->type == REQUSET_TYPE){ + request *pnreq = new request(); + Packet2Request(*ppkt, *pnreq); + req_list.push_back(pnreq); + } + freePcaketServer(*ppkt); + delete ppkt; + ppkt = nullptr; + } +// 解锁 + pthread_mutex_unlock(&mutex_rp); + packets_in.remove_if([](auto &ppkt){return ppkt == nullptr;}); +} + +SQEServer::SQEServer(int port):Server(port){ if(sqlite3_open("info.db", &psql) == SQLITE_ERROR){ sql::printError(psql); throw "database is abnormal"; } - sqlite3_stmt psqlsmt; + sqlite3_stmt *psqlsmt; const char *pzTail; // 从数据库获得服务器的公私钥 string sql_quote = "select sqes_public,sqes_private from server_info where rowid = 1;"; - sqlite3_prepare(psql, sql_quote.data(), -1, &psqlsmt, pzTail); + sqlite3_prepare(psql, sql_quote.data(), -1, &psqlsmt, &pzTail); if(sqlite3_step(psqlsmt) != SQLITE_ROW){ sql::printError(psql); throw "database is abnormal"; } - Byte *tbyt = sqlite3_column_blob(psqlsmt, 0); + Byte *tbyt = (Byte *)sqlite3_column_blob(psqlsmt, 0); memcpy(&pkc, tbyt, sizeof(public_key_class)); - tbyt = sqlite3_column_blob(psqlsmt, 1); + tbyt = (Byte *)sqlite3_column_blob(psqlsmt, 1); memcpy(&prc, tbyt, sizeof(private_key_class)); sqlite3_finalize(psqlsmt); } + +void SQEServer::Packet2Request(packet &pkt, request &req){ + if(pkt.type == REQUSET_TYPE){ + req.r_id = *(uint32_t *)pkt.buffs[0].second; + req.type = (const char *)pkt.buffs[1].second; + req.data = (const char *)pkt.buffs[2].second; + req.t_addr = Addr(*(struct sockaddr_in *)pkt.buffs[3].second); + } +} + +void SQEServer::Request2Packet(packet &pkt, request &req){ + pkt.address = *req.t_addr.Obj(); +// 请求的类型标识号 + pkt.type = REQUSET_TYPE; + pkt.AddBuff((void *)&req.r_id, sizeof(req.r_id)); + pkt.AddBuff((void *)req.type.data(), (uint32_t)req.type.size()); + pkt.AddBuff((void *)req.data.data(), (uint32_t)req.data.size()); + pkt.AddBuff((void *)req.t_addr.Obj(), sizeof(struct sockaddr_in)); +} + +void packet::AddBuff(void *pbuff, uint32_t size){ + void *pnbuff = malloc(size); + memcpy(pnbuff, pbuff, size); + buffs.push_back({size,pnbuff}); +} + +void Server::ProcessRequset(void){ + +} + + +request::request(){ + r_id = rng::tsc_seed{}(); +} + + +packet::~packet(){ + +} diff --git a/src/socket.cpp b/src/socket.cpp index bcbc67f..64b7e10 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -65,14 +65,17 @@ ssize_t SocketUDPServer::Recv(string &str){ } } -ssize_t SocketUDPServer::RecvRAW(char **p_rdt){ +ssize_t SocketUDPServer::RecvRAW(char **p_rdt, Addr &taddr){ ssize_t tlen; + // 非阻塞输入 if(set_fcntl){ tlen = recvfrom(server_sfd, buff, BUFSIZ, 0, server_addr.RawObj(), server_addr.SizeP()); // 读取错误 if(tlen == -1 && errno != EAGAIN){ *p_rdt = nullptr; + printf("%d",errno); + perror("recv"); return -1; } // 缓冲区没有信息 @@ -84,6 +87,7 @@ ssize_t SocketUDPServer::RecvRAW(char **p_rdt){ else{ *p_rdt = (char *)malloc(tlen); memcpy(*p_rdt, buff, tlen); + taddr = server_addr; return tlen; } }