Skip to content

Commit

Permalink
thread pool - add pool task diagnostic info collection
Browse files Browse the repository at this point in the history
- tasks now must have info_* methods, optionally set_log_buffer()
  • Loading branch information
astibal committed Jul 3, 2024
1 parent 99f4800 commit 1d5d9e3
Showing 1 changed file with 76 additions and 8 deletions.
84 changes: 76 additions & 8 deletions src/service/tpool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ class PoolTask {
public:
PoolTask() = default;
virtual ~PoolTask() = default;
virtual void execute(std::atomic_bool const& stop_flag) { /* must be implemented to support instantiation */ };
virtual void execute(std::atomic_bool const& stop_flag) = 0;
virtual std::string info_short() const = 0;
virtual std::string info_long() const = 0;
virtual std::string info_detailed() const { return info_long(); };
virtual void set_log_buffer(std::stringstream& ss) { /* this override is not mandatory */ };
};


Expand All @@ -88,11 +92,47 @@ class ThreadPool {

// calling functions taking reference
std::vector<std::thread> workers_;
std::queue<std::unique_ptr<PoolTask>> tasks_;
std::deque<std::unique_ptr<PoolTask>> tasks_;

struct worker_tasks_t {
// each thread worker will collect info from tasks
std::vector<std::string> info_short;
std::vector<std::string> info_long;
std::vector<std::string> info_details;
// log buffer is shared with the task, so they can log into
std::vector<std::stringstream> log_buffer;
std::vector<bool> is_finished;

void init(size_t threads) {
// initialize currently processed tasks info
for (size_t i = 0; i < threads; ++i) {
info_short.emplace_back();
info_long.emplace_back();
info_details.emplace_back();
log_buffer.emplace_back();
is_finished.emplace_back(false);
}
};

void clear(size_t thread_idx) {
info_short[thread_idx].clear();
info_long[thread_idx].clear();
info_details[thread_idx].clear();

log_buffer[thread_idx].clear();
log_buffer[thread_idx].str({});

is_finished[thread_idx] = false;
};
};
worker_tasks_t worker_tasks {};
static inline std::atomic_bool collect_tasks_info = true;


mutable std::mutex lock_;
std::condition_variable cv_;
std::atomic_bool stop_ = false;

std::atomic_bool stop_ = false;
std::atomic_uint active = 0;

struct stats_t {
Expand All @@ -106,14 +146,17 @@ class ThreadPool {

public:
stats_t const &stats() const { return stats_; }
worker_tasks_t const& get_worker_tasks() const { return worker_tasks; }

// advice/recommendation for longer task to schedule their loops to check stop flag (if any)
static constexpr unsigned long milliseconds = 100;

explicit ThreadPool(size_t threads) {

for (size_t i = 0; i < threads; ++i) {
workers_.emplace_back([this] {
for (size_t thread_idx = 0; thread_idx < threads; ++thread_idx) {
worker_tasks.init(threads);

workers_.emplace_back([this, thread_idx] {
while (true) {
std::unique_ptr<PoolTask> task;
{
Expand All @@ -129,7 +172,15 @@ class ThreadPool {
continue;
}
task = std::move(this->tasks_.front());
this->tasks_.pop();
this->tasks_.pop_front();

if(collect_tasks_info) {
worker_tasks.clear(thread_idx);
worker_tasks.info_short[thread_idx] = task->info_short();
worker_tasks.info_long[thread_idx] = task->info_long();
worker_tasks.info_details[thread_idx] = task->info_detailed();
task->set_log_buffer(worker_tasks.log_buffer[thread_idx]);
}
}

this->active++;
Expand All @@ -148,10 +199,14 @@ class ThreadPool {
this->stats_.unk_except++;
}
this->active--;
{
auto lc_ = std::scoped_lock(lock_);
worker_tasks.is_finished[thread_idx] = true;
}
}
});

pthread_setname_np(workers_.back().native_handle(), string_format("sxy_tpo_%d", i).c_str());
pthread_setname_np(workers_.back().native_handle(), string_format("sxy_tpo_%d", thread_idx).c_str());
}
}

Expand All @@ -165,6 +220,19 @@ class ThreadPool {
return tasks_.size();
}

std::mutex& get_lock() const {
return lock_;
}

auto const& get_tasks() const {
return tasks_;
}

auto const& get_workers() const {
return workers_;
}


size_t tasks_running() const {
return active;
}
Expand All @@ -184,7 +252,7 @@ class ThreadPool {
if (stop_) {
return -1;
}
tasks_.emplace(std::move(f));
tasks_.emplace_back(std::move(f));
ret = socle::raw::to_signed_cast<ssize_t>(tasks_.size()).value_or(-1);
}
cv_.notify_one();
Expand Down

0 comments on commit 1d5d9e3

Please sign in to comment.