From 98e408c667e3dcda3dc177594ffa4c4d0d99a33e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5vard=20Pettersen?= Date: Tue, 14 Feb 2023 10:15:12 +0000 Subject: [PATCH] stop using fastos thread more places - also stop using std::jthread - remove Active and Joinable interfaces - remove stop, stopped and slumber - remove currentThread - make start function static - override start for Runnable w/init or custom function - explicit stop/slumber where needed --- .../src/vespa/config/helper/configfetcher.cpp | 4 +- .../config/retriever/simpleconfigurer.cpp | 4 +- fnet/src/tests/examples/examples_test.cpp | 5 +- .../vespa/slobrok/server/slobrokserver.cpp | 8 +- .../filestorage/operationabortingtest.cpp | 4 +- vbench/src/apps/vbench/vbench.cpp | 3 +- .../src/tests/dispatcher/dispatcher_test.cpp | 6 +- .../handler_thread/handler_thread_test.cpp | 3 +- vbench/src/tests/timer/timer_test.cpp | 3 +- vbench/src/vbench/core/dispatcher.hpp | 3 +- vbench/src/vbench/core/handler_thread.h | 7 +- vbench/src/vbench/core/handler_thread.hpp | 4 +- .../src/vbench/vbench/request_scheduler.cpp | 24 ++++-- vbench/src/vbench/vbench/request_scheduler.h | 17 +++-- vbench/src/vbench/vbench/vbench.cpp | 5 +- vbench/src/vbench/vbench/vbench.h | 2 +- vbench/src/vbench/vbench/worker.cpp | 4 +- vbench/src/vbench/vbench/worker.h | 6 +- .../simple_thread_bundle_test.cpp | 1 + .../singleexecutor/singleexecutor_test.cpp | 1 + vespalib/src/tests/thread/thread_test.cpp | 48 +++++------- .../src/vespa/vespalib/testkit/test_hook.cpp | 25 ------- .../src/vespa/vespalib/testkit/test_hook.h | 13 ++-- .../src/vespa/vespalib/util/CMakeLists.txt | 2 - vespalib/src/vespa/vespalib/util/active.cpp | 7 -- vespalib/src/vespa/vespalib/util/active.h | 34 --------- vespalib/src/vespa/vespalib/util/joinable.cpp | 7 -- vespalib/src/vespa/vespalib/util/joinable.h | 23 ------ .../vespalib/util/simple_thread_bundle.cpp | 4 +- .../vespa/vespalib/util/singleexecutor.cpp | 9 ++- .../src/vespa/vespalib/util/singleexecutor.h | 6 +- vespalib/src/vespa/vespalib/util/thread.cpp | 75 ++----------------- vespalib/src/vespa/vespalib/util/thread.h | 56 +++++++------- 33 files changed, 134 insertions(+), 289 deletions(-) delete mode 100644 vespalib/src/vespa/vespalib/util/active.cpp delete mode 100644 vespalib/src/vespa/vespalib/util/active.h delete mode 100644 vespalib/src/vespa/vespalib/util/joinable.cpp delete mode 100644 vespalib/src/vespa/vespalib/util/joinable.h diff --git a/config/src/vespa/config/helper/configfetcher.cpp b/config/src/vespa/config/helper/configfetcher.cpp index b2cf6e1955de..ec4ea42d0bb7 100644 --- a/config/src/vespa/config/helper/configfetcher.cpp +++ b/config/src/vespa/config/helper/configfetcher.cpp @@ -15,7 +15,7 @@ VESPA_THREAD_STACK_TAG(config_fetcher_thread); ConfigFetcher::ConfigFetcher(std::shared_ptr context) : _poller(std::make_unique(std::move(context))), - _thread(std::make_unique(*_poller, config_fetcher_thread)), + _thread(std::make_unique()), _closed(false), _started(false) { @@ -36,7 +36,7 @@ ConfigFetcher::start() throw ConfigTimeoutException("ConfigFetcher::start timed out getting initial config"); } LOG(debug, "Starting fetcher thread..."); - _thread->start(); + *_thread = vespalib::Thread::start(*_poller, config_fetcher_thread); _started = true; LOG(debug, "Fetcher thread started"); } diff --git a/config/src/vespa/config/retriever/simpleconfigurer.cpp b/config/src/vespa/config/retriever/simpleconfigurer.cpp index 5059b9997f51..1e89f51ec032 100644 --- a/config/src/vespa/config/retriever/simpleconfigurer.cpp +++ b/config/src/vespa/config/retriever/simpleconfigurer.cpp @@ -13,7 +13,7 @@ VESPA_THREAD_STACK_TAG(simple_configurer_thread); SimpleConfigurer::SimpleConfigurer(SimpleConfigRetriever::UP retriever, SimpleConfigurable * const configurable) : _retriever(std::move(retriever)), _configurable(configurable), - _thread(*this, simple_configurer_thread), + _thread(), _started(false) { assert(_retriever); @@ -25,7 +25,7 @@ SimpleConfigurer::start() if (!_retriever->isClosed()) { LOG(debug, "Polling for config"); runConfigure(); - _thread.start(); + _thread = vespalib::Thread::start(*this, simple_configurer_thread); _started = true; } } diff --git a/fnet/src/tests/examples/examples_test.cpp b/fnet/src/tests/examples/examples_test.cpp index 4b9e2a58ef14..1b666898ff29 100644 --- a/fnet/src/tests/examples/examples_test.cpp +++ b/fnet/src/tests/examples/examples_test.cpp @@ -3,7 +3,8 @@ #include #include #include -#include +#include +#include #include #include @@ -42,7 +43,7 @@ bool run_with_retry(const vespalib::string &cmd) { for (size_t retry = 0; retry < 60; ++retry) { if (retry > 0) { fprintf(stderr, "retrying command in 500ms...\n"); - vespalib::Thread::sleep(500); + std::this_thread::sleep_for(500ms); } vespalib::string output; Process proc(cmd, true); diff --git a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp index 5601336fdfdd..4a986d9ba015 100644 --- a/slobrok/src/vespa/slobrok/server/slobrokserver.cpp +++ b/slobrok/src/vespa/slobrok/server/slobrokserver.cpp @@ -8,16 +8,16 @@ VESPA_THREAD_STACK_TAG(slobrok_server_thread); SlobrokServer::SlobrokServer(ConfigShim &shim) : _env(shim), - _thread(*this, slobrok_server_thread) + _thread() { - _thread.start(); + _thread = vespalib::Thread::start(*this, slobrok_server_thread); } SlobrokServer::SlobrokServer(uint32_t port) : _env(ConfigShim(port)), - _thread(*this, slobrok_server_thread) + _thread() { - _thread.start(); + _thread = vespalib::Thread::start(*this, slobrok_server_thread); } diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 000e1e1c1554..d7ecb1f30f1b 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -296,8 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_ auto abortCmd = makeAbortCmd(abortSet); SendTask sendTask(abortCmd, *_queueBarrier, c.top); - vespalib::Thread thread(sendTask, test_thread); - thread.start(); + auto thread = vespalib::Thread::start(sendTask, test_thread); LOG(debug, "waiting for threads to reach barriers"); _queueBarrier->await(); @@ -306,7 +305,6 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_ LOG(debug, "waiting on completion barrier"); _completionBarrier->await(); - thread.stop(); thread.join(); // If waiting works, put reply shall always be ordered before the internal diff --git a/vbench/src/apps/vbench/vbench.cpp b/vbench/src/apps/vbench/vbench.cpp index ffea64c6034c..b5c2897207f9 100644 --- a/vbench/src/apps/vbench/vbench.cpp +++ b/vbench/src/apps/vbench/vbench.cpp @@ -44,8 +44,7 @@ int run(const std::string &cfg_name) { VBench vbench(cfg); NotifyDone notify(done); vespalib::RunnablePair runBoth(vbench, notify); - vespalib::Thread thread(runBoth, vbench_thread); - thread.start(); + auto thread = vespalib::Thread::start(runBoth, vbench_thread); while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {} if (!done.await(vespalib::duration::zero())) { vbench.abort(); diff --git a/vbench/src/tests/dispatcher/dispatcher_test.cpp b/vbench/src/tests/dispatcher/dispatcher_test.cpp index 85879feb0ee0..6a3fb8d0c7cd 100644 --- a/vbench/src/tests/dispatcher/dispatcher_test.cpp +++ b/vbench/src/tests/dispatcher/dispatcher_test.cpp @@ -30,11 +30,9 @@ TEST("dispatcher") { Dispatcher dispatcher(dropped); Fetcher fetcher1(dispatcher, handler1); Fetcher fetcher2(dispatcher, handler2); - vespalib::Thread thread1(fetcher1, fetcher1_thread); - vespalib::Thread thread2(fetcher2, fetcher2_thread); - thread1.start(); + auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread); EXPECT_TRUE(dispatcher.waitForThreads(1, 512)); - thread2.start(); + auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread); EXPECT_TRUE(dispatcher.waitForThreads(2, 512)); EXPECT_EQUAL(-1, dropped.value); EXPECT_EQUAL(-1, handler1.value); diff --git a/vbench/src/tests/handler_thread/handler_thread_test.cpp b/vbench/src/tests/handler_thread/handler_thread_test.cpp index 97a12e82ac87..497b7db28838 100644 --- a/vbench/src/tests/handler_thread/handler_thread_test.cpp +++ b/vbench/src/tests/handler_thread/handler_thread_test.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include +#include using namespace vbench; @@ -9,7 +10,7 @@ struct MyHandler : Handler { ~MyHandler() override; void handle(std::unique_ptr value) override { values.push_back(*value); - vespalib::Thread::sleep(10); // for improved coverage + std::this_thread::sleep_for(10ms); } }; diff --git a/vbench/src/tests/timer/timer_test.cpp b/vbench/src/tests/timer/timer_test.cpp index 43e7b76caa01..eda0564d2d83 100644 --- a/vbench/src/tests/timer/timer_test.cpp +++ b/vbench/src/tests/timer/timer_test.cpp @@ -1,13 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include +#include using namespace vbench; IGNORE_TEST("timer") { Timer timer; EXPECT_APPROX(0.0, timer.sample(), 0.1); - vespalib::Thread::sleep(1000); + std::this_thread::sleep_for(1000ms); EXPECT_APPROX(1.0, timer.sample(), 0.1); timer.reset(); EXPECT_APPROX(0.0, timer.sample(), 0.1); diff --git a/vbench/src/vbench/core/dispatcher.hpp b/vbench/src/vbench/core/dispatcher.hpp index aa5afcd6d67f..572e9f3381de 100644 --- a/vbench/src/vbench/core/dispatcher.hpp +++ b/vbench/src/vbench/core/dispatcher.hpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include +#include namespace vbench { @@ -22,7 +23,7 @@ Dispatcher::waitForThreads(size_t threads, size_t pollCnt) const { for (size_t i = 0; i < pollCnt; ++i) { if (i != 0) { - vespalib::Thread::sleep(20); + std::this_thread::sleep_for(20ms); } { std::lock_guard guard(_lock); diff --git a/vbench/src/vbench/core/handler_thread.h b/vbench/src/vbench/core/handler_thread.h index 402ecbeb0dc5..8ece1389dfcd 100644 --- a/vbench/src/vbench/core/handler_thread.h +++ b/vbench/src/vbench/core/handler_thread.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include namespace vbench { @@ -19,8 +19,7 @@ namespace vbench { **/ template class HandlerThread : public Handler, - public vespalib::Runnable, - public vespalib::Joinable + public vespalib::Runnable { private: std::mutex _lock; @@ -36,7 +35,7 @@ class HandlerThread : public Handler, HandlerThread(Handler &next, init_fun_t init_fun); ~HandlerThread(); void handle(std::unique_ptr obj) override; - void join() override; + void join(); }; } // namespace vbench diff --git a/vbench/src/vbench/core/handler_thread.hpp b/vbench/src/vbench/core/handler_thread.hpp index 56cc0a7771d6..1a99861ea816 100644 --- a/vbench/src/vbench/core/handler_thread.hpp +++ b/vbench/src/vbench/core/handler_thread.hpp @@ -28,10 +28,10 @@ HandlerThread::HandlerThread(Handler &next, init_fun_t init_fun) _cond(), _queue(), _next(next), - _thread(*this, init_fun), + _thread(), _done(false) { - _thread.start(); + _thread = vespalib::Thread::start(*this, init_fun); } template diff --git a/vbench/src/vbench/vbench/request_scheduler.cpp b/vbench/src/vbench/vbench/request_scheduler.cpp index 95d29181b1fc..cde31ec07b8a 100644 --- a/vbench/src/vbench/vbench/request_scheduler.cpp +++ b/vbench/src/vbench/vbench/request_scheduler.cpp @@ -2,6 +2,7 @@ #include "request_scheduler.h" #include +#include namespace vbench { @@ -13,14 +14,18 @@ RequestScheduler::run() { double sleepTime; std::vector list; - vespalib::Thread &thread = vespalib::Thread::currentThread(); while (_queue.extract(_timer.sample(), list, sleepTime)) { for (size_t i = 0; i < list.size(); ++i) { Request::UP request = Request::UP(list[i].release()); _dispatcher.handle(std::move(request)); } list.clear(); - thread.slumber(sleepTime); + { + auto guard = std::unique_lock(_lock); + if (_may_slumber) { + _cond.wait_for(guard, std::chrono::duration(sleepTime)); + } + } } } @@ -30,9 +35,12 @@ RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler &ne _queue(10.0, 0.020), _droppedTagger(_proxy), _dispatcher(_droppedTagger), - _thread(*this, vbench_request_scheduler_thread), + _thread(), _connectionPool(std::move(crypto), _timer), - _workers() + _workers(), + _lock(), + _cond(), + _may_slumber(true) { for (size_t i = 0; i < numWorkers; ++i) { _workers.push_back(std::make_unique(_dispatcher, _proxy, _connectionPool, _timer)); @@ -45,7 +53,11 @@ RequestScheduler::abort() { _queue.close(); _queue.discard(); - _thread.stop(); + { + auto guard = std::lock_guard(_lock); + _may_slumber = false; + _cond.notify_all(); + } } void @@ -59,7 +71,7 @@ void RequestScheduler::start() { _timer.reset(); - _thread.start(); + _thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread); } RequestScheduler & diff --git a/vbench/src/vbench/vbench/request_scheduler.h b/vbench/src/vbench/vbench/request_scheduler.h index a0fb1eda4e70..b1d525eb6915 100644 --- a/vbench/src/vbench/vbench/request_scheduler.h +++ b/vbench/src/vbench/vbench/request_scheduler.h @@ -7,7 +7,8 @@ #include #include #include -#include +#include +#include namespace vbench { @@ -17,8 +18,7 @@ namespace vbench { * with. **/ class RequestScheduler : public Handler, - public vespalib::Runnable, - public vespalib::Active + public vespalib::Runnable { private: Timer _timer; @@ -29,7 +29,10 @@ class RequestScheduler : public Handler, vespalib::Thread _thread; HttpConnectionPool _connectionPool; std::vector _workers; - + std::mutex _lock; + std::condition_variable _cond; + bool _may_slumber; + void run() override; public: using UP = std::unique_ptr; @@ -37,9 +40,9 @@ class RequestScheduler : public Handler, RequestScheduler(CryptoEngine::SP crypto, Handler &next, size_t numWorkers); void abort(); void handle(Request::UP request) override; - void start() override; - RequestScheduler &stop() override; - void join() override; + void start(); + RequestScheduler &stop(); + void join(); }; } // namespace vbench diff --git a/vbench/src/vbench/vbench/vbench.cpp b/vbench/src/vbench/vbench/vbench.cpp index 9a5adad262ee..8a8bceccefbe 100644 --- a/vbench/src/vbench/vbench/vbench.cpp +++ b/vbench/src/vbench/vbench/vbench.cpp @@ -78,7 +78,6 @@ VBench::VBench(const vespalib::Slime &cfg) } inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back()); if (inputChain->generator.get() != 0) { - inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator)); _inputs.push_back(std::move(inputChain)); } } @@ -101,10 +100,10 @@ VBench::run() { _scheduler->start(); for (size_t i = 0; i < _inputs.size(); ++i) { - _inputs[i]->thread->start(); + _inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator); } for (size_t i = 0; i < _inputs.size(); ++i) { - _inputs[i]->thread->join(); + _inputs[i]->thread.join(); } _scheduler->stop().join(); for (size_t i = 0; i < _inputs.size(); ++i) { diff --git a/vbench/src/vbench/vbench/vbench.h b/vbench/src/vbench/vbench/vbench.h index dbb46e72800a..2b7fcf0cd88a 100644 --- a/vbench/src/vbench/vbench/vbench.h +++ b/vbench/src/vbench/vbench/vbench.h @@ -26,7 +26,7 @@ class VBench : public vespalib::Runnable, using UP = std::unique_ptr; std::vector taggers; Generator::UP generator; - std::unique_ptr thread; + vespalib::Thread thread; }; NativeFactory _factory; std::vector _analyzers; diff --git a/vbench/src/vbench/vbench/worker.cpp b/vbench/src/vbench/vbench/worker.cpp index afccc7de39f0..eabd17ae73fb 100644 --- a/vbench/src/vbench/vbench/worker.cpp +++ b/vbench/src/vbench/vbench/worker.cpp @@ -24,13 +24,13 @@ Worker::run() Worker::Worker(Provider &provider, Handler &next, HttpConnectionPool &pool, Timer &timer) - : _thread(*this, vbench_worker_thread), + : _thread(), _provider(provider), _next(next), _pool(pool), _timer(timer) { - _thread.start(); + _thread = vespalib::Thread::start(*this, vbench_worker_thread); } } // namespace vbench diff --git a/vbench/src/vbench/vbench/worker.h b/vbench/src/vbench/vbench/worker.h index 6594a4f3dd63..d2bcfae637bd 100644 --- a/vbench/src/vbench/vbench/worker.h +++ b/vbench/src/vbench/vbench/worker.h @@ -8,7 +8,6 @@ #include #include #include -#include namespace vbench { @@ -18,8 +17,7 @@ namespace vbench { * internal thread that will stop when the request provider starts * handing out empty requests. **/ -class Worker : public vespalib::Runnable, - public vespalib::Joinable +class Worker : public vespalib::Runnable { private: vespalib::Thread _thread; @@ -33,7 +31,7 @@ class Worker : public vespalib::Runnable, using UP = std::unique_ptr; Worker(Provider &provider, Handler &next, HttpConnectionPool &pool, Timer &timer); - void join() override { _thread.join(); } + void join() { _thread.join(); } }; } // namespace vbench diff --git a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp index 59aeddfe8cad..e451f1e033df 100644 --- a/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp +++ b/vespalib/src/tests/simple_thread_bundle/simple_thread_bundle_test.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include diff --git a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index 56352ff3c0d9..3b1d244eb13b 100644 --- a/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include using namespace vespalib; diff --git a/vespalib/src/tests/thread/thread_test.cpp b/vespalib/src/tests/thread/thread_test.cpp index af1fb6264621..f7e8753fd86d 100644 --- a/vespalib/src/tests/thread/thread_test.cpp +++ b/vespalib/src/tests/thread/thread_test.cpp @@ -2,56 +2,48 @@ #include #include -#include using namespace vespalib; VESPA_THREAD_STACK_TAG(test_agent_thread); struct Agent : public Runnable { - bool started; - int loopCnt; - Agent() : started(false), loopCnt(0) {} + bool was_run; + Agent() : was_run(false) {} void run() override { - started = true; - Thread &thread = Thread::currentThread(); - while (thread.slumber(60.0)) { - ++loopCnt; - } + was_run = true; } }; -TEST("thread never started") { +void my_fun(bool *was_run) { + *was_run = true; +} + +TEST("run vespalib::Runnable with init function") { Agent agent; { - Thread thread(agent, test_agent_thread); + auto thread = Thread::start(agent, test_agent_thread); } - EXPECT_TRUE(!agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(agent.was_run); } -TEST("normal operation") { - Agent agent; +TEST("run custom function") { + bool was_run = false; { - Thread thread(agent, test_agent_thread); - thread.start(); - std::this_thread::sleep_for(20ms); - thread.stop().join(); + auto thread = Thread::start(my_fun, &was_run); } - EXPECT_TRUE(agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(was_run); } -TEST("stop before start") { - Agent agent; +TEST("join multiple times (including destructor)") { + bool was_run = false; { - Thread thread(agent, test_agent_thread); - thread.stop(); - thread.start(); + auto thread = Thread::start(my_fun, &was_run); + thread.join(); + thread.join(); thread.join(); } - EXPECT_TRUE(!agent.started); - EXPECT_EQUAL(0, agent.loopCnt); + EXPECT_TRUE(was_run); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp index 4e33897d869a..5b618beb01b5 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.cpp +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.cpp @@ -4,31 +4,9 @@ #include #include #include -#include namespace vespalib { -namespace { - -struct FastOSTestThreadRunner : FastOS_Runnable { - TestThreadEntry &entry; - FastOSTestThreadRunner(TestThreadEntry &entry_in) : entry(entry_in) {} - bool DeleteOnCompletion() const override { return true; } - void Run(FastOS_ThreadInterface *, void *) override { entry.threadEntry(); } -}; - -struct FastOSTestThreadFactory : TestThreadFactory { - FastOS_ThreadPool threadPool; - FastOSTestThreadFactory() : threadPool() {} - void createThread(TestThreadEntry &entry) override { - threadPool.NewThread(new FastOSTestThreadRunner(entry), 0); - } -}; - -} // namespace vespalib:: - -__thread TestThreadFactory *TestThreadFactory::factory = 0; - void TestThreadWrapper::threadEntry() { @@ -96,8 +74,6 @@ const char *lookup_subset_pattern(const std::string &name) { void TestHook::runAll() { - FastOSTestThreadFactory threadFactory; - TestThreadFactory::factory = &threadFactory; std::string name = TestMaster::master.getName(); std::regex pattern(lookup_subset_pattern(name)); size_t testsPassed = 0; @@ -134,7 +110,6 @@ TestHook::runAll() fprintf(stderr, "%s: Warn: test summary --- %zu test(s) ignored\n", name.c_str(), testsIgnored); } - TestThreadFactory::factory = 0; } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/testkit/test_hook.h b/vespalib/src/vespa/vespalib/testkit/test_hook.h index d0e04ede2e9a..8a5c6c1e6842 100644 --- a/vespalib/src/vespa/vespalib/testkit/test_hook.h +++ b/vespalib/src/vespa/vespalib/testkit/test_hook.h @@ -4,6 +4,8 @@ #include #include +#include +#include #include #include #include @@ -16,12 +18,6 @@ struct TestThreadEntry { virtual ~TestThreadEntry() {} }; -struct TestThreadFactory { - static __thread TestThreadFactory *factory; - virtual void createThread(TestThreadEntry &entry) = 0; - virtual ~TestThreadFactory() {} -}; - struct TestFixtureWrapper { size_t thread_id; size_t num_threads; @@ -82,8 +78,10 @@ class TestHook Barrier barrier(num_threads); std::vector fixtures; std::vector threads; + std::vector thread_handles; threads.reserve(num_threads); fixtures.reserve(num_threads); + thread_handles.reserve(num_threads - 1); for (size_t i = 0; i < num_threads; ++i) { FixtureUP fixture_up(new T(fixture)); fixture_up->thread_id = i; @@ -92,8 +90,7 @@ class TestHook fixtures.push_back(std::move(fixture_up)); } for (size_t i = 1; i < num_threads; ++i) { - assert(TestThreadFactory::factory != 0); - TestThreadFactory::factory->createThread(*threads[i]); + thread_handles.push_back(Thread::start([&target = *threads[i]](){ target.threadEntry(); })); } threads[0]->threadEntry(); latch.await(); diff --git a/vespalib/src/vespa/vespalib/util/CMakeLists.txt b/vespalib/src/vespa/vespalib/util/CMakeLists.txt index ad2db89288c6..73e8b93a2ff7 100644 --- a/vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_util OBJECT SOURCES - active.cpp adaptive_sequenced_executor.cpp address_space.cpp alloc.cpp @@ -45,7 +44,6 @@ vespa_add_library(vespalib_vespalib_util OBJECT invokeserviceimpl.cpp isequencedtaskexecutor.cpp issue.cpp - joinable.cpp jsonexception.cpp jsonstream.cpp jsonwriter.cpp diff --git a/vespalib/src/vespa/vespalib/util/active.cpp b/vespalib/src/vespa/vespalib/util/active.cpp deleted file mode 100644 index 48785c74b79b..000000000000 --- a/vespalib/src/vespa/vespalib/util/active.cpp +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "active.h" - -namespace vespalib { - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/active.h b/vespalib/src/vespa/vespalib/util/active.h deleted file mode 100644 index 1fbff9514d78..000000000000 --- a/vespalib/src/vespa/vespalib/util/active.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "joinable.h" - -namespace vespalib { - -/** - * Interface used to abstract entities that are the source of - * activity. - **/ -struct Active : Joinable { - /** - * Start activity. - **/ - virtual void start() = 0; - - /** - * Request that activity stops. The returned object can be used to - * wait for the actual conclusion of the activity. - * - * @return object that can be used to wait for activity completion - **/ - virtual Joinable &stop() = 0; - - /** - * Empty virtual destructor to enable subclassing. - **/ - virtual ~Active() {} -}; - -} // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/joinable.cpp b/vespalib/src/vespa/vespalib/util/joinable.cpp deleted file mode 100644 index 581126603896..000000000000 --- a/vespalib/src/vespa/vespalib/util/joinable.cpp +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "joinable.h" - -namespace vespalib { - -} // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/joinable.h b/vespalib/src/vespa/vespalib/util/joinable.h deleted file mode 100644 index 275ae740d264..000000000000 --- a/vespalib/src/vespa/vespalib/util/joinable.h +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -namespace vespalib { - -/** - * Concurrent activity that we can wait for a conclusion of. - **/ -struct Joinable { - /** - * Wait for the conclusion of this concurrent activity - **/ - virtual void join() = 0; - - /** - * Empty virtual destructor to enable subclassing. - **/ - virtual ~Joinable() {} -}; - -} // namespace vespalib - diff --git a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp index 8a66a4f68986..becb5d2ab74f 100644 --- a/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp +++ b/vespalib/src/vespa/vespalib/util/simple_thread_bundle.cpp @@ -178,11 +178,11 @@ SimpleThreadBundle::run(Runnable* const* targets, size_t cnt) } SimpleThreadBundle::Worker::Worker(Signal &s, Runnable::init_fun_t init_fun, Runnable::UP h) - : thread(*this, std::move(init_fun)), + : thread(), signal(s), hook(std::move(h)) { - thread.start(); + thread = Thread::start(*this, std::move(init_fun)); } void diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp index c2f83bbcf093..298226d88051 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -19,7 +19,8 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool _mutex(), _consumerCondition(), _producerCondition(), - _thread(*this, func), + _thread(), + _stopped(false), _idleTracker(steady_clock::now()), _threadIdleTracker(), _wakeupCount(0), @@ -37,13 +38,13 @@ SingleExecutor::SingleExecutor(init_fun_t func, uint32_t reservedQueueSize, bool if ( ! isQueueSizeHard) { _overflow = std::make_unique>(); } - _thread.start(); + _thread = Thread::start(*this, func); } SingleExecutor::~SingleExecutor() { shutdown(); sync(); - _thread.stop(); + stop(); _consumerCondition.notify_one(); _thread.join(); } @@ -140,7 +141,7 @@ SingleExecutor::shutdown() { void SingleExecutor::run() { - while (!_thread.stopped()) { + while (!stopped()) { drain_tasks(); _producerCondition.notify_all(); _wakeupConsumerAt.store(_wp.load(std::memory_order_relaxed) + get_watermark(), std::memory_order_relaxed); diff --git a/vespalib/src/vespa/vespalib/util/singleexecutor.h b/vespalib/src/vespa/vespalib/util/singleexecutor.h index dd755a763022..051f506e90a5 100644 --- a/vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -9,6 +9,8 @@ #include #include #include +#include +#include namespace vespalib { @@ -38,6 +40,8 @@ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib:: using Lock = std::unique_lock; void drain(Lock & lock); void run() override; + void stop() { _stopped = true; } + bool stopped() const { return _stopped.load(std::memory_order_relaxed); } void drain_tasks(); void sleepProducer(Lock & guard, duration maxWaitTime, uint64_t wakeupAt); void run_tasks_till(uint64_t available); @@ -48,7 +52,6 @@ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib:: uint64_t index(uint64_t counter) const { return counter & (_taskLimit.load(std::memory_order_relaxed) - 1); } - uint64_t numTasks(); uint64_t numTasks(Lock & guard) const { return num_tasks_in_main_q() + num_tasks_in_overflow_q(guard); @@ -68,6 +71,7 @@ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib:: std::condition_variable _consumerCondition; std::condition_variable _producerCondition; vespalib::Thread _thread; + std::atomic _stopped; ExecutorIdleTracker _idleTracker; ThreadIdleTracker _threadIdleTracker; uint64_t _wakeupCount; diff --git a/vespalib/src/vespa/vespalib/util/thread.cpp b/vespalib/src/vespa/vespalib/util/thread.cpp index b6a491ee83da..cad24c5bcda7 100644 --- a/vespalib/src/vespa/vespalib/util/thread.cpp +++ b/vespalib/src/vespa/vespalib/util/thread.cpp @@ -1,56 +1,14 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "thread.h" -#include "time.h" -#include -#include namespace vespalib { -__thread Thread *Thread::_currentThread = nullptr; - -void -Thread::run() -{ - assert(_currentThread == nullptr); - _currentThread = this; - _start.await(); - if (!stopped()) { - _init_fun(_runnable); - } - assert(_currentThread == this); - _currentThread = nullptr; -} - -Thread::Thread(Runnable &runnable, init_fun_t init_fun_in) - : _runnable(runnable), - _init_fun(std::move(init_fun_in)), - _start(), - _lock(), - _cond(), - _stopped(false), - _woken(false), - _thread(&Thread::run, this) -{ -} - -Thread::~Thread() -{ - stop().start(); -} - -void -Thread::start() -{ - _start.countDown(); -} - Thread & -Thread::stop() +Thread::operator=(Thread &&rhs) noexcept { - std::unique_lock guard(_lock); - _stopped.store(true, std::memory_order_relaxed); - _cond.notify_all(); + // may call std::terminate + _thread = std::move(rhs._thread); return *this; } @@ -62,32 +20,15 @@ Thread::join() } } -bool -Thread::slumber(double s) -{ - std::unique_lock guard(_lock); - if (!stopped() || _woken) { - if (_cond.wait_for(guard, from_s(s)) == std::cv_status::no_timeout) { - _woken = stopped(); - } - } else { - _woken = true; - } - return !stopped(); -} - -Thread & -Thread::currentThread() +Thread::~Thread() { - Thread *thread = _currentThread; - assert(thread != nullptr); - return *thread; + join(); } -void -Thread::sleep(size_t ms) +Thread +Thread::start(Runnable &runnable, Runnable::init_fun_t init_fun) { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + return start([&runnable, init_fun](){ init_fun(runnable); }); } } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/thread.h b/vespalib/src/vespa/vespalib/util/thread.h index c03f1c6e65c7..6db1c1d54b41 100644 --- a/vespalib/src/vespa/vespalib/util/thread.h +++ b/vespalib/src/vespa/vespalib/util/thread.h @@ -2,46 +2,42 @@ #pragma once -#include "gate.h" #include "runnable.h" -#include "active.h" -#include #include +#include namespace vespalib { /** - * Abstraction of the concept of running a single thread. + * Thin thread abstraction that takes some things from std::thread + * (not allowed to assign to a running thread), some things from + * std::jthread (destructor does automatic join) and some things from + * now deprecated thread pools (the join function can be called + * multiple times and will only join the underlying thread if it is + * joinable). Enables starting a thread either by using a runnable and + * an init function or by forwarding directly to the std::thread + * constructor. Note that this class does not handle cancellation. **/ -class Thread : public Active +class Thread { private: - using init_fun_t = Runnable::init_fun_t; - static __thread Thread *_currentThread; - - Runnable &_runnable; - init_fun_t _init_fun; - vespalib::Gate _start; - std::mutex _lock; - std::condition_variable _cond; - std::atomic _stopped; - bool _woken; - std::jthread _thread; - - void run(); - + std::thread _thread; + Thread(std::thread &&thread) noexcept : _thread(std::move(thread)) {} public: - Thread(Runnable &runnable, init_fun_t init_fun_in); - ~Thread() override; - void start() override; - Thread &stop() override; - void join() override; - [[nodiscard]] bool stopped() const noexcept { - return _stopped.load(std::memory_order_relaxed); - } - bool slumber(double s); - static Thread ¤tThread(); - static void sleep(size_t ms); + Thread() noexcept : _thread() {} + Thread(const Thread &rhs) = delete; + Thread(Thread &&rhs) noexcept : Thread(std::move(rhs._thread)) {} + std::thread::id get_id() const noexcept { return _thread.get_id(); } + Thread &operator=(const Thread &rhs) = delete; + Thread &operator=(Thread &&rhs) noexcept; + void join(); + ~Thread(); + [[nodiscard]] static Thread start(Runnable &runnable, Runnable::init_fun_t init_fun_t); + template + requires std::invocable + [[nodiscard]] static Thread start(F &&f, Args && ... args) { + return Thread(std::thread(std::forward(f), std::forward(args)...)); + }; }; } // namespace vespalib