From 1d5d9e3302c84903b505370678d4c8a2a36312b7 Mon Sep 17 00:00:00 2001 From: ales stibal Date: Wed, 3 Jul 2024 18:51:48 +0200 Subject: [PATCH] thread pool - add pool task diagnostic info collection - tasks now must have info_* methods, optionally set_log_buffer() --- src/service/tpool.hpp | 84 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 76 insertions(+), 8 deletions(-) diff --git a/src/service/tpool.hpp b/src/service/tpool.hpp index 0ea661f..2a3bf7b 100644 --- a/src/service/tpool.hpp +++ b/src/service/tpool.hpp @@ -79,7 +79,11 @@ class PoolTask { public: PoolTask() = default; virtual ~PoolTask() = default; - virtual void execute(std::atomic_bool const& stop_flag) { /* must be implemented to support instantiation */ }; + virtual void execute(std::atomic_bool const& stop_flag) = 0; + virtual std::string info_short() const = 0; + virtual std::string info_long() const = 0; + virtual std::string info_detailed() const { return info_long(); }; + virtual void set_log_buffer(std::stringstream& ss) { /* this override is not mandatory */ }; }; @@ -88,11 +92,47 @@ class ThreadPool { // calling functions taking reference std::vector workers_; - std::queue> tasks_; + std::deque> tasks_; + + struct worker_tasks_t { + // each thread worker will collect info from tasks + std::vector info_short; + std::vector info_long; + std::vector info_details; + // log buffer is shared with the task, so they can log into + std::vector log_buffer; + std::vector is_finished; + + void init(size_t threads) { + // initialize currently processed tasks info + for (size_t i = 0; i < threads; ++i) { + info_short.emplace_back(); + info_long.emplace_back(); + info_details.emplace_back(); + log_buffer.emplace_back(); + is_finished.emplace_back(false); + } + }; + + void clear(size_t thread_idx) { + info_short[thread_idx].clear(); + info_long[thread_idx].clear(); + info_details[thread_idx].clear(); + + log_buffer[thread_idx].clear(); + log_buffer[thread_idx].str({}); + + is_finished[thread_idx] = false; + }; + }; + worker_tasks_t worker_tasks {}; + static inline std::atomic_bool collect_tasks_info = true; + + mutable std::mutex lock_; std::condition_variable cv_; - std::atomic_bool stop_ = false; + std::atomic_bool stop_ = false; std::atomic_uint active = 0; struct stats_t { @@ -106,14 +146,17 @@ class ThreadPool { public: stats_t const &stats() const { return stats_; } + worker_tasks_t const& get_worker_tasks() const { return worker_tasks; } // advice/recommendation for longer task to schedule their loops to check stop flag (if any) static constexpr unsigned long milliseconds = 100; explicit ThreadPool(size_t threads) { - for (size_t i = 0; i < threads; ++i) { - workers_.emplace_back([this] { + for (size_t thread_idx = 0; thread_idx < threads; ++thread_idx) { + worker_tasks.init(threads); + + workers_.emplace_back([this, thread_idx] { while (true) { std::unique_ptr task; { @@ -129,7 +172,15 @@ class ThreadPool { continue; } task = std::move(this->tasks_.front()); - this->tasks_.pop(); + this->tasks_.pop_front(); + + if(collect_tasks_info) { + worker_tasks.clear(thread_idx); + worker_tasks.info_short[thread_idx] = task->info_short(); + worker_tasks.info_long[thread_idx] = task->info_long(); + worker_tasks.info_details[thread_idx] = task->info_detailed(); + task->set_log_buffer(worker_tasks.log_buffer[thread_idx]); + } } this->active++; @@ -148,10 +199,14 @@ class ThreadPool { this->stats_.unk_except++; } this->active--; + { + auto lc_ = std::scoped_lock(lock_); + worker_tasks.is_finished[thread_idx] = true; + } } }); - pthread_setname_np(workers_.back().native_handle(), string_format("sxy_tpo_%d", i).c_str()); + pthread_setname_np(workers_.back().native_handle(), string_format("sxy_tpo_%d", thread_idx).c_str()); } } @@ -165,6 +220,19 @@ class ThreadPool { return tasks_.size(); } + std::mutex& get_lock() const { + return lock_; + } + + auto const& get_tasks() const { + return tasks_; + } + + auto const& get_workers() const { + return workers_; + } + + size_t tasks_running() const { return active; } @@ -184,7 +252,7 @@ class ThreadPool { if (stop_) { return -1; } - tasks_.emplace(std::move(f)); + tasks_.emplace_back(std::move(f)); ret = socle::raw::to_signed_cast(tasks_.size()).value_or(-1); } cv_.notify_one();