diff --git a/cmap.cpp b/cmap.cpp index 978d77d..1f68299 100644 --- a/cmap.cpp +++ b/cmap.cpp @@ -158,3 +158,5 @@ Depends CMap::ReadItem(string item){ } return dep; } + + diff --git a/cthread.cpp b/cthread.cpp index 2b09239..55ed579 100644 --- a/cthread.cpp +++ b/cthread.cpp @@ -10,7 +10,9 @@ static struct itimerval oitrl, itrl; -CThread::CThread(CMap *tp_map):p_map(tp_map),idxtid(0){ +list daemon_list = {}; + +CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum){ lpcs.if_als = false; // 构造空的传入与传出参数列表 for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ @@ -88,6 +90,7 @@ void CThread::Analyse(void){ vector args_out = rargs_out.find(ditem->t_cpart->name)->second; // 检查传入传出参数的类型是否匹配 for(auto itm = args.begin(); itm != args.end();itm++){ +// 参数不匹配则报异常 if(s_fargs_in[count++] != f_fargs_out[*itm]) throw "type conflict"; // 重新分配内存 if(f_fargs_out[*itm] == INT){ @@ -100,6 +103,8 @@ void CThread::Analyse(void){ } lpcs.line.push_back((*k).second); +// 大于线程最高并行数则跳出 + if(count == thdnum) break; } } // 如果该计算模块没有依赖模块 @@ -132,43 +137,45 @@ void CThread::DoLine(void){ throw "fail to create thread"; } lpcs.threads.insert({ntid,npdt}); + lpcs.cpttid.insert({(*pcp),ntid}); } } void CThread::SetDaemon(void){ - + daemon_list.push_back(this); } void CThread::Daemon(void){ - // 等待线程返回 +// 等待线程返回 for(auto i = lpcs.child_finished.begin(); i != lpcs.child_finished.end(); i++){ unsigned long tid = (*i)->tid; pthread_t cpdt = lpcs.threads.find(tid)->second; struct thread_args *rpv = nullptr; pthread_join(cpdt, (void **)&rpv); - // 根据返回值处理计算任务状态 +// 根据返回值处理计算任务状态 if(rpv->rtn == SUCCESS){ - // 标识该计算模块中计算任务的状态 +// 标识该计算模块中计算任务的状态 ifsolved.find(rpv->pcp->name)->second = true; } else{ } - // 释放线程资源 +// 释放线程资源 pthread_detach(cpdt); - // 在列表中销户证实宣告线程程结束 +// 在列表中销户证实宣告线程程结束 lpcs.threads.erase(tid); - + lpcs.cpttid.erase(rpv->pcp); printf("TID: %lu Deleted.\n",tid); delete rpv; } lpcs.child_finished.clear(); if(lpcs.threads.size() > 0){ - + setTrdClock(this); } } +//子线程即将结束时调用 void CThread::ChildThreadFSH(struct thread_args *pta){ CThread *pct = pta->pct; (pct->lpcs).child_finished.push_back(pta); @@ -176,11 +183,20 @@ void CThread::ChildThreadFSH(struct thread_args *pta){ } void *CThread::NewThread(void *pv){ +// 取消信号对于线程起作用 + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); +// 线程收到取消信号时立即取消 + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL); struct thread_args *pta = (struct thread_args *)pv; printf("Calling TID %lu.\n",pta->tid); // 准备输入参数 PrepareArgsIn(pta->pct,pta->pcp); if(pta->pcp->Run() == SUCCESS){ +// 检查线程是否已经被取消 + pthread_testcancel(); + +// 取消信号对于线程不再起作用,以防参数混乱 + pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 处理输出参数 GetArgsOut(pta->pct,pta->pcp); pta->rtn = SUCCESS; @@ -192,6 +208,10 @@ void *CThread::NewThread(void *pv){ pthread_exit(pv); } +long CThread::FindChildPCS(string name){ + return lpcs.cpttid.find(p_map->cparts.find(name)->second)->second; +} + void CThread::PrepareArgsIn(CThread *pct,CPart *pcp){ // 读入实际计算进程参数列表中的输入参数 vector args = pct->rargs.find(pcp->name)->second; @@ -235,6 +255,15 @@ void CThread::GetArgsOut(CThread *pct,CPart *pcp){ } +int CThread::CancelChildPCS(unsigned long tid){ +// 找到子线程的操作柄 + pthread_t pht = lpcs.threads.find(tid)->second; + pthread_cancel(pht); +// 对线程进行销户操作 + lpcs.threads.erase(tid); + return 0; +} + //设置全局线程时钟 void setThreadsClock(void){ itrl.it_interval.tv_sec = 0; @@ -245,5 +274,13 @@ void setThreadsClock(void){ } //时钟滴答调用函数 void threadsClock(int n){ - printf("Clock click.\n"); + for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){ + (*i)->Daemon(); + } + daemon_list.clear(); +} + +//注册任务进程时钟调用 +void setTrdClock(CThread *ptd){ + daemon_list.push_back(ptd); } diff --git a/cthread.h b/cthread.h index daade1b..22f4068 100644 --- a/cthread.h +++ b/cthread.h @@ -19,8 +19,10 @@ using std::list; + class CThread; + //线程信息记录结构体 struct thread_args{ // 子线程编号 @@ -41,10 +43,19 @@ struct line_process{ listchild_finished; // 子线程管理状态记录 map threads; +// 记录计算模块对应的线程id + map cpttid; // 计算模块处理队列 list line; }; +//外来数据包解析结构 +struct compute_result{ + string name; + vector args_in; + vector args_out; +}; + //计算进程管理结构 class CThread{ public: @@ -58,13 +69,15 @@ public: map ifsolved; // tid生成的依据 unsigned long idxtid; +// 并行线程的个数 + int thdnum; // 并行任务处理进程 struct line_process lpcs; // 守护进程定时器 struct itimerval itrl; // 使用图结构管理结构来构造计算进程管理结构 - CThread(CMap *tp_map); + CThread(CMap *tp_map, int thdnum = 4); ~CThread(); template // 添加相关计算模块的传入参数 @@ -84,6 +97,16 @@ public: void Analyse(void); // 执行处理队列 void DoLine(void); +// 通过子线程所属的模块名找到子线程的id + long FindChildPCS(string name); +// 取消子线程 + int CancelChildPCS(unsigned long tid); +// 处理数据包 + int GetCPUResult(struct compute_result *); +// 导出数据包 + struct compute_result CPUResult(CPart *); +// 查询计算模块是否已经解决 + CPart *IfCPTSolved(string name); // 建立新线程执行计算模块 static void * NewThread(void *); // 为计算模块的调用准备输入参数 @@ -98,5 +121,7 @@ public: void setThreadsClock(void); //时钟滴答调用函数 void threadsClock(int); +//注册任务进程时钟调用 +void setTrdClock(CThread *ptd); #endif /* cthread_h */ diff --git a/net.cpp b/net.cpp index b565be7..330c596 100644 --- a/net.cpp +++ b/net.cpp @@ -11,6 +11,8 @@ #include "cmap.h" #include "cthread.h" +extern list daemon_list; + void init(void){ signal(SIGALRM, threadsClock); setThreadsClock(); @@ -26,9 +28,10 @@ int main(void){ thread.AddArgs("C", 3.0); thread.Analyse(); thread.DoLine(); - + thread.SetDaemon(); + thread.CancelChildPCS(0); while(1){ - thread.Daemon(); + sleep(100); } return 0; }