diff options
Diffstat (limited to 'src/core/thread/TaskRunner.cpp')
-rw-r--r-- | src/core/thread/TaskRunner.cpp | 157 |
1 files changed, 54 insertions, 103 deletions
diff --git a/src/core/thread/TaskRunner.cpp b/src/core/thread/TaskRunner.cpp index 35ac561e..6eed7a89 100644 --- a/src/core/thread/TaskRunner.cpp +++ b/src/core/thread/TaskRunner.cpp @@ -28,121 +28,72 @@ #include "core/thread/TaskRunner.h" +#include <qobjectdefs.h> +#include <qthread.h> + +#include <memory> + #include "core/thread/Task.h" #include "spdlog/spdlog.h" -GpgFrontend::Thread::TaskRunner::TaskRunner() = default; +namespace GpgFrontend::Thread { -GpgFrontend::Thread::TaskRunner::~TaskRunner() = default; +class TaskRunner::Impl : public QThread { + public: + void PostTask(Task* task) { + if (task == nullptr) { + SPDLOG_ERROR("task posted is null"); + return; + } -void GpgFrontend::Thread::TaskRunner::PostTask(Task* task) { - if (task == nullptr) { - SPDLOG_ERROR("task posted is null"); - return; + task->setParent(nullptr); + connect(task, &Task::SignalTaskEnd, task, &Task::deleteLater); + + if (task->GetSequency()) { + SPDLOG_TRACE("post task: {}, sequency mode: {}", task->GetFullID(), + task->GetSequency()); + task->moveToThread(this); + } else { + // if it need to run concurrently, we should create a new thread to + // run it. + auto* concurrent_thread = new QThread(this); + + connect(task, &Task::SignalTaskEnd, concurrent_thread, &QThread::quit); + // concurrent thread is responsible for deleting the task + connect(concurrent_thread, &QThread::finished, task, &Task::deleteLater); + // concurrent thread is responsible for self deleting + connect(concurrent_thread, &QThread::finished, concurrent_thread, + &QThread::deleteLater); + + // start thread + concurrent_thread->start(); + task->moveToThread(concurrent_thread); + } + emit task->SignalRun(); } - SPDLOG_TRACE("post task: {}", task->GetFullID()); + void PostScheduleTask(Task* task, size_t seconds) { + if (task == nullptr) return; + // TODO + } +}; - task->setParent(nullptr); - task->moveToThread(this); +GpgFrontend::Thread::TaskRunner::TaskRunner() : p_(std::make_unique<Impl>()) {} - { - std::lock_guard<std::mutex> lock(tasks_mutex_); - tasks.push(task); - } - quit(); -} +GpgFrontend::Thread::TaskRunner::~TaskRunner() = default; -void GpgFrontend::Thread::TaskRunner::PostScheduleTask(Task* task, - size_t seconds) { - if (task == nullptr) return; - // TODO +void GpgFrontend::Thread::TaskRunner::PostTask(Task* task) { + p_->PostTask(task); } -[[noreturn]] void GpgFrontend::Thread::TaskRunner::run() { - SPDLOG_TRACE("task runner runing, thread id: {}", QThread::currentThreadId()); - while (true) { - if (tasks.empty()) { - SPDLOG_TRACE("no tasks to run, trapping into event loop..."); - exec(); - } else { - SPDLOG_TRACE("start to run task(s), queue size: {}", tasks.size()); - - Task* task = nullptr; - { - std::lock_guard<std::mutex> lock(tasks_mutex_); - task = std::move(tasks.front()); - tasks.pop(); - pending_tasks_.insert({task->GetUUID(), task}); - } - - if (task != nullptr) { - try { - // triger - SPDLOG_TRACE("running task {}, sequency: {}", task->GetFullID(), - task->GetSequency()); - - // when a signal SignalTaskEnd raise, do unregister work - connect(task, &Task::SignalTaskEnd, this, [this, task]() { - unregister_finished_task(task->GetUUID()); - }); - - if (!task->GetSequency()) { - // if it need to run concurrently, we should create a new thread to - // run it. - auto* concurrent_thread = new QThread(nullptr); - task->setParent(nullptr); - task->moveToThread(concurrent_thread); - // start thread - concurrent_thread->start(); - - connect(task, &Task::SignalTaskEnd, concurrent_thread, - &QThread::quit); - // concurrent thread is responsible for deleting the task - connect(concurrent_thread, &QThread::finished, task, - &Task::deleteLater); - } - - // run the task - task->run(); - } catch (const std::exception& e) { - SPDLOG_ERROR("task runner: exception in task {}, exception: {}", - task->GetFullID(), e.what()); - // if any exception caught, destroy the task, remove the task from the - // pending tasks - unregister_finished_task(task->GetUUID()); - } catch (...) { - SPDLOG_ERROR("task runner: unknown exception in task: {}", - task->GetFullID()); - // if any exception caught, destroy the task, remove the task from the - // pending tasks - unregister_finished_task(task->GetUUID()); - } - } - } - } +void TaskRunner::PostScheduleTask(Task* task, size_t seconds) { + p_->PostScheduleTask(task, seconds); } -/** - * @brief - * - */ -void GpgFrontend::Thread::TaskRunner::unregister_finished_task( - std::string task_uuid) { - SPDLOG_DEBUG("cleaning task {}", task_uuid); - // search in map - auto pending_task = pending_tasks_.find(task_uuid); - if (pending_task == pending_tasks_.end()) { - SPDLOG_ERROR("cannot find task in pending list: {}", task_uuid); - return; - } else { - std::lock_guard<std::mutex> lock(tasks_mutex_); - // if thread runs sequenctly, that means the thread is living in this - // thread, so we can delete it. Or, its living thread need to delete it. - if (pending_task->second->GetSequency()) - pending_task->second->deleteLater(); - pending_tasks_.erase(pending_task); - } +void TaskRunner::Start() { p_->start(); } - SPDLOG_DEBUG("clean task {} done", task_uuid); -} +QThread* TaskRunner::GetThread() { return p_.get(); } + +bool TaskRunner::IsRunning() { return p_->isRunning(); } + +} // namespace GpgFrontend::Thread |