From 837987eb3821345a3ae0b207841556a7f6043eb1 Mon Sep 17 00:00:00 2001 From: Sam Stuewe Date: Tue, 30 Apr 2024 12:05:31 -0500 Subject: [PATCH] [WIP] fix: ensure tests pass In particular, several of the commits in this set assume their `blocking_queue`s will be empty by the time the destructor is called. However, this is not guaranteeable, and causes segfaults and/or indefinite hangs when encountered. This commit predominantly ensures that the queues are all `clear()`d appropriately. Signed-off-by: Sam Stuewe --- CMakeLists.txt | 2 +- benchmarks/CMakeLists.txt | 2 ++ src/uhs/twophase/coordinator/controller.cpp | 12 +++---- src/uhs/twophase/locking_shard/controller.cpp | 30 +++++++++++----- src/uhs/twophase/locking_shard/controller.hpp | 8 +++-- src/uhs/twophase/sentinel_2pc/controller.cpp | 36 +++++++++++-------- src/uhs/twophase/sentinel_2pc/controller.hpp | 2 +- src/uhs/twophase/sentinel_2pc/server.cpp | 6 ++-- src/uhs/twophase/sentinel_2pc/server.hpp | 6 +++- src/util/raft/rpc_server.hpp | 13 +++---- tests/unit/sentinel_2pc/controller_test.cpp | 13 +++---- 11 files changed, 81 insertions(+), 49 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 0e23aa462..1d838b2ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,7 @@ if(DEFINED CMAKE_PREFIX_PATH) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") - add_compile_options(-fprofile-arcs -ftest-coverage) + add_compile_options(-fprofile-arcs -ftest-coverage -Og -ggdb3) endif() if(CMAKE_BUILD_TYPE STREQUAL "Debug") diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index d616364f0..782cbb270 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} shard watchtower locking_shard + raft transaction rpc network @@ -26,4 +27,5 @@ target_link_libraries(run_benchmarks ${GTEST_LIBRARY} crypto secp256k1 ${LEVELDB_LIBRARY} + ${NURAFT_LIBRARY} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/uhs/twophase/coordinator/controller.cpp b/src/uhs/twophase/coordinator/controller.cpp index aba46669e..9f2ddad8f 100644 --- a/src/uhs/twophase/coordinator/controller.cpp +++ b/src/uhs/twophase/coordinator/controller.cpp @@ -680,6 +680,7 @@ namespace cbdc::coordinator { m_start_thread.join(); } + m_attestation_check_queue.clear(); for(auto& t : m_attestation_check_threads) { if(t.joinable()) { t.join(); @@ -710,7 +711,7 @@ namespace cbdc::coordinator { tx, m_opts.m_sentinel_public_keys, m_opts.m_attestation_threshold); - cb(std::move(tx), valid); + cb(tx, valid); } } } @@ -718,7 +719,7 @@ namespace cbdc::coordinator { auto controller::check_tx_attestation(const transaction::compact_tx& tx, attestation_check_callback cb) -> bool { - m_attestation_check_queue.push({std::move(tx), std::move(cb)}); + m_attestation_check_queue.push({tx, std::move(cb)}); return true; } @@ -731,7 +732,7 @@ namespace cbdc::coordinator { } return check_tx_attestation( - std::move(tx), + tx, [&, res_cb = std::move(result_callback)](transaction::compact_tx tx2, bool result) { @@ -760,9 +761,8 @@ namespace cbdc::coordinator { auto idx = m_current_batch->add_tx(tx2); // Map the index of the tx to the transaction ID and // sentinel ID - m_current_txs->emplace( - tx2.m_id, - std::make_pair(std::move(res_cb), idx)); + m_current_txs->emplace(tx2.m_id, + std::make_pair(res_cb, idx)); return true; }(); if(added) { diff --git a/src/uhs/twophase/locking_shard/controller.cpp b/src/uhs/twophase/locking_shard/controller.cpp index 97f16a540..4f6eaa37a 100644 --- a/src/uhs/twophase/locking_shard/controller.cpp +++ b/src/uhs/twophase/locking_shard/controller.cpp @@ -17,7 +17,7 @@ namespace cbdc::locking_shard { controller::controller(size_t shard_id, size_t node_id, - const cbdc::config::options& opts, + cbdc::config::options opts, std::shared_ptr logger) : m_opts(std::move(opts)), m_logger(std::move(logger)), @@ -112,6 +112,18 @@ namespace cbdc::locking_shard { return true; } + controller::~controller() { + m_running = false; + m_validation_queue.clear(); + for(auto& t : m_validation_threads) { + if(t.joinable()) { + t.join(); + } + } + m_validation_threads.clear(); + m_server.reset(); + } + auto controller::raft_callback(nuraft::cb_func::Type type, nuraft::cb_func::Param* /* param */) -> nuraft::cb_func::ReturnCode { @@ -158,24 +170,24 @@ namespace cbdc::locking_shard { auto v = validation_request(); if(m_validation_queue.pop(v)) { auto [req, cb] = v; - validate_request(std::move(req), std::move(cb)); + validate_request(std::move(req), cb); } } } auto - controller::enqueue_validation(cbdc::buffer buf, + controller::enqueue_validation(cbdc::buffer request, cbdc::raft::rpc::validation_callback cb) -> bool { - m_validation_queue.push({std::move(buf), std::move(cb)}); + m_validation_queue.push({std::move(request), std::move(cb)}); return true; } - auto controller::validate_request(cbdc::buffer buf, - cbdc::raft::rpc::validation_callback cb) - -> bool { + auto controller::validate_request( + cbdc::buffer request, + const cbdc::raft::rpc::validation_callback& cb) -> bool { auto maybe_req - = cbdc::from_buffer>(buf); + = cbdc::from_buffer>(request); auto valid = true; if(maybe_req) { valid = std::visit( @@ -208,7 +220,7 @@ namespace cbdc::locking_shard { valid = false; } - cb(std::move(buf), valid); + cb(std::move(request), valid); return true; } } diff --git a/src/uhs/twophase/locking_shard/controller.hpp b/src/uhs/twophase/locking_shard/controller.hpp index 50e0c9bbe..28a5faef3 100644 --- a/src/uhs/twophase/locking_shard/controller.hpp +++ b/src/uhs/twophase/locking_shard/controller.hpp @@ -26,9 +26,10 @@ namespace cbdc::locking_shard { /// \param logger log to use for output. controller(size_t shard_id, size_t node_id, - const cbdc::config::options& opts, + cbdc::config::options opts, std::shared_ptr logger); - ~controller() = default; + + ~controller(); controller() = delete; controller(const controller&) = delete; @@ -48,7 +49,8 @@ namespace cbdc::locking_shard { nuraft::cb_func::Param* param) -> nuraft::cb_func::ReturnCode; auto validate_request(cbdc::buffer request, - cbdc::raft::rpc::validation_callback cb) -> bool; + const cbdc::raft::rpc::validation_callback& cb) + -> bool; auto enqueue_validation(cbdc::buffer request, cbdc::raft::rpc::validation_callback cb) diff --git a/src/uhs/twophase/sentinel_2pc/controller.cpp b/src/uhs/twophase/sentinel_2pc/controller.cpp index 24614d969..51360b164 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.cpp +++ b/src/uhs/twophase/sentinel_2pc/controller.cpp @@ -111,19 +111,23 @@ namespace cbdc::sentinel_2pc { return true; } + controller::~controller() { + stop(); + } + void controller::validation_worker() { while(m_running) { auto v = queued_validation(); if(m_validation_queue.pop(v)) { auto [tx, cb] = v; - cb(std::move(tx), transaction::validation::check_tx(tx)); + cb(tx, transaction::validation::check_tx(tx)); } } } auto controller::validate_tx(const transaction::full_tx& tx, validation_callback cb) -> bool { - m_validation_queue.push({std::move(tx), std::move(cb)}); + m_validation_queue.push({tx, std::move(cb)}); return true; } @@ -133,14 +137,14 @@ namespace cbdc::sentinel_2pc { if(m_attestation_queue.pop(v)) { auto [tx, cb] = v; auto compact_tx = cbdc::transaction::compact_tx(tx); - cb(std::move(tx), compact_tx.sign(m_secp.get(), m_privkey)); + cb(tx, compact_tx.sign(m_secp.get(), m_privkey)); } } } auto controller::attest_tx(const transaction::full_tx& tx, attestation_callback cb) -> bool { - m_attestation_queue.push({std::move(tx), std::move(cb)}); + m_attestation_queue.push({tx, std::move(cb)}); return true; } @@ -148,7 +152,7 @@ namespace cbdc::sentinel_2pc { transaction::full_tx tx, execute_result_callback_type result_callback) -> bool { return controller::validate_tx( - std::move(tx), + tx, [&, result_callback]( const transaction::full_tx& tx2, std::optional err) { @@ -166,10 +170,7 @@ namespace cbdc::sentinel_2pc { } auto compact_tx = cbdc::transaction::compact_tx(tx2); - gather_attestations(std::move(tx2), - std::move(result_callback), - compact_tx, - {}); + gather_attestations(tx2, result_callback, compact_tx, {}); return; }); } @@ -194,7 +195,7 @@ namespace cbdc::sentinel_2pc { transaction::full_tx tx, validate_result_callback_type result_callback) -> bool { return controller::validate_tx( - std::move(tx), + tx, [&, result_callback]( const transaction::full_tx& tx2, std::optional err) { @@ -203,7 +204,7 @@ namespace cbdc::sentinel_2pc { return; } controller::attest_tx( - std::move(tx2), + tx2, [&, result_callback]( const transaction::full_tx& /* tx3 */, std::optional res) { @@ -233,17 +234,24 @@ namespace cbdc::sentinel_2pc { void controller::stop() { m_running = false; + m_rpc_server.reset(); + + m_validation_queue.clear(); + m_attestation_queue.clear(); + for(auto& t : m_validation_threads) { if(t.joinable()) { t.join(); } } + m_validation_threads.clear(); for(auto& t : m_attestation_threads) { if(t.joinable()) { t.join(); } } + m_attestation_threads.clear(); } void controller::gather_attestations( @@ -252,10 +260,10 @@ namespace cbdc::sentinel_2pc { const transaction::compact_tx& ctx, std::unordered_set requested) { if(ctx.m_attestations.size() < m_opts.m_attestation_threshold) { - if(ctx.m_attestations.size() == 0) { + if(ctx.m_attestations.empty()) { // Self-attest first controller::attest_tx( - std::move(tx), + tx, [&, ctx, result_callback](const transaction::full_tx& tx2, validate_result res) { validate_result_handler(res, @@ -297,7 +305,7 @@ namespace cbdc::sentinel_2pc { void controller::send_compact_tx(const transaction::compact_tx& ctx, execute_result_callback_type result_callback) { - auto cb = [&, this, ctx, res_cb = std::move(result_callback)]( + auto cb = [&, ctx, res_cb = std::move(result_callback)]( std::optional res) { result_handler(res, res_cb); }; diff --git a/src/uhs/twophase/sentinel_2pc/controller.hpp b/src/uhs/twophase/sentinel_2pc/controller.hpp index 7d059a703..890158a61 100644 --- a/src/uhs/twophase/sentinel_2pc/controller.hpp +++ b/src/uhs/twophase/sentinel_2pc/controller.hpp @@ -38,7 +38,7 @@ namespace cbdc::sentinel_2pc { const config::options& opts, std::shared_ptr logger); - ~controller() override = default; + ~controller() override; /// Initializes the controller. Connects to the shard coordinator /// network and launches a server thread for external clients. diff --git a/src/uhs/twophase/sentinel_2pc/server.cpp b/src/uhs/twophase/sentinel_2pc/server.cpp index f8f8abd99..ad2736508 100644 --- a/src/uhs/twophase/sentinel_2pc/server.cpp +++ b/src/uhs/twophase/sentinel_2pc/server.cpp @@ -17,18 +17,20 @@ namespace cbdc::sentinel::rpc { m_srv->register_handler_callback( [&](const request& req, async_interface::result_callback_type callback) { - auto req_item = request_queue_t{req, callback}; + auto req_item = request_queue_t{req, std::move(callback)}; m_request_queue.push(req_item); return true; }); } - bool operator<(const request_queue_t& a, const request_queue_t& b) { + auto operator<(const request_queue_t& a, const request_queue_t& b) + -> bool { // Prioritize validate requests over execute requests return (std::holds_alternative(a.m_req) && std::holds_alternative(b.m_req)); } async_server::~async_server() { m_running = false; + m_request_queue.clear(); if(m_processing_thread.joinable()) { m_processing_thread.join(); } diff --git a/src/uhs/twophase/sentinel_2pc/server.hpp b/src/uhs/twophase/sentinel_2pc/server.hpp index ffd676f63..bdd785bcb 100644 --- a/src/uhs/twophase/sentinel_2pc/server.hpp +++ b/src/uhs/twophase/sentinel_2pc/server.hpp @@ -18,7 +18,7 @@ namespace cbdc::sentinel::rpc { async_interface::result_callback_type m_cb; }; - bool operator<(const request_queue_t& a, const request_queue_t& b); + auto operator<(const request_queue_t& a, const request_queue_t& b) -> bool; /// Asynchronous RPC server for a sentinel. class async_server { public: @@ -33,6 +33,10 @@ namespace cbdc::sentinel::rpc { std::unique_ptr> srv); ~async_server(); + async_server(async_server&&) noexcept = default; + auto operator=(async_server&&) noexcept -> async_server& = default; + async_server(const async_server&) = default; + auto operator=(const async_server&) -> async_server& = default; private: void process(); diff --git a/src/util/raft/rpc_server.hpp b/src/util/raft/rpc_server.hpp index 879ec0f1e..70420168f 100644 --- a/src/util/raft/rpc_server.hpp +++ b/src/util/raft/rpc_server.hpp @@ -26,7 +26,7 @@ namespace cbdc::raft::rpc { /// \param impl pointer to the raft node. /// \see cbdc::rpc::server void register_raft_node(std::shared_ptr impl) { - register_raft_node(impl, std::nullopt); + register_raft_node(std::move(impl), std::nullopt); } /// Registers the raft node whose state machine handles RPC requests @@ -42,10 +42,11 @@ namespace cbdc::raft::rpc { if(validate.has_value()) { m_validate_func = std::move(validate.value()); } else { - m_validate_func = [&](buffer b, validation_callback cb) { - cb(std::move(b), true); - return true; - }; + m_validate_func + = [&](buffer b, const validation_callback& cb) { + cb(std::move(b), true); + return true; + }; } cbdc::rpc::raw_async_server::register_handler_callback( [&](buffer req, response_callback_type resp_cb) { @@ -84,7 +85,7 @@ namespace cbdc::raft::rpc { auto success = m_impl->replicate( new_log, - [&, resp_cb = std::move(res_cb), req_buf = new_log]( + [&, resp_cb = res_cb, req_buf = new_log]( result_type& r, nuraft::ptr& err) { if(err) { diff --git a/tests/unit/sentinel_2pc/controller_test.cpp b/tests/unit/sentinel_2pc/controller_test.cpp index 4a35b6bf5..8513cfeeb 100644 --- a/tests/unit/sentinel_2pc/controller_test.cpp +++ b/tests/unit/sentinel_2pc/controller_test.cpp @@ -99,6 +99,10 @@ class sentinel_2pc_test : public ::testing::Test { std::unique_ptr m_ctl; cbdc::transaction::full_tx m_valid_tx{}; std::shared_ptr m_logger; + std::unique_ptr + m_secp{secp256k1_context_create(SECP256K1_CONTEXT_SIGN + | SECP256K1_CONTEXT_VERIFY), + &secp256k1_context_destroy}; }; TEST_F(sentinel_2pc_test, test_init) { @@ -170,17 +174,14 @@ TEST_F(sentinel_2pc_test, digest_valid_transaction_network) { TEST_F(sentinel_2pc_test, tx_validation_test) { ASSERT_TRUE(m_ctl->init()); auto ctx = cbdc::transaction::compact_tx(m_valid_tx); - auto secp = std::unique_ptr{ - secp256k1_context_create(SECP256K1_CONTEXT_SIGN - | SECP256K1_CONTEXT_VERIFY), - &secp256k1_context_destroy}; auto res = m_ctl->validate_transaction(m_valid_tx, [&](auto validation_res) { ASSERT_TRUE(validation_res.has_value()); - ASSERT_TRUE(ctx.verify(secp.get(), validation_res.value())); + ASSERT_TRUE(ctx.verify(m_secp.get(), validation_res.value())); }); ASSERT_TRUE(res); + // ensures the validation callback has completed before we go out-of-scope + m_ctl->stop(); } TEST_F(sentinel_2pc_test, bad_coordinator_endpoint) {