From dc8c31262be0fc89880261fe2a7862871a70254e Mon Sep 17 00:00:00 2001 From: ales stibal Date: Wed, 3 Jul 2024 18:52:59 +0200 Subject: [PATCH] thread pool - reflect API change in sx::http::AsyncRequest and CLI - cli gives now diagnostic information --- src/service/cmd/diag/diag_cmds.cpp | 54 ++++++++++++++++++++++++++++++ src/service/http/async_request.hpp | 29 +++++++++++++--- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/src/service/cmd/diag/diag_cmds.cpp b/src/service/cmd/diag/diag_cmds.cpp index 2797630..f0e76bb 100644 --- a/src/service/cmd/diag/diag_cmds.cpp +++ b/src/service/cmd/diag/diag_cmds.cpp @@ -2236,6 +2236,9 @@ int cli_diag_worker_proxy_list(struct cli_def *cli, [[maybe_unused]] const char int cli_diag_worker_pool_list(struct cli_def *cli, const char *command, char *argv[], int argc) { using namespace sx::tp; + auto args = args_to_vec(argv, argc); + + int verbosity = ( ! args.empty() ) ? safe_val(args[0], iINF) : iINF; std::stringstream ss; @@ -2258,6 +2261,57 @@ int cli_diag_worker_pool_list(struct cli_def *cli, const char *command, char *ar ss << "\n"; ss << " is active: " << active << "\n"; + if(verbosity > iINF) { + + ss << "\n"; + + auto& tp = ThreadPool::instance::get(); + auto lc_ = std::scoped_lock(tp.get_lock()); + + + { + auto const& tasks = tp.get_tasks(); + auto tsz = tasks.size(); + if(tsz > 0) { + ss << "Queued tasks: \n"; + for (std::size_t i = 0; i < tsz; ++i) { + auto const &t = tasks[i]; + ss << (verbosity < iDEB ? t->info_long() : t->info_detailed()) << "\n"; + } + } + } + ss << "Running tasks: \n"; + { + auto const& workers = tp.get_workers(); + auto const& info = tp.get_worker_tasks(); + size_t pool_running = 0; + + for(size_t i = 0; i < workers.size(); ++i) { + auto s = (verbosity < iDEB ? info.info_long[i] : info.info_details[i]); + if(not info.is_finished[i]) + ++pool_running; + + if(not s.empty()) { + if(not info.is_finished[i] or (info.is_finished[i] and verbosity >iDEB)) + ss << "\n[#" << i << (info.is_finished[i] ? " (finished)" : " (running)") << "]: " << s << "\n"; + + if(verbosity > iDEB) { + auto lb = info.log_buffer[i].str(); + if(not lb.empty()) { + ss << "[#" << i << " (log)] \n"; + ss << lb << "\n"; + ss << "[#" << i << " (log)] - end\n"; + } + else { + ss << "[#" << i << " (log)] - empty\n"; + } + } + } + } + ss << "\n " << pool_running << " worker(s) are busy\n"; + } + } + cli_print(cli, "%s", ss.str().c_str()); return CLI_OK; diff --git a/src/service/http/async_request.hpp b/src/service/http/async_request.hpp index bc103e4..76e3c72 100644 --- a/src/service/http/async_request.hpp +++ b/src/service/http/async_request.hpp @@ -78,12 +78,36 @@ namespace sx::http { void execute(std::atomic_bool const& stop_flag) override { emit_url_wait(url, payload, hook); + if(log_stream.has_value()) { + std::stringstream& ss = log_stream.value(); + ss << "test\n"; + } + } + + std::string info_short() const override { + return string_format("web request: POST with %dB of data", payload.length()); }; + std::string info_long() const override { + return string_format("web request: POST %s with %dB of data", url.c_str(), payload.length()); + }; + std::string info_detailed() const override { + std::stringstream ss; + ss << string_format("web request details: POST %s with %dB of data\n", url.c_str(), payload.length()); + ss << "Payload: \n" << hex_dump((unsigned char*)payload.data(), payload.length()) << "\n"; + ss << "-- \n"; + + return ss.str(); + }; + + void set_log_buffer(std::stringstream& ss) override { + log_stream = ss; } + private: std::string url; std::string payload; reply_hook hook; + std::optional> log_stream; }; static AsyncRequest& get() { @@ -181,11 +205,6 @@ namespace sx::http { std::string copy_url(url); std::string copy_pay(pay); -// auto ret = pool.enqueue([copy_url, copy_pay, hook]([[maybe_unused]] std::atomic_bool const &stop_flag) { -// -// emit_url_wait(copy_url, copy_pay, hook); -// }); - auto task = std::make_unique(copy_url, copy_pay, hook); auto ret = pool.enqueue(std::move(task));