diff --git a/cthread.cpp b/cthread.cpp index 85ce976..2b09239 100644 --- a/cthread.cpp +++ b/cthread.cpp @@ -8,7 +8,10 @@ #include "cthread.h" -CThread::CThread(CMap *tp_map):p_map(tp_map){ +static struct itimerval oitrl, itrl; + +CThread::CThread(CMap *tp_map):p_map(tp_map),idxtid(0){ + lpcs.if_als = false; // 构造空的传入与传出参数列表 for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ vector args,args_out; @@ -53,6 +56,10 @@ CThread::~CThread(){ } void CThread::Analyse(void){ +// 如果上一个并行执行进程还没有退出 + if(lpcs.if_als == true){ + //还没想好怎么做 + } for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ auto cpart_depends = (*k).second->depends; // 如果计算模块已经执行则跳过 @@ -92,7 +99,7 @@ void CThread::Analyse(void){ } } - line.push_back((*k).second); + lpcs.line.push_back((*k).second); } } // 如果该计算模块没有依赖模块 @@ -101,7 +108,7 @@ void CThread::Analyse(void){ if(rargs.find(k->second->name)->second.size() == k->second->fargs_in.size()){ // 如果该模块还没有被调用 if(ifsolved.find(name)->second == false){ - line.push_back(k->second); + lpcs.line.push_back(k->second); } } @@ -110,37 +117,67 @@ void CThread::Analyse(void){ } void CThread::DoLine(void){ - list threads; - for(auto pcp = line.begin(); pcp != line.end(); pcp++){ + for(auto pcp = lpcs.line.begin(); pcp != lpcs.line.end(); pcp++){ string name = (*pcp)->name; vector args = rargs.find(name)->second; vector fargs = (*pcp)->fargs_in; vector fargs_out = (*pcp)->fargs_out; - threads.push_back({0}); + unsigned long ntid = idxtid++; + pthread_t npdt = 0; // 创建新线程 - struct thread_args *pt_ta = new struct thread_args({this,(*pcp),-1}); - if(pthread_create(&threads.back(),NULL,&CThread::NewThread,(void *)(pt_ta))){ + struct thread_args *pt_ta = new struct thread_args({ntid,this,(*pcp),-1}); + if(pthread_create(&npdt,NULL,&CThread::NewThread,(void *)(pt_ta))){ throw "fail to create thread"; } + lpcs.threads.insert({ntid,npdt}); } - line.clear(); -// 等待线程返回 - for(auto i = threads.begin(); i != threads.end(); i++){ + +} + +void CThread::SetDaemon(void){ + +} + +void CThread::Daemon(void){ + // 等待线程返回 + for(auto i = lpcs.child_finished.begin(); i != lpcs.child_finished.end(); i++){ + unsigned long tid = (*i)->tid; + pthread_t cpdt = lpcs.threads.find(tid)->second; struct thread_args *rpv = nullptr; - pthread_join((*i), (void **)&rpv); -// 根据返回值处理计算任务状态 + pthread_join(cpdt, (void **)&rpv); + // 根据返回值处理计算任务状态 if(rpv->rtn == SUCCESS){ + // 标识该计算模块中计算任务的状态 ifsolved.find(rpv->pcp->name)->second = true; } + else{ + + } + // 释放线程资源 + pthread_detach(cpdt); + // 在列表中销户证实宣告线程程结束 + lpcs.threads.erase(tid); + + printf("TID: %lu Deleted.\n",tid); delete rpv; } + lpcs.child_finished.clear(); + if(lpcs.threads.size() > 0){ + + } +} + +void CThread::ChildThreadFSH(struct thread_args *pta){ + CThread *pct = pta->pct; + (pct->lpcs).child_finished.push_back(pta); + printf("Called TID %lu.\n",pta->tid); } void *CThread::NewThread(void *pv){ struct thread_args *pta = (struct thread_args *)pv; - printf("Calling CPART %s.\n",pta->pcp->name.data()); + printf("Calling TID %lu.\n",pta->tid); // 准备输入参数 PrepareArgsIn(pta->pct,pta->pcp); if(pta->pcp->Run() == SUCCESS){ @@ -151,7 +188,7 @@ void *CThread::NewThread(void *pv){ else{ pta->rtn = FAIL; } - printf("Called CPART %s.\n",pta->pcp->name.data()); + CThread::ChildThreadFSH(pta); pthread_exit(pv); } @@ -197,3 +234,16 @@ void CThread::GetArgsOut(CThread *pct,CPart *pcp){ } } + +//设置全局线程时钟 +void setThreadsClock(void){ + itrl.it_interval.tv_sec = 0; + itrl.it_interval.tv_usec = 500000; + itrl.it_value.tv_sec = 0; + itrl.it_value.tv_usec = 500000; + setitimer(ITIMER_REAL, &itrl, &oitrl); +} +//时钟滴答调用函数 +void threadsClock(int n){ + printf("Clock click.\n"); +} diff --git a/cthread.h b/cthread.h index c14deda..daade1b 100644 --- a/cthread.h +++ b/cthread.h @@ -13,23 +13,56 @@ #include "cmap.h" #include +#include +#include #include using std::list; +class CThread; + +//线程信息记录结构体 +struct thread_args{ + // 子线程编号 + unsigned long tid; + // 指向计算任务 + CThread *pct; + // 指向计算模块 + CPart *pcp; + // 储存计算模块调用的返回值 + int rtn; +}; + +//并行任务处理进程信息结构体 +struct line_process{ +// 是否开启并行任务管理 + bool if_als; +// 已经释放的子线程 + listchild_finished; +// 子线程管理状态记录 + map threads; +// 计算模块处理队列 + list line; +}; + //计算进程管理结构 class CThread{ public: // 对应的图结构管理结构 CMap *p_map; -// 计算模块处理队列 - list line; // 此计算进程中计算模块的传入参数数据列表 map> rargs; // 此计算进程的计算模块的传出参数数据列表 map> rargs_out; // 计算模块是否已经执行 map ifsolved; +// tid生成的依据 + unsigned long idxtid; +// 并行任务处理进程 + struct line_process lpcs; +// 守护进程定时器 + struct itimerval itrl; + // 使用图结构管理结构来构造计算进程管理结构 CThread(CMap *tp_map); ~CThread(); @@ -41,7 +74,13 @@ public: *p_value = value; (*k).second.push_back((void *)p_value); } -// 分析图结构来构造处理队列 + +// 设置守护进程 + void SetDaemon(void); + +// 守护进程 + void Daemon(void); +// 分析图结构来构造新的处理队列 void Analyse(void); // 执行处理队列 void DoLine(void); @@ -51,12 +90,13 @@ public: static void PrepareArgsIn(CThread *pct,CPart *); // 获得计算模块执行后的输出参数 static void GetArgsOut(CThread *pct,CPart *); +// 通知计算任务子线程即将结束 + static void ChildThreadFSH(struct thread_args *); }; -struct thread_args{ - CThread *pct; - CPart *pcp; - int rtn; -}; +//设置全局线程时钟 +void setThreadsClock(void); +//时钟滴答调用函数 +void threadsClock(int); #endif /* cthread_h */ diff --git a/net.cpp b/net.cpp index 14fd530..b565be7 100644 --- a/net.cpp +++ b/net.cpp @@ -11,9 +11,13 @@ #include "cmap.h" #include "cthread.h" - +void init(void){ + signal(SIGALRM, threadsClock); + setThreadsClock(); +} int main(void){ + init(); CMap map("./PCS"); CThread thread(&map); thread.AddArgs("B", 4); @@ -22,8 +26,10 @@ int main(void){ thread.AddArgs("C", 3.0); thread.Analyse(); thread.DoLine(); - thread.Analyse(); - thread.DoLine(); + + while(1){ + thread.Daemon(); + } return 0; }