This commit is contained in:
Saturneic 2019-01-15 21:50:46 +08:00
parent 595034f3f4
commit 98ddf7ed6f
4 changed files with 80 additions and 13 deletions

View File

@ -158,3 +158,5 @@ Depends CMap::ReadItem(string item){
} }
return dep; return dep;
} }

View File

@ -10,7 +10,9 @@
static struct itimerval oitrl, itrl; static struct itimerval oitrl, itrl;
CThread::CThread(CMap *tp_map):p_map(tp_map),idxtid(0){ list<CThread *> daemon_list = {};
CThread::CThread(CMap *tp_map,int thdnum):p_map(tp_map),idxtid(0),thdnum(thdnum){
lpcs.if_als = false; lpcs.if_als = false;
// 构造空的传入与传出参数列表 // 构造空的传入与传出参数列表
for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){ for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){
@ -88,6 +90,7 @@ void CThread::Analyse(void){
vector<void *> args_out = rargs_out.find(ditem->t_cpart->name)->second; vector<void *> args_out = rargs_out.find(ditem->t_cpart->name)->second;
// 检查传入传出参数的类型是否匹配 // 检查传入传出参数的类型是否匹配
for(auto itm = args.begin(); itm != args.end();itm++){ for(auto itm = args.begin(); itm != args.end();itm++){
// 参数不匹配则报异常
if(s_fargs_in[count++] != f_fargs_out[*itm]) throw "type conflict"; if(s_fargs_in[count++] != f_fargs_out[*itm]) throw "type conflict";
// 重新分配内存 // 重新分配内存
if(f_fargs_out[*itm] == INT){ if(f_fargs_out[*itm] == INT){
@ -100,6 +103,8 @@ void CThread::Analyse(void){
} }
lpcs.line.push_back((*k).second); lpcs.line.push_back((*k).second);
// 大于线程最高并行数则跳出
if(count == thdnum) break;
} }
} }
// 如果该计算模块没有依赖模块 // 如果该计算模块没有依赖模块
@ -132,43 +137,45 @@ void CThread::DoLine(void){
throw "fail to create thread"; throw "fail to create thread";
} }
lpcs.threads.insert({ntid,npdt}); lpcs.threads.insert({ntid,npdt});
lpcs.cpttid.insert({(*pcp),ntid});
} }
} }
void CThread::SetDaemon(void){ void CThread::SetDaemon(void){
daemon_list.push_back(this);
} }
void CThread::Daemon(void){ void CThread::Daemon(void){
// 等待线程返回 // 等待线程返回
for(auto i = lpcs.child_finished.begin(); i != lpcs.child_finished.end(); i++){ for(auto i = lpcs.child_finished.begin(); i != lpcs.child_finished.end(); i++){
unsigned long tid = (*i)->tid; unsigned long tid = (*i)->tid;
pthread_t cpdt = lpcs.threads.find(tid)->second; pthread_t cpdt = lpcs.threads.find(tid)->second;
struct thread_args *rpv = nullptr; struct thread_args *rpv = nullptr;
pthread_join(cpdt, (void **)&rpv); pthread_join(cpdt, (void **)&rpv);
// 根据返回值处理计算任务状态 // 根据返回值处理计算任务状态
if(rpv->rtn == SUCCESS){ if(rpv->rtn == SUCCESS){
// 标识该计算模块中计算任务的状态 // 标识该计算模块中计算任务的状态
ifsolved.find(rpv->pcp->name)->second = true; ifsolved.find(rpv->pcp->name)->second = true;
} }
else{ else{
} }
// 释放线程资源 // 释放线程资源
pthread_detach(cpdt); pthread_detach(cpdt);
// 在列表中销户证实宣告线程程结束 // 在列表中销户证实宣告线程程结束
lpcs.threads.erase(tid); lpcs.threads.erase(tid);
lpcs.cpttid.erase(rpv->pcp);
printf("TID: %lu Deleted.\n",tid); printf("TID: %lu Deleted.\n",tid);
delete rpv; delete rpv;
} }
lpcs.child_finished.clear(); lpcs.child_finished.clear();
if(lpcs.threads.size() > 0){ if(lpcs.threads.size() > 0){
setTrdClock(this);
} }
} }
//子线程即将结束时调用
void CThread::ChildThreadFSH(struct thread_args *pta){ void CThread::ChildThreadFSH(struct thread_args *pta){
CThread *pct = pta->pct; CThread *pct = pta->pct;
(pct->lpcs).child_finished.push_back(pta); (pct->lpcs).child_finished.push_back(pta);
@ -176,11 +183,20 @@ void CThread::ChildThreadFSH(struct thread_args *pta){
} }
void *CThread::NewThread(void *pv){ void *CThread::NewThread(void *pv){
// 取消信号对于线程起作用
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
// 线程收到取消信号时立即取消
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
struct thread_args *pta = (struct thread_args *)pv; struct thread_args *pta = (struct thread_args *)pv;
printf("Calling TID %lu.\n",pta->tid); printf("Calling TID %lu.\n",pta->tid);
// 准备输入参数 // 准备输入参数
PrepareArgsIn(pta->pct,pta->pcp); PrepareArgsIn(pta->pct,pta->pcp);
if(pta->pcp->Run() == SUCCESS){ if(pta->pcp->Run() == SUCCESS){
// 检查线程是否已经被取消
pthread_testcancel();
// 取消信号对于线程不再起作用,以防参数混乱
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
// 处理输出参数 // 处理输出参数
GetArgsOut(pta->pct,pta->pcp); GetArgsOut(pta->pct,pta->pcp);
pta->rtn = SUCCESS; pta->rtn = SUCCESS;
@ -192,6 +208,10 @@ void *CThread::NewThread(void *pv){
pthread_exit(pv); pthread_exit(pv);
} }
long CThread::FindChildPCS(string name){
return lpcs.cpttid.find(p_map->cparts.find(name)->second)->second;
}
void CThread::PrepareArgsIn(CThread *pct,CPart *pcp){ void CThread::PrepareArgsIn(CThread *pct,CPart *pcp){
// 读入实际计算进程参数列表中的输入参数 // 读入实际计算进程参数列表中的输入参数
vector<void *> args = pct->rargs.find(pcp->name)->second; vector<void *> args = pct->rargs.find(pcp->name)->second;
@ -235,6 +255,15 @@ void CThread::GetArgsOut(CThread *pct,CPart *pcp){
} }
int CThread::CancelChildPCS(unsigned long tid){
// 找到子线程的操作柄
pthread_t pht = lpcs.threads.find(tid)->second;
pthread_cancel(pht);
// 对线程进行销户操作
lpcs.threads.erase(tid);
return 0;
}
//设置全局线程时钟 //设置全局线程时钟
void setThreadsClock(void){ void setThreadsClock(void){
itrl.it_interval.tv_sec = 0; itrl.it_interval.tv_sec = 0;
@ -245,5 +274,13 @@ void setThreadsClock(void){
} }
//时钟滴答调用函数 //时钟滴答调用函数
void threadsClock(int n){ void threadsClock(int n){
printf("Clock click.\n"); for(auto i = daemon_list.begin(); i != daemon_list.end(); i++){
(*i)->Daemon();
}
daemon_list.clear();
}
//注册任务进程时钟调用
void setTrdClock(CThread *ptd){
daemon_list.push_back(ptd);
} }

View File

@ -19,8 +19,10 @@
using std::list; using std::list;
class CThread; class CThread;
//线程信息记录结构体 //线程信息记录结构体
struct thread_args{ struct thread_args{
// 子线程编号 // 子线程编号
@ -41,10 +43,19 @@ struct line_process{
list<struct thread_args *>child_finished; list<struct thread_args *>child_finished;
// 子线程管理状态记录 // 子线程管理状态记录
map<unsigned long,pthread_t> threads; map<unsigned long,pthread_t> threads;
// 记录计算模块对应的线程id
map<const CPart * const,unsigned long> cpttid;
// 计算模块处理队列 // 计算模块处理队列
list<CPart *> line; list<CPart *> line;
}; };
//外来数据包解析结构
struct compute_result{
string name;
vector<void *> args_in;
vector<void *> args_out;
};
//计算进程管理结构 //计算进程管理结构
class CThread{ class CThread{
public: public:
@ -58,13 +69,15 @@ public:
map<string,bool> ifsolved; map<string,bool> ifsolved;
// tid生成的依据 // tid生成的依据
unsigned long idxtid; unsigned long idxtid;
// 并行线程的个数
int thdnum;
// 并行任务处理进程 // 并行任务处理进程
struct line_process lpcs; struct line_process lpcs;
// 守护进程定时器 // 守护进程定时器
struct itimerval itrl; struct itimerval itrl;
// 使用图结构管理结构来构造计算进程管理结构 // 使用图结构管理结构来构造计算进程管理结构
CThread(CMap *tp_map); CThread(CMap *tp_map, int thdnum = 4);
~CThread(); ~CThread();
template<class T> template<class T>
// 添加相关计算模块的传入参数 // 添加相关计算模块的传入参数
@ -84,6 +97,16 @@ public:
void Analyse(void); void Analyse(void);
// 执行处理队列 // 执行处理队列
void DoLine(void); void DoLine(void);
// 通过子线程所属的模块名找到子线程的id
long FindChildPCS(string name);
// 取消子线程
int CancelChildPCS(unsigned long tid);
// 处理数据包
int GetCPUResult(struct compute_result *);
// 导出数据包
struct compute_result CPUResult(CPart *);
// 查询计算模块是否已经解决
CPart *IfCPTSolved(string name);
// 建立新线程执行计算模块 // 建立新线程执行计算模块
static void * NewThread(void *); static void * NewThread(void *);
// 为计算模块的调用准备输入参数 // 为计算模块的调用准备输入参数
@ -98,5 +121,7 @@ public:
void setThreadsClock(void); void setThreadsClock(void);
//时钟滴答调用函数 //时钟滴答调用函数
void threadsClock(int); void threadsClock(int);
//注册任务进程时钟调用
void setTrdClock(CThread *ptd);
#endif /* cthread_h */ #endif /* cthread_h */

View File

@ -11,6 +11,8 @@
#include "cmap.h" #include "cmap.h"
#include "cthread.h" #include "cthread.h"
extern list<CThread *> daemon_list;
void init(void){ void init(void){
signal(SIGALRM, threadsClock); signal(SIGALRM, threadsClock);
setThreadsClock(); setThreadsClock();
@ -26,9 +28,10 @@ int main(void){
thread.AddArgs<double>("C", 3.0); thread.AddArgs<double>("C", 3.0);
thread.Analyse(); thread.Analyse();
thread.DoLine(); thread.DoLine();
thread.SetDaemon();
thread.CancelChildPCS(0);
while(1){ while(1){
thread.Daemon(); sleep(100);
} }
return 0; return 0;
} }