From f36897d3307bd42b520fcd11ab6735c44f27a3dc Mon Sep 17 00:00:00 2001 From: Saturneic Date: Wed, 6 Feb 2019 21:22:03 +0800 Subject: [PATCH] Added and fixed. --- Net.xcodeproj/project.pbxproj | 2 + include/instruct.h | 23 +++++++ include/net.h | 13 ++-- include/server.h | 35 ++++++++-- src/addr.cpp | 5 ++ src/client.cpp | 18 +++-- src/main.cpp | 27 +++++--- src/server.cpp | 119 +++++++++++++++++++++++++++++++--- src/socket.cpp | 18 +++-- 9 files changed, 220 insertions(+), 40 deletions(-) create mode 100644 include/instruct.h diff --git a/Net.xcodeproj/project.pbxproj b/Net.xcodeproj/project.pbxproj index 8ecfa4a..27c71b3 100644 --- a/Net.xcodeproj/project.pbxproj +++ b/Net.xcodeproj/project.pbxproj @@ -38,6 +38,7 @@ /* Begin PBXFileReference section */ 9221D9EB21EA5142007310A7 /* Net */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = Net; sourceTree = BUILT_PRODUCTS_DIR; }; + 926E09632209D9D300AD5D5B /* instruct.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = instruct.h; path = include/instruct.h; sourceTree = ""; }; 9277A14621FD7246009C5F11 /* cmap.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = cmap.h; path = include/cmap.h; sourceTree = ""; }; 9277A14721FD7246009C5F11 /* server.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = server.h; path = include/server.h; sourceTree = ""; }; 9277A14821FD7246009C5F11 /* net.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; name = net.h; path = include/net.h; sourceTree = ""; }; @@ -129,6 +130,7 @@ 9277A14821FD7246009C5F11 /* net.h */, 9277A14721FD7246009C5F11 /* server.h */, 9277A15021FD7246009C5F11 /* type.h */, + 926E09632209D9D300AD5D5B /* instruct.h */, ); name = include; sourceTree = ""; diff --git a/include/instruct.h b/include/instruct.h new file mode 100644 index 0000000..60abbb3 --- /dev/null +++ b/include/instruct.h @@ -0,0 +1,23 @@ +// +// instruct.h +// Net +// +// Created by 胡一兵 on 2019/2/5. +// Copyright © 2019年 Bakantu. All rights reserved. +// + +#ifndef instruct_h +#define instruct_h + +#include "type.h" +#include "memory.h" +#include "clock.h" +#include "net.h" +#include "cproj.h" +#include "cpart.h" +#include "cmap.h" +#include "cthread.h" +#include "sha1.h" +#include "rsa.h" + +#endif /* instruct_h */ diff --git a/include/net.h b/include/net.h index a04914d..7fb0bb3 100644 --- a/include/net.h +++ b/include/net.h @@ -32,6 +32,7 @@ public: void SetIP(string ip_addr); // IP地址管理结构的大小变量 void SetSize(void); + void SetSockAddr(struct sockaddr_in); // 获得指向IP地址管理结构的指针 struct sockaddr_in *Obj(void); // 获得指向IP地址管理结构的指针 @@ -99,16 +100,18 @@ public : } } ~SocketClient(){ - close(client_sfd); + //close(client_sfd); } - // 接受储存简单字符串 +// 接受储存简单字符串 virtual void Send(string buff) = 0; - // 接受储存二进制串 +// 接受储存二进制串 virtual void SendRAW(char *buff, unsigned long size) = 0; - // 重新设置发送目的地的端口 +// 重新设置发送目的地的端口 void SetSendPort(int port); - // 重新设置发送目的地的IP地址 +// 重新设置发送目的地的IP地址 void SetSendIP(string ip); +// 共享设置发送地址相关信息管理结构 + void SetSendSockAddr(struct sockaddr_in); }; diff --git a/include/server.h b/include/server.h index 28d139f..63ddff7 100644 --- a/include/server.h +++ b/include/server.h @@ -33,15 +33,24 @@ struct request { rng::rng64 r_id = 0; string type; string data; + uint32_t recv_port; Addr t_addr; request(); }; +//请求监听管理结构 +struct request_listener{ + +} + struct respond { rng::rng64 r_id; string type; - string data; + Byte *buff = nullptr; + uint32_t buff_size; Addr t_addr; + void SetBuff(Byte *buff, uint32_t size); + ~respond(); }; //通用数据包类 @@ -93,6 +102,8 @@ protected: list packets_in; // 缓存带标签的二进制串管理结构 list rawdata_in; +// 输出的数据包列表 + list packets_out; struct server_info tsi; sqlite3 *psql; public: @@ -129,6 +140,7 @@ public: friend void *serverDeamon(void *psvr); // 处理RawData void ProcessRawData(void); + void ProcessSendPackets(void); }; @@ -156,17 +168,30 @@ public: void ProcessRequset(void); static void Packet2Request(packet &pkt, request &req); static void Request2Packet(packet &pkt, request &req); + static void Respond2Packet(packet &pkt, respond &res); + static void Packet2Respond(packet &pkt, respond &res); }; -//设置服务器守护程序的时钟 +class Client{ + list req_lst; + uint32_t listen_port; + + +}; + +//设置服务器守护线程的时钟 void setServerClock(Server *psvr, int clicks); -//设置广场服务器守护程序的时钟 +//设置广场服务器守护线程的时钟 void setServerClockForSquare(SQEServer *psvr, int clicks); //服务器接收数据包守护线程 void *serverDeamon(void *psvr); -//服务器处理原始数据守护进程 +//服务器处理原始数据守护线程 void *dataProcessorDeamon(void *pvcti); -//广场服务器处理数据包守护进程 +//广场服务器处理数据包守护线程 void *packetProcessorDeamonForSquare(void *pvcti); +//广场服务器处理请求守护线程 +void *requestProcessorDeamonForSquare(void *pvcti); +//服务器发送数据包守护线程 +void *sendPacketProcessorDeamonForSquare(void *pvcti); #endif /* server_h */ diff --git a/src/addr.cpp b/src/addr.cpp index fbb8fc2..2fccc0e 100644 --- a/src/addr.cpp +++ b/src/addr.cpp @@ -15,6 +15,7 @@ Addr::Addr(string ip_addr, int port, bool ipv4){ else address.sin_family = AF_INET6; address.sin_port = htons(port); + address.sin_addr.s_addr = inet_addr(ip_addr.data()); addr_size = sizeof(address); } @@ -86,3 +87,7 @@ bool Addr::checkValidIP(string ipaddr){ else return false; return true; } + +void Addr::SetSockAddr(struct sockaddr_in tsi){ + address = tsi; +} diff --git a/src/client.cpp b/src/client.cpp index 07d08e4..81509f9 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -6,21 +6,19 @@ // Copyright © 2019年 Bakantu. All rights reserved. // -#include "type.h" -#include "sql.h" -#include "net.h" -#include "server.h" -#include "rng.h" +#include "instruct.h" + int main(int argc, char *argv[]) { try { - Server BServer(1081,"127.0.0.1",9048); + Server BServer(9050,"127.0.0.1",9048); while (1) { request nreq; nreq.type = "client-square request"; nreq.data = "request for public key"; + nreq.port = 9050; packet *pnpkt = new packet(); SQEServer::Request2Packet(*pnpkt, nreq); raw_data *pnrwd = new raw_data(); @@ -31,6 +29,14 @@ int main(int argc, char *argv[]) delete pnrwd; Server::freePcaketServer(*pnpkt); delete pnpkt; + Addr taddr; + char *buff = nullptr; + if(BServer.socket.RecvRAW(&buff, taddr) > 0){ + printf("Receive: %s\n",buff); + free(buff); + } + + } diff --git a/src/main.cpp b/src/main.cpp index 24de971..95d5694 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,22 +6,14 @@ // Copyright © 2019年 Bakantu. All rights reserved. // -#include "type.h" -#include "memory.h" -#include "clock.h" -#include "net.h" -#include "cproj.h" -#include "cpart.h" -#include "cmap.h" -#include "cthread.h" -#include "sha1.h" -#include "rsa.h" +#include "instruct.h" extern string PRIME_SOURCE_FILE; int update(string instruct, vector &configs, vector &lconfigs, vector &targets); int construct(string instruct,vector &config, vector &lconfig, vector &target); int server(string instruct, vector &configs, vector &lconfigs, vector &targets); +int client(string instruct, vector &configs, vector &lconfigs, vector &targets); int init(string instruct, vector &configs, vector &lconfigs, vector &targets); int set(string instruct, vector &configs, vector &lconfigs, vector &targets); @@ -30,6 +22,7 @@ struct instructions{ int (*construct)(string, vector &, vector &, vector &) = NULL; int (*update)(string, vector &, vector &, vector &) = NULL; int (*server)(string, vector &, vector &, vector &) = NULL; + int (*client)(string, vector &, vector &, vector &) = NULL; int (*set)(string, vector &, vector &, vector &) = NULL; int (*init)(string, vector &, vector &, vector &) = NULL; }; @@ -62,6 +55,7 @@ int main(int argc, const char *argv[]){ istns.server = server; istns.init = init; istns.set = set; + istns.client = client; // 解析命令 int if_instruct = 1; @@ -104,6 +98,10 @@ int main(int argc, const char *argv[]){ if(istns.update != nullptr) istns.set(instruct,config,long_config,target); else error::printError("Function not found."); } + else if (instruct == "client"){ + if(istns.update != nullptr) istns.client(instruct,config,long_config,target); + else error::printError("Function not found."); + } else{ printf("\033[33mInstruction \"%s\" doesn't make sense.\n\033[0m",instruct.data()); } @@ -222,6 +220,10 @@ int init(string instruct, vector &configs, vector &lconfigs, vec } int set(string instruct, vector &configs, vector &lconfigs, vector &targets){ + if(targets.size() < 2){ + error::printError("Args error."); + return -1; + } sqlite3 *psql; sqlite3_stmt *psqlsmt; const char *pzTail; @@ -384,6 +386,11 @@ int construct(string instruct, vector &configs, vector &lconfigs return 0; } +int client(string instruct, vector &configs, vector &lconfigs, vector &targets){ + + return 0; +} + void wiki_cpart(void){ CPart ncp("./PCS","./Libs","a.cpp","A"); void *a = main_pool.bv_malloc(2.0); diff --git a/src/server.cpp b/src/server.cpp index ec59f23..5fa798e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -10,12 +10,13 @@ #include "server.h" extern list clocks_list; -pthread_mutex_t mutex,mutex_rp,mutex_pktreq; +pthread_mutex_t mutex,mutex_rp,mutex_pktreq,mutex_sndpkt; void setServerClock(Server *psvr, int clicks){ pthread_mutex_init(&mutex, NULL); pthread_mutex_init(&mutex_rp, NULL); -// 注册数据接收守护时钟 + pthread_mutex_init(&mutex_sndpkt, NULL); +// 注册数据接收时钟 clock_register *pncr = new clock_register(); pncr->if_thread = true; pncr->if_reset = true; @@ -25,7 +26,7 @@ void setServerClock(Server *psvr, int clicks){ pncr->arg = (void *)psvr; newClock(pncr); -// 注册数据处理守护时钟 +// 注册数据处理时钟 pncr = new clock_register(); pncr->if_thread = true; pncr->if_reset = true; @@ -34,6 +35,16 @@ void setServerClock(Server *psvr, int clicks){ pncr->func = dataProcessorDeamon; pncr->arg = (void *)psvr; newClock(pncr); + +// 注册标准数据包发送时钟 + pncr = new clock_register(); + pncr->if_thread = true; + pncr->if_reset = true; + pncr->click = clicks*2; + pncr->rawclick = clicks/1.5; + pncr->func = sendPacketProcessorDeamonForSquare; + pncr->arg = (void *)psvr; + newClock(pncr); } void setServerClockForSquare(SQEServer *psvr, int clicks){ @@ -48,6 +59,16 @@ void setServerClockForSquare(SQEServer *psvr, int clicks){ pncr->func = packetProcessorDeamonForSquare; pncr->arg = (void *)psvr; newClock(pncr); + +// 注册请求处理守护时钟 + pncr = new clock_register(); + pncr->if_thread = true; + pncr->if_reset = true; + pncr->click = clicks*2+7; + pncr->rawclick = clicks/2; + pncr->func = requestProcessorDeamonForSquare; + pncr->arg = (void *)psvr; + newClock(pncr); } Server::Server(int port, string send_ip,int send_port):socket(port),send_socket(send_ip,send_port){ @@ -286,10 +307,19 @@ void *packetProcessorDeamonForSquare(void *pvcti){ pthread_exit(NULL); } -void *requsetProcessorDeamonForSquare(void *pvcti){ +void *requestProcessorDeamonForSquare(void *pvcti){ clock_thread_info *pcti = (clock_thread_info *) pvcti; SQEServer *psvr = (SQEServer *) pcti->args; - psvr->ProcessPacket(); + psvr->ProcessRequset(); + clockThreadFinish(pcti->tid); + pthread_exit(NULL); +} + +void *sendPacketProcessorDeamonForSquare(void *pvcti){ + clock_thread_info *pcti = (clock_thread_info *) pvcti; + SQEServer *psvr = (SQEServer *) pcti->args; + + psvr->ProcessSendPackets(); clockThreadFinish(pcti->tid); pthread_exit(NULL); } @@ -330,7 +360,6 @@ void Server::ProcessRawData(void){ } void SQEServer::ProcessPacket(void){ - printf("RW: %lu PKT: %lu REQ: %lu\n",rawdata_in.size(),packets_in.size(),req_list.size()); // 一次性最大处理个数 int prm = 2048; // 加锁 @@ -339,17 +368,20 @@ void SQEServer::ProcessPacket(void){ if(ppkt == nullptr) continue; if(prm-- == 0) break; if(ppkt->type == REQUSET_TYPE){ + if(pthread_mutex_lock(&mutex_pktreq) != 0) throw "lock error"; request *pnreq = new request(); Packet2Request(*ppkt, *pnreq); + pnreq->t_addr.SetSockAddr(ppkt->address); req_list.push_back(pnreq); + pthread_mutex_unlock(&mutex_pktreq); } freePcaketServer(*ppkt); delete ppkt; ppkt = nullptr; } + packets_in.remove_if([](auto &ppkt){return ppkt == nullptr;}); // 解锁 pthread_mutex_unlock(&mutex_rp); - packets_in.remove_if([](auto &ppkt){return ppkt == nullptr;}); } SQEServer::SQEServer(int port):Server(port){ @@ -381,6 +413,7 @@ void SQEServer::Packet2Request(packet &pkt, request &req){ req.type = (const char *)pkt.buffs[1].second; req.data = (const char *)pkt.buffs[2].second; req.t_addr = Addr(*(struct sockaddr_in *)pkt.buffs[3].second); + req.recv_port = *(uint32_t *)pkt.buffs[4].second; } } @@ -392,6 +425,7 @@ void SQEServer::Request2Packet(packet &pkt, request &req){ pkt.AddBuff((void *)req.type.data(), (uint32_t)req.type.size()); pkt.AddBuff((void *)req.data.data(), (uint32_t)req.data.size()); pkt.AddBuff((void *)req.t_addr.Obj(), sizeof(struct sockaddr_in)); + pkt.AddBuff((void *)&req.recv_port, sizeof(uint32_t)); } void packet::AddBuff(void *pbuff, uint32_t size){ @@ -400,16 +434,85 @@ void packet::AddBuff(void *pbuff, uint32_t size){ buffs.push_back({size,pnbuff}); } -void Server::ProcessRequset(void){ +void SQEServer::ProcessRequset(void){ + printf("RW: %4lu PKT: %4lu REQ: %4lu PKTS: %5lu\n",rawdata_in.size(),packets_in.size(),req_list.size(),packets_out.size()); +// 一次性最大处理数 + int prm = 2048; + if(pthread_mutex_lock(&mutex_pktreq) != 0) throw "lock error"; + for(auto &preq : req_list){ + if(preq == nullptr) continue; + if(prm-- == 0) break; + if(preq->type == "client-square request"){ + if(preq->data == "request for public key"){ + + respond *pnr = new respond(); + pnr->r_id = preq->r_id; + pnr->SetBuff((Byte *)&pkc, sizeof(public_key_class)); + pnr->type = "square public key"; + pnr->t_addr = preq->t_addr; + pnr->t_addr.SetPort(preq->recv_port); + packet *pnpkt = new packet(); + Respond2Packet(*pnpkt, *pnr); + delete pnr; + if(pthread_mutex_lock(&mutex_sndpkt) != 0) throw "lock error"; + packets_out.push_back(pnpkt); + pthread_mutex_unlock(&mutex_sndpkt); + } + } + delete preq; + preq = nullptr; + } + req_list.remove_if([](auto &preq){return preq == nullptr;}); + pthread_mutex_unlock(&mutex_pktreq); +} + +void SQEServer::Packet2Respond(packet &pkt, respond &res){ } +void SQEServer::Respond2Packet(packet &pkt, respond &res){ + pkt.type = RESPOND_TYPE; + pkt.address = *res.t_addr.Obj(); + pkt.AddBuff((void *) res.type.data(), (uint32_t)res.type.size()); + pkt.AddBuff((void *)res.buff, res.buff_size); +} request::request(){ r_id = rng::tsc_seed{}(); } +void respond::SetBuff(Byte *buff, uint32_t size){ + void *nbuff = malloc(size); + memcpy(nbuff, buff, size); + this->buff = (Byte *)nbuff; + this->buff_size = size; +} packet::~packet(){ } + +respond::~respond(){ + if(buff != nullptr) free(buff); +} + +void Server::ProcessSendPackets(void){ +// 一次性最大处理个数 + int prm = 512; + if(pthread_mutex_lock(&mutex_sndpkt) != 0) throw "lock error"; + for(auto &ppkt : packets_out){ + if(ppkt == nullptr) continue; + if(prm-- == 0) break; + raw_data nrwd; + Packet2Rawdata(*ppkt, nrwd); + SignedRawdata(&nrwd, "SPKT"); + send_socket.SetSendSockAddr(ppkt->address); + SentRawdata(&nrwd); + freeRawdataServer(nrwd); + freePcaketServer(*ppkt); + delete ppkt; + ppkt = nullptr; + } + packets_out.remove_if([](auto ppkt){return ppkt == nullptr;}); + pthread_mutex_unlock(&mutex_sndpkt); +} diff --git a/src/socket.cpp b/src/socket.cpp index 64b7e10..4348ad9 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -67,10 +67,12 @@ ssize_t SocketUDPServer::Recv(string &str){ ssize_t SocketUDPServer::RecvRAW(char **p_rdt, Addr &taddr){ ssize_t tlen; - - // 非阻塞输入 + sockaddr_in tsai; + socklen_t tsai_size = sizeof(sockaddr); +// 非阻塞读取 if(set_fcntl){ - tlen = recvfrom(server_sfd, buff, BUFSIZ, 0, server_addr.RawObj(), server_addr.SizeP()); + tlen = recvfrom(server_sfd, buff, BUFSIZ, 0, (struct sockaddr *)(&tsai), &tsai_size); + // 读取错误 if(tlen == -1 && errno != EAGAIN){ *p_rdt = nullptr; @@ -78,16 +80,16 @@ ssize_t SocketUDPServer::RecvRAW(char **p_rdt, Addr &taddr){ perror("recv"); return -1; } - // 缓冲区没有信息 +// 缓冲区没有信息 else if(tlen == 0 || (tlen == -1 && errno == EAGAIN)){ *p_rdt = nullptr; return 0; } - // 成功读取信息 +// 成功读取信息 else{ *p_rdt = (char *)malloc(tlen); + taddr.SetSockAddr(tsai); memcpy(*p_rdt, buff, tlen); - taddr = server_addr; return tlen; } } @@ -118,3 +120,7 @@ void SocketUDPClient::Send(string buff){ void SocketUDPClient::SendRAW(char *buff, unsigned long size){ sendto(client_sfd, buff, size, 0, send_addr.RawObj(), send_addr.Size()); } + +void SocketClient::SetSendSockAddr(struct sockaddr_in tsi){ + send_addr.SetSockAddr(tsi); +}