Skip to content

Commit

Permalink
Merge pull request #26038 from vespa-engine/havardpe/stop-using-jthre…
Browse files Browse the repository at this point in the history
…ad-as-well

stop using fastos thread more places
  • Loading branch information
baldersheim authored Feb 14, 2023
2 parents b1ebfcf + 98e408c commit 707663e
Show file tree
Hide file tree
Showing 33 changed files with 134 additions and 289 deletions.
4 changes: 2 additions & 2 deletions config/src/vespa/config/helper/configfetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ VESPA_THREAD_STACK_TAG(config_fetcher_thread);

ConfigFetcher::ConfigFetcher(std::shared_ptr<IConfigContext> context)
: _poller(std::make_unique<ConfigPoller>(std::move(context))),
_thread(std::make_unique<vespalib::Thread>(*_poller, config_fetcher_thread)),
_thread(std::make_unique<vespalib::Thread>()),
_closed(false),
_started(false)
{
Expand All @@ -36,7 +36,7 @@ ConfigFetcher::start()
throw ConfigTimeoutException("ConfigFetcher::start timed out getting initial config");
}
LOG(debug, "Starting fetcher thread...");
_thread->start();
*_thread = vespalib::Thread::start(*_poller, config_fetcher_thread);
_started = true;
LOG(debug, "Fetcher thread started");
}
Expand Down
4 changes: 2 additions & 2 deletions config/src/vespa/config/retriever/simpleconfigurer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ VESPA_THREAD_STACK_TAG(simple_configurer_thread);
SimpleConfigurer::SimpleConfigurer(SimpleConfigRetriever::UP retriever, SimpleConfigurable * const configurable)
: _retriever(std::move(retriever)),
_configurable(configurable),
_thread(*this, simple_configurer_thread),
_thread(),
_started(false)
{
assert(_retriever);
Expand All @@ -25,7 +25,7 @@ SimpleConfigurer::start()
if (!_retriever->isClosed()) {
LOG(debug, "Polling for config");
runConfigure();
_thread.start();
_thread = vespalib::Thread::start(*this, simple_configurer_thread);
_started = true;
}
}
Expand Down
5 changes: 3 additions & 2 deletions fnet/src/tests/examples/examples_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
#include <vespa/vespalib/process/process.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>
#include <thread>
#include <atomic>
#include <csignal>

Expand Down Expand Up @@ -42,7 +43,7 @@ bool run_with_retry(const vespalib::string &cmd) {
for (size_t retry = 0; retry < 60; ++retry) {
if (retry > 0) {
fprintf(stderr, "retrying command in 500ms...\n");
vespalib::Thread::sleep(500);
std::this_thread::sleep_for(500ms);
}
vespalib::string output;
Process proc(cmd, true);
Expand Down
8 changes: 4 additions & 4 deletions slobrok/src/vespa/slobrok/server/slobrokserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ VESPA_THREAD_STACK_TAG(slobrok_server_thread);

SlobrokServer::SlobrokServer(ConfigShim &shim)
: _env(shim),
_thread(*this, slobrok_server_thread)
_thread()
{
_thread.start();
_thread = vespalib::Thread::start(*this, slobrok_server_thread);
}

SlobrokServer::SlobrokServer(uint32_t port)
: _env(ConfigShim(port)),
_thread(*this, slobrok_server_thread)
_thread()
{
_thread.start();
_thread = vespalib::Thread::start(*this, slobrok_server_thread);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_
auto abortCmd = makeAbortCmd(abortSet);

SendTask sendTask(abortCmd, *_queueBarrier, c.top);
vespalib::Thread thread(sendTask, test_thread);
thread.start();
auto thread = vespalib::Thread::start(sendTask, test_thread);

LOG(debug, "waiting for threads to reach barriers");
_queueBarrier->await();
Expand All @@ -306,7 +305,6 @@ TEST_F(OperationAbortingTest, wait_for_current_operation_completion_for_aborted_
LOG(debug, "waiting on completion barrier");
_completionBarrier->await();

thread.stop();
thread.join();

// If waiting works, put reply shall always be ordered before the internal
Expand Down
3 changes: 1 addition & 2 deletions vbench/src/apps/vbench/vbench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ int run(const std::string &cfg_name) {
VBench vbench(cfg);
NotifyDone notify(done);
vespalib::RunnablePair runBoth(vbench, notify);
vespalib::Thread thread(runBoth, vbench_thread);
thread.start();
auto thread = vespalib::Thread::start(runBoth, vbench_thread);
while (!SIG::INT.check() && !SIG::TERM.check() && !done.await(1s)) {}
if (!done.await(vespalib::duration::zero())) {
vbench.abort();
Expand Down
6 changes: 2 additions & 4 deletions vbench/src/tests/dispatcher/dispatcher_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ TEST("dispatcher") {
Dispatcher<int> dispatcher(dropped);
Fetcher fetcher1(dispatcher, handler1);
Fetcher fetcher2(dispatcher, handler2);
vespalib::Thread thread1(fetcher1, fetcher1_thread);
vespalib::Thread thread2(fetcher2, fetcher2_thread);
thread1.start();
auto thread1 = vespalib::Thread::start(fetcher1, fetcher1_thread);
EXPECT_TRUE(dispatcher.waitForThreads(1, 512));
thread2.start();
auto thread2 = vespalib::Thread::start(fetcher2, fetcher2_thread);
EXPECT_TRUE(dispatcher.waitForThreads(2, 512));
EXPECT_EQUAL(-1, dropped.value);
EXPECT_EQUAL(-1, handler1.value);
Expand Down
3 changes: 2 additions & 1 deletion vbench/src/tests/handler_thread/handler_thread_test.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
#include <vespa/vespalib/util/time.h>

using namespace vbench;

Expand All @@ -9,7 +10,7 @@ struct MyHandler : Handler<int> {
~MyHandler() override;
void handle(std::unique_ptr<int> value) override {
values.push_back(*value);
vespalib::Thread::sleep(10); // for improved coverage
std::this_thread::sleep_for(10ms);
}
};

Expand Down
3 changes: 2 additions & 1 deletion vbench/src/tests/timer/timer_test.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/testkit/testapp.h>
#include <vbench/test/all.h>
#include <vespa/vespalib/util/time.h>

using namespace vbench;

IGNORE_TEST("timer") {
Timer timer;
EXPECT_APPROX(0.0, timer.sample(), 0.1);
vespalib::Thread::sleep(1000);
std::this_thread::sleep_for(1000ms);
EXPECT_APPROX(1.0, timer.sample(), 0.1);
timer.reset();
EXPECT_APPROX(0.0, timer.sample(), 0.1);
Expand Down
3 changes: 2 additions & 1 deletion vbench/src/vbench/core/dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/time.h>

namespace vbench {

Expand All @@ -22,7 +23,7 @@ Dispatcher<T>::waitForThreads(size_t threads, size_t pollCnt) const
{
for (size_t i = 0; i < pollCnt; ++i) {
if (i != 0) {
vespalib::Thread::sleep(20);
std::this_thread::sleep_for(20ms);
}
{
std::lock_guard guard(_lock);
Expand Down
7 changes: 3 additions & 4 deletions vbench/src/vbench/core/handler_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <vespa/vespalib/util/arrayqueue.hpp>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/runnable.h>
#include <vespa/vespalib/util/joinable.h>
#include <condition_variable>

namespace vbench {

Expand All @@ -19,8 +19,7 @@ namespace vbench {
**/
template <typename T>
class HandlerThread : public Handler<T>,
public vespalib::Runnable,
public vespalib::Joinable
public vespalib::Runnable
{
private:
std::mutex _lock;
Expand All @@ -36,7 +35,7 @@ class HandlerThread : public Handler<T>,
HandlerThread(Handler<T> &next, init_fun_t init_fun);
~HandlerThread();
void handle(std::unique_ptr<T> obj) override;
void join() override;
void join();
};

} // namespace vbench
Expand Down
4 changes: 2 additions & 2 deletions vbench/src/vbench/core/handler_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ HandlerThread<T>::HandlerThread(Handler<T> &next, init_fun_t init_fun)
_cond(),
_queue(),
_next(next),
_thread(*this, init_fun),
_thread(),
_done(false)
{
_thread.start();
_thread = vespalib::Thread::start(*this, init_fun);
}

template <typename T>
Expand Down
24 changes: 18 additions & 6 deletions vbench/src/vbench/vbench/request_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "request_scheduler.h"
#include <vbench/core/timer.h>
#include <vespa/vespalib/util/time.h>

namespace vbench {

Expand All @@ -13,14 +14,18 @@ RequestScheduler::run()
{
double sleepTime;
std::vector<Request::UP> list;
vespalib::Thread &thread = vespalib::Thread::currentThread();
while (_queue.extract(_timer.sample(), list, sleepTime)) {
for (size_t i = 0; i < list.size(); ++i) {
Request::UP request = Request::UP(list[i].release());
_dispatcher.handle(std::move(request));
}
list.clear();
thread.slumber(sleepTime);
{
auto guard = std::unique_lock(_lock);
if (_may_slumber) {
_cond.wait_for(guard, std::chrono::duration<double,std::milli>(sleepTime));
}
}
}
}

Expand All @@ -30,9 +35,12 @@ RequestScheduler::RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &ne
_queue(10.0, 0.020),
_droppedTagger(_proxy),
_dispatcher(_droppedTagger),
_thread(*this, vbench_request_scheduler_thread),
_thread(),
_connectionPool(std::move(crypto), _timer),
_workers()
_workers(),
_lock(),
_cond(),
_may_slumber(true)
{
for (size_t i = 0; i < numWorkers; ++i) {
_workers.push_back(std::make_unique<Worker>(_dispatcher, _proxy, _connectionPool, _timer));
Expand All @@ -45,7 +53,11 @@ RequestScheduler::abort()
{
_queue.close();
_queue.discard();
_thread.stop();
{
auto guard = std::lock_guard(_lock);
_may_slumber = false;
_cond.notify_all();
}
}

void
Expand All @@ -59,7 +71,7 @@ void
RequestScheduler::start()
{
_timer.reset();
_thread.start();
_thread = vespalib::Thread::start(*this, vbench_request_scheduler_thread);
}

RequestScheduler &
Expand Down
17 changes: 10 additions & 7 deletions vbench/src/vbench/vbench/request_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
#include <vbench/core/time_queue.h>
#include <vbench/core/dispatcher.h>
#include <vbench/core/handler_thread.h>
#include <vespa/vespalib/util/active.h>
#include <mutex>
#include <condition_variable>

namespace vbench {

Expand All @@ -17,8 +18,7 @@ namespace vbench {
* with.
**/
class RequestScheduler : public Handler<Request>,
public vespalib::Runnable,
public vespalib::Active
public vespalib::Runnable
{
private:
Timer _timer;
Expand All @@ -29,17 +29,20 @@ class RequestScheduler : public Handler<Request>,
vespalib::Thread _thread;
HttpConnectionPool _connectionPool;
std::vector<Worker::UP> _workers;

std::mutex _lock;
std::condition_variable _cond;
bool _may_slumber;

void run() override;
public:
using UP = std::unique_ptr<RequestScheduler>;
using CryptoEngine = vespalib::CryptoEngine;
RequestScheduler(CryptoEngine::SP crypto, Handler<Request> &next, size_t numWorkers);
void abort();
void handle(Request::UP request) override;
void start() override;
RequestScheduler &stop() override;
void join() override;
void start();
RequestScheduler &stop();
void join();
};

} // namespace vbench
5 changes: 2 additions & 3 deletions vbench/src/vbench/vbench/vbench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ VBench::VBench(const vespalib::Slime &cfg)
}
inputChain->generator = _factory.createGenerator(generator, *inputChain->taggers.back());
if (inputChain->generator.get() != 0) {
inputChain->thread.reset(new vespalib::Thread(*inputChain->generator, vbench_inputchain_generator));
_inputs.push_back(std::move(inputChain));
}
}
Expand All @@ -101,10 +100,10 @@ VBench::run()
{
_scheduler->start();
for (size_t i = 0; i < _inputs.size(); ++i) {
_inputs[i]->thread->start();
_inputs[i]->thread = vespalib::Thread::start(*_inputs[i]->generator, vbench_inputchain_generator);
}
for (size_t i = 0; i < _inputs.size(); ++i) {
_inputs[i]->thread->join();
_inputs[i]->thread.join();
}
_scheduler->stop().join();
for (size_t i = 0; i < _inputs.size(); ++i) {
Expand Down
2 changes: 1 addition & 1 deletion vbench/src/vbench/vbench/vbench.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class VBench : public vespalib::Runnable,
using UP = std::unique_ptr<InputChain>;
std::vector<Tagger::UP> taggers;
Generator::UP generator;
std::unique_ptr<vespalib::Thread> thread;
vespalib::Thread thread;
};
NativeFactory _factory;
std::vector<Analyzer::UP> _analyzers;
Expand Down
4 changes: 2 additions & 2 deletions vbench/src/vbench/vbench/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ Worker::run()

Worker::Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer)
: _thread(*this, vbench_worker_thread),
: _thread(),
_provider(provider),
_next(next),
_pool(pool),
_timer(timer)
{
_thread.start();
_thread = vespalib::Thread::start(*this, vbench_worker_thread);
}

} // namespace vbench
6 changes: 2 additions & 4 deletions vbench/src/vbench/vbench/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include <vbench/http/http_connection_pool.h>
#include <vespa/vespalib/util/runnable.h>
#include <vespa/vespalib/util/thread.h>
#include <vespa/vespalib/util/joinable.h>

namespace vbench {

Expand All @@ -18,8 +17,7 @@ namespace vbench {
* internal thread that will stop when the request provider starts
* handing out empty requests.
**/
class Worker : public vespalib::Runnable,
public vespalib::Joinable
class Worker : public vespalib::Runnable
{
private:
vespalib::Thread _thread;
Expand All @@ -33,7 +31,7 @@ class Worker : public vespalib::Runnable,
using UP = std::unique_ptr<Worker>;
Worker(Provider<Request> &provider, Handler<Request> &next,
HttpConnectionPool &pool, Timer &timer);
void join() override { _thread.join(); }
void join() { _thread.join(); }
};

} // namespace vbench
Loading

0 comments on commit 707663e

Please sign in to comment.