This commit is contained in:
Saturneic 2019-01-16 01:36:22 +08:00
parent 98ddf7ed6f
commit 10546558c2
9 changed files with 165 additions and 35 deletions

View File

@ -11,6 +11,7 @@
925A13A621EC68D500CBD427 /* cpart.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A421EC67C900CBD427 /* cpart.cpp */; }; 925A13A621EC68D500CBD427 /* cpart.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A421EC67C900CBD427 /* cpart.cpp */; };
925A13A921EC973000CBD427 /* cmap.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A721EC973000CBD427 /* cmap.cpp */; }; 925A13A921EC973000CBD427 /* cmap.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13A721EC973000CBD427 /* cmap.cpp */; };
925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13AB21EC9DB900CBD427 /* cthread.cpp */; }; 925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 925A13AB21EC9DB900CBD427 /* cthread.cpp */; };
92D6CE6921EE4920005AEF3B /* server.cpp in Sources */ = {isa = PBXBuildFile; fileRef = 92D6CE6721EE4920005AEF3B /* server.cpp */; };
/* End PBXBuildFile section */ /* End PBXBuildFile section */
/* Begin PBXCopyFilesBuildPhase section */ /* Begin PBXCopyFilesBuildPhase section */
@ -36,6 +37,8 @@
925A13A821EC973000CBD427 /* cmap.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cmap.h; sourceTree = "<group>"; }; 925A13A821EC973000CBD427 /* cmap.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cmap.h; sourceTree = "<group>"; };
925A13AB21EC9DB900CBD427 /* cthread.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = cthread.cpp; sourceTree = "<group>"; }; 925A13AB21EC9DB900CBD427 /* cthread.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = cthread.cpp; sourceTree = "<group>"; };
925A13AC21EC9DB900CBD427 /* cthread.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cthread.h; sourceTree = "<group>"; }; 925A13AC21EC9DB900CBD427 /* cthread.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = cthread.h; sourceTree = "<group>"; };
92D6CE6721EE4920005AEF3B /* server.cpp */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = server.cpp; sourceTree = "<group>"; };
92D6CE6821EE4920005AEF3B /* server.h */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.h; path = server.h; sourceTree = "<group>"; };
/* End PBXFileReference section */ /* End PBXFileReference section */
/* Begin PBXFrameworksBuildPhase section */ /* Begin PBXFrameworksBuildPhase section */
@ -74,6 +77,7 @@
925A13A421EC67C900CBD427 /* cpart.cpp */, 925A13A421EC67C900CBD427 /* cpart.cpp */,
925A13A721EC973000CBD427 /* cmap.cpp */, 925A13A721EC973000CBD427 /* cmap.cpp */,
925A13AB21EC9DB900CBD427 /* cthread.cpp */, 925A13AB21EC9DB900CBD427 /* cthread.cpp */,
92D6CE6721EE4920005AEF3B /* server.cpp */,
); );
name = Net; name = Net;
sourceTree = "<group>"; sourceTree = "<group>";
@ -85,6 +89,7 @@
9221DA0F21EB5FB8007310A7 /* net.h */, 9221DA0F21EB5FB8007310A7 /* net.h */,
9221DA1421EB62F6007310A7 /* cpart.h */, 9221DA1421EB62F6007310A7 /* cpart.h */,
925A13A821EC973000CBD427 /* cmap.h */, 925A13A821EC973000CBD427 /* cmap.h */,
92D6CE6821EE4920005AEF3B /* server.h */,
); );
name = include; name = include;
sourceTree = "<group>"; sourceTree = "<group>";
@ -148,6 +153,7 @@
9221DA1121EB5FB8007310A7 /* net.cpp in Sources */, 9221DA1121EB5FB8007310A7 /* net.cpp in Sources */,
925A13A621EC68D500CBD427 /* cpart.cpp in Sources */, 925A13A621EC68D500CBD427 /* cpart.cpp in Sources */,
925A13A921EC973000CBD427 /* cmap.cpp in Sources */, 925A13A921EC973000CBD427 /* cmap.cpp in Sources */,
92D6CE6921EE4920005AEF3B /* server.cpp in Sources */,
925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */, 925A13AD21EC9DB900CBD427 /* cthread.cpp in Sources */,
); );
runOnlyForDeploymentPostprocessing = 0; runOnlyForDeploymentPostprocessing = 0;

View File

@ -159,4 +159,11 @@ Depends CMap::ReadItem(string item){
return dep; return dep;
} }
void CMap::MapThrough(CPart *pcp,void (*func)(void *, CPart *),void *args){
// 调用回调函数
func(args,pcp);
auto dpds = pcp->depends;
for(auto i = dpds.begin(); i != dpds.end(); i++){
MapThrough(i->t_cpart, func, args);
}
}

3
cmap.h
View File

@ -36,6 +36,9 @@ public:
// 根据图描述文件依赖关系描述语句所提供的信息转化为依赖关系结构 // 根据图描述文件依赖关系描述语句所提供的信息转化为依赖关系结构
Depends ReadItem(string item); Depends ReadItem(string item);
// 由某个节点递归向下遍历
static void MapThrough(CPart *pcp,void(*func)(void *,CPart *),void *);
}; };
#endif /* cmap_h */ #endif /* cmap_h */

View File

@ -8,8 +8,6 @@
#include "cthread.h" #include "cthread.h"
static struct itimerval oitrl, itrl;
list<CThread *> daemon_list = {}; list<CThread *> daemon_list = {};
CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum){ CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum){
@ -23,6 +21,7 @@ CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum)
// 构造任务进度列表 // 构造任务进度列表
for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){
ifsolved.insert(pair<string,bool>((*k).first,false)); ifsolved.insert(pair<string,bool>((*k).first,false));
if_rargs.insert(pair<string,bool>((*k).first,false));
} }
} }
@ -155,8 +154,10 @@ void CThread::Daemon(void){
pthread_join(cpdt, (void **)&rpv); pthread_join(cpdt, (void **)&rpv);
// 根据返回值处理计算任务状态 // 根据返回值处理计算任务状态
if(rpv->rtn == SUCCESS){ if(rpv->rtn == SUCCESS){
// 标识该计算模块中计算任务的状态 // 标识该计算模块中计算任务的状态为已解决
ifsolved.find(rpv->pcp->name)->second = true; ifsolved.find(rpv->pcp->name)->second = true;
// 标识储存有该计算任务的输出参数
if_rargs.find(rpv->pcp->name)->second = true;
} }
else{ else{
@ -264,20 +265,52 @@ int CThread::CancelChildPCS(unsigned long tid){
return 0; return 0;
} }
//设置全局线程时钟 int CThread::GetCPUResult(struct compute_result *pcrt){
void setThreadsClock(void){ ifsolved.find(pcrt->name)->second = true;
itrl.it_interval.tv_sec = 0; // 处理输出参数
itrl.it_interval.tv_usec = 500000; int count = 0;
itrl.it_value.tv_sec = 0; CPart *pcp = p_map->cparts.find(pcrt->name)->second;
itrl.it_value.tv_usec = 500000; vector<int> farg_out = pcp->fargs_out;
setitimer(ITIMER_REAL, &itrl, &oitrl);
} for(auto i = pcrt->args_out->begin(); i != pcrt->args_out->end(); i++,count++){
//时钟滴答调用函数 if(farg_out[count] == INT){
void threadsClock(int n){ AddArgsOut<int>(pcrt->name, *((int *)(*i)));
for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){ }
(*i)->Daemon(); else if(farg_out[count] == DOUBLE){
AddArgsOut<double>(pcrt->name, *((double *)(*i)));
}
} }
daemon_list.clear(); // 处理输入参数
vector<int> farg_in = pcp->fargs_in;
count = 0;
for(auto i = pcrt->args_in->begin(); i != pcrt->args_in->end(); i++,count++){
if(farg_in[count] == INT){
AddArgs<int>(pcrt->name, *((int *)(*i)));
}
else if(farg_in[count] == DOUBLE){
AddArgs<double>(pcrt->name, *((double *)(*i)));
}
}
ifsolved.find(pcrt->name)->second = true;
if_rargs.find(pcrt->name)->second = true;
// 处理关联计算模块
p_map->CMap::MapThrough(pcp, CThread::SignedCpart,&ifsolved);
return 0;
}
struct compute_result CThread::BuildCPUResult(CPart *pcp){
struct compute_result ncpur;
ncpur.name = pcp->name;
ncpur.args_in = &rargs.find(ncpur.name)->second;
ncpur.args_in = &rargs.find(ncpur.name)->second;
return ncpur;
}
void CThread::SignedCpart(void *args, CPart *pcp){
map<string,bool> *pifsolved = (map<string,bool> *) args;
pifsolved->find(pcp->name)->second = true;
} }
//注册任务进程时钟调用 //注册任务进程时钟调用

View File

@ -11,10 +11,9 @@
#include "cpart.h" #include "cpart.h"
#include "cmap.h" #include "cmap.h"
#include "server.h"
#include <pthread.h> #include <pthread.h>
#include <sys/time.h>
#include <signal.h>
#include <list> #include <list>
using std::list; using std::list;
@ -49,24 +48,19 @@ struct line_process{
list<CPart *> line; list<CPart *> line;
}; };
//外来数据包解析结构
struct compute_result{
string name;
vector<void *> args_in;
vector<void *> args_out;
};
//计算进程管理结构 //计算进程管理结构
class CThread{ class CThread{
public: public:
// 对应的图结构管理结构 // 对应的图结构管理结构
CMap *p_map; const CMap * const p_map;
// 此计算进程中计算模块的传入参数数据列表 // 此计算进程中计算模块的传入参数数据列表
map<string,vector<void *>> rargs; map<string,vector<void *>> rargs;
// 此计算进程的计算模块的传出参数数据列表 // 此计算进程的计算模块的传出参数数据列表
map<string,vector<void *>> rargs_out; map<string,vector<void *>> rargs_out;
// 计算模块是否已经执行 // 计算模块是否已经解决
map<string,bool> ifsolved; map<string,bool> ifsolved;
// 计算模块是否有数据
map<string,bool> if_rargs;
// tid生成的依据 // tid生成的依据
unsigned long idxtid; unsigned long idxtid;
// 并行线程的个数 // 并行线程的个数
@ -76,6 +70,7 @@ public:
// 守护进程定时器 // 守护进程定时器
struct itimerval itrl; struct itimerval itrl;
// 使用图结构管理结构来构造计算进程管理结构 // 使用图结构管理结构来构造计算进程管理结构
CThread(CMap *tp_map, int thdnum = 4); CThread(CMap *tp_map, int thdnum = 4);
~CThread(); ~CThread();
@ -88,6 +83,15 @@ public:
(*k).second.push_back((void *)p_value); (*k).second.push_back((void *)p_value);
} }
template<class T>
// 添加相关计算模块的传出参数
void AddArgsOut(string name, T value){
auto k = rargs_out.find(name);
T *p_value = new T();
*p_value = value;
(*k).second.push_back((void *)p_value);
}
// 设置守护进程 // 设置守护进程
void SetDaemon(void); void SetDaemon(void);
@ -103,8 +107,14 @@ public:
int CancelChildPCS(unsigned long tid); int CancelChildPCS(unsigned long tid);
// 处理数据包 // 处理数据包
int GetCPUResult(struct compute_result *); int GetCPUResult(struct compute_result *);
// 标记计算模块
static void SignedCpart(void *args, CPart *pcp);
// 导出数据包 // 导出数据包
struct compute_result CPUResult(CPart *); struct compute_result BuildCPUResult(CPart *);
// 发送数据包到服务器
int SendCPUResult(Server *,struct compute_result);
// 从服务器中获得数据包
vector<struct compute_result> GetCPURFromServer(Server *);
// 查询计算模块是否已经解决 // 查询计算模块是否已经解决
CPart *IfCPTSolved(string name); CPart *IfCPTSolved(string name);
// 建立新线程执行计算模块 // 建立新线程执行计算模块
@ -117,10 +127,6 @@ public:
static void ChildThreadFSH(struct thread_args *); static void ChildThreadFSH(struct thread_args *);
}; };
//设置全局线程时钟
void setThreadsClock(void);
//时钟滴答调用函数
void threadsClock(int);
//注册任务进程时钟调用 //注册任务进程时钟调用
void setTrdClock(CThread *ptd); void setTrdClock(CThread *ptd);

18
net.cpp
View File

@ -12,12 +12,30 @@
#include "cthread.h" #include "cthread.h"
extern list<CThread *> daemon_list; extern list<CThread *> daemon_list;
extern list<Server *> server_list;
static struct itimerval oitrl, itrl;
void init(void){ void init(void){
signal(SIGALRM, threadsClock); signal(SIGALRM, threadsClock);
setThreadsClock(); setThreadsClock();
} }
//设置全局线程时钟
void setThreadsClock(void){
itrl.it_interval.tv_sec = 0;
itrl.it_interval.tv_usec = 500000;
itrl.it_value.tv_sec = 0;
itrl.it_value.tv_usec = 500000;
setitimer(ITIMER_REAL, &itrl, &oitrl);
}
//时钟滴答调用函数
void threadsClock(int n){
for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){
(*i)->Daemon();
}
daemon_list.clear();
}
int main(void){ int main(void){
init(); init();
CMap map("./PCS"); CMap map("./PCS");

10
net.h
View File

@ -26,6 +26,8 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <unistd.h> #include <unistd.h>
#include <netdb.h> #include <netdb.h>
#include <sys/time.h>
#include <signal.h>
#include "cpart.h" #include "cpart.h"
@ -178,8 +180,6 @@ public:
} }
}; };
struct pcs_result{ struct pcs_result{
char *name[16]; char *name[16];
uint32_t in_size; uint32_t in_size;
@ -188,4 +188,10 @@ struct pcs_result{
char *out_buff; char *out_buff;
}; };
//设置全局线程时钟
void setThreadsClock(void);
//时钟滴答调用函数
void threadsClock(int);
#endif /* net_hpp */ #endif /* net_hpp */

9
server.cpp Normal file
View File

@ -0,0 +1,9 @@
//
// server.cpp
// Net
//
// Created by 胡一兵 on 2019/1/16.
// Copyright © 2019年 Bakantu. All rights reserved.
//
#include "server.h"

42
server.h Normal file
View File

@ -0,0 +1,42 @@
//
// server.hpp
// Net
//
// Created by 胡一兵 on 2019/1/16.
// Copyright © 2019年 Bakantu. All rights reserved.
//
#ifndef server_h
#define server_h
#include "net.h"
class Server;
static list<Server *> server_list;
//外来数据包解析结构
struct compute_result{
string name;
vector<void *> *args_in;
vector<void *> *args_out;
};
//原始数据包
struct packet{
unsigned int type;
vector<pair<unsigned int, void *>> buffs;
};
class Server{
vector<compute_result> cpurs;
vector<packet> packets;
Socket socket;
Server(string ip_addr):socket(ip_addr,9048,true,false){
}
void Deamon(void){
socket.PacketRecv(<#Addr t_addr#>)
}
};
#endif /* server_h */