This commit is contained in:
Saturneic 2019-01-15 18:35:20 +08:00
parent 66eef02948
commit 595034f3f4
3 changed files with 122 additions and 26 deletions

View File

@ -8,7 +8,10 @@
#include "cthread.h"
CThread::CThread(CMap *tp_map):p_map(tp_map){
static struct itimerval oitrl, itrl;
CThread::CThread(CMap *tp_map):p_map(tp_map),idxtid(0){
lpcs.if_als = false;
// 构造空的传入与传出参数列表
for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){
vector<void *> args,args_out;
@ -53,6 +56,10 @@ CThread::~CThread(){
}
void CThread::Analyse(void){
// 如果上一个并行执行进程还没有退出
if(lpcs.if_als == true){
//还没想好怎么做
}
for(auto k = p_map->cparts.begin(); k != p_map->cparts.end(); k++){
auto cpart_depends = (*k).second->depends;
// 如果计算模块已经执行则跳过
@ -92,7 +99,7 @@ void CThread::Analyse(void){
}
}
line.push_back((*k).second);
lpcs.line.push_back((*k).second);
}
}
// 如果该计算模块没有依赖模块
@ -101,7 +108,7 @@ void CThread::Analyse(void){
if(rargs.find(k->second->name)->second.size() == k->second->fargs_in.size()){
// 如果该模块还没有被调用
if(ifsolved.find(name)->second == false){
line.push_back(k->second);
lpcs.line.push_back(k->second);
}
}
@ -110,37 +117,67 @@ void CThread::Analyse(void){
}
void CThread::DoLine(void){
list<pthread_t> threads;
for(auto pcp = line.begin(); pcp != line.end(); pcp++){
for(auto pcp = lpcs.line.begin(); pcp != lpcs.line.end(); pcp++){
string name = (*pcp)->name;
vector<void *> args = rargs.find(name)->second;
vector<int> fargs = (*pcp)->fargs_in;
vector<int> fargs_out = (*pcp)->fargs_out;
threads.push_back({0});
unsigned long ntid = idxtid++;
pthread_t npdt = 0;
// 创建新线程
struct thread_args *pt_ta = new struct thread_args({this,(*pcp),-1});
if(pthread_create(&threads.back(),NULL,&CThread::NewThread,(void *)(pt_ta))){
struct thread_args *pt_ta = new struct thread_args({ntid,this,(*pcp),-1});
if(pthread_create(&npdt,NULL,&CThread::NewThread,(void *)(pt_ta))){
throw "fail to create thread";
}
lpcs.threads.insert({ntid,npdt});
}
line.clear();
}
void CThread::SetDaemon(void){
}
void CThread::Daemon(void){
// 等待线程返回
for(auto i = threads.begin(); i != threads.end(); i++){
for(auto i = lpcs.child_finished.begin(); i != lpcs.child_finished.end(); i++){
unsigned long tid = (*i)->tid;
pthread_t cpdt = lpcs.threads.find(tid)->second;
struct thread_args *rpv = nullptr;
pthread_join((*i), (void **)&rpv);
pthread_join(cpdt, (void **)&rpv);
// 根据返回值处理计算任务状态
if(rpv->rtn == SUCCESS){
// 标识该计算模块中计算任务的状态
ifsolved.find(rpv->pcp->name)->second = true;
}
else{
}
// 释放线程资源
pthread_detach(cpdt);
// 在列表中销户证实宣告线程程结束
lpcs.threads.erase(tid);
printf("TID: %lu Deleted.\n",tid);
delete rpv;
}
lpcs.child_finished.clear();
if(lpcs.threads.size() > 0){
}
}
void CThread::ChildThreadFSH(struct thread_args *pta){
CThread *pct = pta->pct;
(pct->lpcs).child_finished.push_back(pta);
printf("Called TID %lu.\n",pta->tid);
}
void *CThread::NewThread(void *pv){
struct thread_args *pta = (struct thread_args *)pv;
printf("Calling CPART %s.\n",pta->pcp->name.data());
printf("Calling TID %lu.\n",pta->tid);
// 准备输入参数
PrepareArgsIn(pta->pct,pta->pcp);
if(pta->pcp->Run() == SUCCESS){
@ -151,7 +188,7 @@ void *CThread::NewThread(void *pv){
else{
pta->rtn = FAIL;
}
printf("Called CPART %s.\n",pta->pcp->name.data());
CThread::ChildThreadFSH(pta);
pthread_exit(pv);
}
@ -197,3 +234,16 @@ void CThread::GetArgsOut(CThread *pct,CPart *pcp){
}
}
//设置全局线程时钟
void setThreadsClock(void){
itrl.it_interval.tv_sec = 0;
itrl.it_interval.tv_usec = 500000;
itrl.it_value.tv_sec = 0;
itrl.it_value.tv_usec = 500000;
setitimer(ITIMER_REAL, &itrl, &oitrl);
}
//时钟滴答调用函数
void threadsClock(int n){
printf("Clock click.\n");
}

View File

@ -13,23 +13,56 @@
#include "cmap.h"
#include <pthread.h>
#include <sys/time.h>
#include <signal.h>
#include <list>
using std::list;
class CThread;
//线程信息记录结构体
struct thread_args{
// 子线程编号
unsigned long tid;
// 指向计算任务
CThread *pct;
// 指向计算模块
CPart *pcp;
// 储存计算模块调用的返回值
int rtn;
};
//并行任务处理进程信息结构体
struct line_process{
// 是否开启并行任务管理
bool if_als;
// 已经释放的子线程
list<struct thread_args *>child_finished;
// 子线程管理状态记录
map<unsigned long,pthread_t> threads;
// 计算模块处理队列
list<CPart *> line;
};
//计算进程管理结构
class CThread{
public:
// 对应的图结构管理结构
CMap *p_map;
// 计算模块处理队列
list<CPart *> line;
// 此计算进程中计算模块的传入参数数据列表
map<string,vector<void *>> rargs;
// 此计算进程的计算模块的传出参数数据列表
map<string,vector<void *>> rargs_out;
// 计算模块是否已经执行
map<string,bool> ifsolved;
// tid生成的依据
unsigned long idxtid;
// 并行任务处理进程
struct line_process lpcs;
// 守护进程定时器
struct itimerval itrl;
// 使用图结构管理结构来构造计算进程管理结构
CThread(CMap *tp_map);
~CThread();
@ -41,7 +74,13 @@ public:
*p_value = value;
(*k).second.push_back((void *)p_value);
}
// 分析图结构来构造处理队列
// 设置守护进程
void SetDaemon(void);
// 守护进程
void Daemon(void);
// 分析图结构来构造新的处理队列
void Analyse(void);
// 执行处理队列
void DoLine(void);
@ -51,12 +90,13 @@ public:
static void PrepareArgsIn(CThread *pct,CPart *);
// 获得计算模块执行后的输出参数
static void GetArgsOut(CThread *pct,CPart *);
// 通知计算任务子线程即将结束
static void ChildThreadFSH(struct thread_args *);
};
struct thread_args{
CThread *pct;
CPart *pcp;
int rtn;
};
//设置全局线程时钟
void setThreadsClock(void);
//时钟滴答调用函数
void threadsClock(int);
#endif /* cthread_h */

12
net.cpp
View File

@ -11,9 +11,13 @@
#include "cmap.h"
#include "cthread.h"
void init(void){
signal(SIGALRM, threadsClock);
setThreadsClock();
}
int main(void){
init();
CMap map("./PCS");
CThread thread(&map);
thread.AddArgs<int>("B", 4);
@ -22,8 +26,10 @@ int main(void){
thread.AddArgs<double>("C", 3.0);
thread.Analyse();
thread.DoLine();
thread.Analyse();
thread.DoLine();
while(1){
thread.Daemon();
}
return 0;
}