Skip to content

Commit

Permalink
thread pool - reflect API change in sx::http::AsyncRequest and CLI
Browse files Browse the repository at this point in the history
- cli gives now diagnostic information
  • Loading branch information
astibal committed Jul 3, 2024
1 parent 1d5d9e3 commit dc8c312
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
54 changes: 54 additions & 0 deletions src/service/cmd/diag/diag_cmds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2236,6 +2236,9 @@ int cli_diag_worker_proxy_list(struct cli_def *cli, [[maybe_unused]] const char
int cli_diag_worker_pool_list(struct cli_def *cli, const char *command, char *argv[], int argc) {

using namespace sx::tp;
auto args = args_to_vec(argv, argc);

int verbosity = ( ! args.empty() ) ? safe_val(args[0], iINF) : iINF;

std::stringstream ss;

Expand All @@ -2258,6 +2261,57 @@ int cli_diag_worker_pool_list(struct cli_def *cli, const char *command, char *ar
ss << "\n";
ss << " is active: " << active << "\n";

if(verbosity > iINF) {

ss << "\n";

auto& tp = ThreadPool::instance::get();
auto lc_ = std::scoped_lock(tp.get_lock());


{
auto const& tasks = tp.get_tasks();
auto tsz = tasks.size();
if(tsz > 0) {
ss << "Queued tasks: \n";
for (std::size_t i = 0; i < tsz; ++i) {
auto const &t = tasks[i];
ss << (verbosity < iDEB ? t->info_long() : t->info_detailed()) << "\n";
}
}
}
ss << "Running tasks: \n";
{
auto const& workers = tp.get_workers();
auto const& info = tp.get_worker_tasks();
size_t pool_running = 0;

for(size_t i = 0; i < workers.size(); ++i) {
auto s = (verbosity < iDEB ? info.info_long[i] : info.info_details[i]);
if(not info.is_finished[i])
++pool_running;

if(not s.empty()) {
if(not info.is_finished[i] or (info.is_finished[i] and verbosity >iDEB))
ss << "\n[#" << i << (info.is_finished[i] ? " (finished)" : " (running)") << "]: " << s << "\n";

if(verbosity > iDEB) {
auto lb = info.log_buffer[i].str();
if(not lb.empty()) {
ss << "[#" << i << " (log)] \n";
ss << lb << "\n";
ss << "[#" << i << " (log)] - end\n";
}
else {
ss << "[#" << i << " (log)] - empty\n";
}
}
}
}
ss << "\n " << pool_running << " worker(s) are busy\n";
}
}

cli_print(cli, "%s", ss.str().c_str());

return CLI_OK;
Expand Down
29 changes: 24 additions & 5 deletions src/service/http/async_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,36 @@ namespace sx::http {

void execute(std::atomic_bool const& stop_flag) override {
emit_url_wait(url, payload, hook);
if(log_stream.has_value()) {
std::stringstream& ss = log_stream.value();
ss << "test\n";
}
}

std::string info_short() const override {
return string_format("web request: POST with %dB of data", payload.length()); };
std::string info_long() const override {
return string_format("web request: POST %s with %dB of data", url.c_str(), payload.length());
};
std::string info_detailed() const override {
std::stringstream ss;
ss << string_format("web request details: POST %s with %dB of data\n", url.c_str(), payload.length());
ss << "Payload: \n" << hex_dump((unsigned char*)payload.data(), payload.length()) << "\n";
ss << "-- \n";

return ss.str();
};

void set_log_buffer(std::stringstream& ss) override {
log_stream = ss;
}


private:
std::string url;
std::string payload;
reply_hook hook;
std::optional<std::reference_wrapper<std::stringstream>> log_stream;
};

static AsyncRequest& get() {
Expand Down Expand Up @@ -181,11 +205,6 @@ namespace sx::http {
std::string copy_url(url);
std::string copy_pay(pay);

// auto ret = pool.enqueue([copy_url, copy_pay, hook]([[maybe_unused]] std::atomic_bool const &stop_flag) {
//
// emit_url_wait(copy_url, copy_pay, hook);
// });

auto task = std::make_unique<RequestTask>(copy_url, copy_pay, hook);
auto ret = pool.enqueue(std::move(task));

Expand Down

0 comments on commit dc8c312

Please sign in to comment.