diff --git a/Net.xcodeproj/project.pbxproj b/Net.xcodeproj/project.pbxproj index cefcd82..8a9aac2 100644 --- a/Net.xcodeproj/project.pbxproj +++ b/Net.xcodeproj/project.pbxproj @@ -11,6 +11,7 @@ 925A13A621EC68D500CBD427 /* cpart.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A421EC67C900CBD427 /* cpart.cpp */; }; 925A13A921EC973000CBD427 /* cmap.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A721EC973000CBD427 /* cmap.cpp */; }; 925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13AB21EC9DB900CBD427 /* cthread.cpp */; }; + 92D6CE6921EE4920005AEF3B /* server.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 92D6CE6721EE4920005AEF3B /* server.cpp */; }; /* End PBXBuildFile section */ /* Begin PBXCopyFilesBuildPhase section */ @@ -36,6 +37,8 @@ 925A13A821EC973000CBD427 /* cmap.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cmap.h; sourceTree = ""; }; 925A13AB21EC9DB900CBD427 /* cthread.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = cthread.cpp; sourceTree = ""; }; 925A13AC21EC9DB900CBD427 /* cthread.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cthread.h; sourceTree = ""; }; + 92D6CE6721EE4920005AEF3B /* server.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = server.cpp; sourceTree = ""; }; + 92D6CE6821EE4920005AEF3B /* server.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = server.h; sourceTree = ""; }; /* End PBXFileReference section */ /* Begin PBXFrameworksBuildPhase section */ @@ -74,6 +77,7 @@ 925A13A421EC67C900CBD427 /* cpart.cpp */, 925A13A721EC973000CBD427 /* cmap.cpp */, 925A13AB21EC9DB900CBD427 /* cthread.cpp */, + 92D6CE6721EE4920005AEF3B /* server.cpp */, ); name = Net; sourceTree = ""; @@ -85,6 +89,7 @@ 9221DA0F21EB5FB8007310A7 /* net.h */, 9221DA1421EB62F6007310A7 /* cpart.h */, 925A13A821EC973000CBD427 /* cmap.h */, + 92D6CE6821EE4920005AEF3B /* server.h */, ); name = include; sourceTree = ""; @@ -148,6 +153,7 @@ 9221DA1121EB5FB8007310A7 /* net.cpp in Sources */, 925A13A621EC68D500CBD427 /* cpart.cpp in Sources */, 925A13A921EC973000CBD427 /* cmap.cpp in Sources */, + 92D6CE6921EE4920005AEF3B /* server.cpp in Sources */, 925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */, ); runOnlyForDeploymentPostprocessing = 0; diff --git a/cmap.cpp b/cmap.cpp index 1f68299..4320306 100644 --- a/cmap.cpp +++ b/cmap.cpp @@ -159,4 +159,11 @@ Depends CMap::ReadItem(string item){ return dep; } - +void CMap::MapThrough(CPart *pcp,void (*func)(void *, CPart *),void *args){ +// 调用回调函数 + func(args,pcp); + auto dpds = pcp->depends; + for(auto i = dpds.begin(); i != dpds.end(); i++){ + MapThrough(i->t_cpart, func, args); + } +} diff --git a/cmap.h b/cmap.h index 0c46262..77a96c2 100644 --- a/cmap.h +++ b/cmap.h @@ -36,6 +36,9 @@ public: // 根据图描述文件依赖关系描述语句所提供的信息转化为依赖关系结构 Depends ReadItem(string item); +// 由某个节点递归向下遍历 + static void MapThrough(CPart *pcp,void(*func)(void *,CPart *),void *); + }; #endif /* cmap_h */ diff --git a/cthread.cpp b/cthread.cpp index 55ed579..1c1b73d 100644 --- a/cthread.cpp +++ b/cthread.cpp @@ -8,8 +8,6 @@ #include "cthread.h" -static struct itimerval oitrl, itrl; - list daemon_list = {}; CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum){ @@ -23,6 +21,7 @@ CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum) // 构造任务进度列表 for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ ifsolved.insert(pair((*k).first,false)); + if_rargs.insert(pair((*k).first,false)); } } @@ -155,8 +154,10 @@ void CThread::Daemon(void){ pthread_join(cpdt, (void **)&rpv); // 根据返回值处理计算任务状态 if(rpv->rtn == SUCCESS){ -// 标识该计算模块中计算任务的状态 +// 标识该计算模块中计算任务的状态为已解决 ifsolved.find(rpv->pcp->name)->second = true; +// 标识储存有该计算任务的输出参数 + if_rargs.find(rpv->pcp->name)->second = true; } else{ @@ -264,20 +265,52 @@ int CThread::CancelChildPCS(unsigned long tid){ return 0; } -//设置全局线程时钟 -void setThreadsClock(void){ - itrl.it_interval.tv_sec = 0; - itrl.it_interval.tv_usec = 500000; - itrl.it_value.tv_sec = 0; - itrl.it_value.tv_usec = 500000; - setitimer(ITIMER_REAL, &itrl, &oitrl); -} -//时钟滴答调用函数 -void threadsClock(int n){ - for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){ - (*i)->Daemon(); +int CThread::GetCPUResult(struct compute_result *pcrt){ + ifsolved.find(pcrt->name)->second = true; +// 处理输出参数 + int count = 0; + CPart *pcp = p_map->cparts.find(pcrt->name)->second; + vector farg_out = pcp->fargs_out; + + for(auto i = pcrt->args_out->begin(); i != pcrt->args_out->end(); i++,count++){ + if(farg_out[count] == INT){ + AddArgsOut(pcrt->name, *((int *)(*i))); + } + else if(farg_out[count] == DOUBLE){ + AddArgsOut(pcrt->name, *((double *)(*i))); + } } - daemon_list.clear(); +// 处理输入参数 + vector farg_in = pcp->fargs_in; + count = 0; + for(auto i = pcrt->args_in->begin(); i != pcrt->args_in->end(); i++,count++){ + if(farg_in[count] == INT){ + AddArgs(pcrt->name, *((int *)(*i))); + } + else if(farg_in[count] == DOUBLE){ + AddArgs(pcrt->name, *((double *)(*i))); + } + } + ifsolved.find(pcrt->name)->second = true; + if_rargs.find(pcrt->name)->second = true; + +// 处理关联计算模块 + p_map->CMap::MapThrough(pcp, CThread::SignedCpart,&ifsolved); + return 0; +} + +struct compute_result CThread::BuildCPUResult(CPart *pcp){ + struct compute_result ncpur; + ncpur.name = pcp->name; + ncpur.args_in = &rargs.find(ncpur.name)->second; + ncpur.args_in = &rargs.find(ncpur.name)->second; + return ncpur; +} + + +void CThread::SignedCpart(void *args, CPart *pcp){ + map *pifsolved = (map *) args; + pifsolved->find(pcp->name)->second = true; } //注册任务进程时钟调用 diff --git a/cthread.h b/cthread.h index 22f4068..9eec979 100644 --- a/cthread.h +++ b/cthread.h @@ -11,10 +11,9 @@ #include "cpart.h" #include "cmap.h" +#include "server.h" #include -#include -#include #include using std::list; @@ -49,24 +48,19 @@ struct line_process{ list line; }; -//外来数据包解析结构 -struct compute_result{ - string name; - vector args_in; - vector args_out; -}; - //计算进程管理结构 class CThread{ public: // 对应的图结构管理结构 - CMap *p_map; + const CMap * const p_map; // 此计算进程中计算模块的传入参数数据列表 map> rargs; // 此计算进程的计算模块的传出参数数据列表 map> rargs_out; -// 计算模块是否已经执行 +// 计算模块是否已经解决 map ifsolved; +// 计算模块是否有数据 + map if_rargs; // tid生成的依据 unsigned long idxtid; // 并行线程的个数 @@ -76,6 +70,7 @@ public: // 守护进程定时器 struct itimerval itrl; + // 使用图结构管理结构来构造计算进程管理结构 CThread(CMap *tp_map, int thdnum = 4); ~CThread(); @@ -88,6 +83,15 @@ public: (*k).second.push_back((void *)p_value); } + template +// 添加相关计算模块的传出参数 + void AddArgsOut(string name, T value){ + auto k = rargs_out.find(name); + T *p_value = new T(); + *p_value = value; + (*k).second.push_back((void *)p_value); + } + // 设置守护进程 void SetDaemon(void); @@ -103,8 +107,14 @@ public: int CancelChildPCS(unsigned long tid); // 处理数据包 int GetCPUResult(struct compute_result *); +// 标记计算模块 + static void SignedCpart(void *args, CPart *pcp); // 导出数据包 - struct compute_result CPUResult(CPart *); + struct compute_result BuildCPUResult(CPart *); +// 发送数据包到服务器 + int SendCPUResult(Server *,struct compute_result); +// 从服务器中获得数据包 + vector GetCPURFromServer(Server *); // 查询计算模块是否已经解决 CPart *IfCPTSolved(string name); // 建立新线程执行计算模块 @@ -117,10 +127,6 @@ public: static void ChildThreadFSH(struct thread_args *); }; -//设置全局线程时钟 -void setThreadsClock(void); -//时钟滴答调用函数 -void threadsClock(int); //注册任务进程时钟调用 void setTrdClock(CThread *ptd); diff --git a/net.cpp b/net.cpp index 330c596..6335d53 100644 --- a/net.cpp +++ b/net.cpp @@ -12,12 +12,30 @@ #include "cthread.h" extern list daemon_list; +extern list server_list; +static struct itimerval oitrl, itrl; void init(void){ signal(SIGALRM, threadsClock); setThreadsClock(); } +//设置全局线程时钟 +void setThreadsClock(void){ + itrl.it_interval.tv_sec = 0; + itrl.it_interval.tv_usec = 500000; + itrl.it_value.tv_sec = 0; + itrl.it_value.tv_usec = 500000; + setitimer(ITIMER_REAL, &itrl, &oitrl); +} +//时钟滴答调用函数 +void threadsClock(int n){ + for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){ + (*i)->Daemon(); + } + daemon_list.clear(); +} + int main(void){ init(); CMap map("./PCS"); diff --git a/net.h b/net.h index 30d2ad4..0f15b44 100644 --- a/net.h +++ b/net.h @@ -26,6 +26,8 @@ #include #include #include +#include +#include #include "cpart.h" @@ -178,8 +180,6 @@ public: } }; - - struct pcs_result{ char *name[16]; uint32_t in_size; @@ -188,4 +188,10 @@ struct pcs_result{ char *out_buff; }; +//设置全局线程时钟 +void setThreadsClock(void); +//时钟滴答调用函数 +void threadsClock(int); + + #endif /* net_hpp */ diff --git a/server.cpp b/server.cpp new file mode 100644 index 0000000..529e3bf --- /dev/null +++ b/server.cpp @@ -0,0 +1,9 @@ +// +// server.cpp +// Net +// +// Created by 胡一兵 on 2019/1/16. +// Copyright © 2019年 Bakantu. All rights reserved. +// + +#include "server.h" diff --git a/server.h b/server.h new file mode 100644 index 0000000..a3d107d --- /dev/null +++ b/server.h @@ -0,0 +1,42 @@ +// +// server.hpp +// Net +// +// Created by 胡一兵 on 2019/1/16. +// Copyright © 2019年 Bakantu. All rights reserved. +// + +#ifndef server_h +#define server_h + +#include "net.h" + +class Server; + +static list server_list; + +//外来数据包解析结构 +struct compute_result{ + string name; + vector *args_in; + vector *args_out; +}; + +//原始数据包 +struct packet{ + unsigned int type; + vector> buffs; +}; + +class Server{ + vector cpurs; + vector packets; + Socket socket; + Server(string ip_addr):socket(ip_addr,9048,true,false){ + } + void Deamon(void){ + socket.PacketRecv(<#Addr t_addr#>) + } +}; + +#endif /* server_h */