From 9d2e4021bb933763b18a23d120fb2963c697be83 Mon Sep 17 00:00:00 2001 From: David Harabagiu Date: Fri, 30 Jul 2021 13:17:53 +0300 Subject: [PATCH] Integration test: Start/stop nodes works --- sand/libsand/api/src/sandnodeimpl.cpp | 1 + .../flows/inc/private/peermanagerflowimpl.hpp | 1 + .../libsand/flows/src/peermanagerflowimpl.cpp | 25 +++- .../storage/src/filestoragemetadataimpl.cpp | 3 +- .../test/fakenet/inc/public/fakenet.hpp | 1 + .../test/fakenet/inc/public/tcpsenderimpl.hpp | 2 + .../test/fakenet/inc/public/tcpserverimpl.hpp | 11 +- sand/libsand/test/fakenet/src/fakenet.cpp | 6 + .../test/fakenet/src/tcpsenderimpl.cpp | 5 + .../test/fakenet/src/tcpserverimpl.cpp | 25 +++- .../src/sandnode_test.cpp | 118 +++++++++++++----- 11 files changed, 156 insertions(+), 42 deletions(-) diff --git a/sand/libsand/api/src/sandnodeimpl.cpp b/sand/libsand/api/src/sandnodeimpl.cpp index 4cbe1b1..ee7b9b1 100644 --- a/sand/libsand/api/src/sandnodeimpl.cpp +++ b/sand/libsand/api/src/sandnodeimpl.cpp @@ -219,6 +219,7 @@ bool SANDNodeImpl::start() return false; } + set_state(State::RUNNING); return true; } diff --git a/sand/libsand/flows/inc/private/peermanagerflowimpl.hpp b/sand/libsand/flows/inc/private/peermanagerflowimpl.hpp index 6a63d72..ef2c040 100644 --- a/sand/libsand/flows/inc/private/peermanagerflowimpl.hpp +++ b/sand/libsand/flows/inc/private/peermanagerflowimpl.hpp @@ -62,6 +62,7 @@ class PeerManagerFlowImpl : public PeerManagerFlow void wait_for_reply_confirmation(std::future future, protocol::RequestId msg_id); std::future register_to_dnl(); void register_to_dnl_loop(const std::shared_ptr> &promise); + void say_bye(network::IPv4Address to); void say_bye_to_peers(); std::future ping_peers(); std::vector pick_peers( diff --git a/sand/libsand/flows/src/peermanagerflowimpl.cpp b/sand/libsand/flows/src/peermanagerflowimpl.cpp index fb3a4b9..e189703 100644 --- a/sand/libsand/flows/src/peermanagerflowimpl.cpp +++ b/sand/libsand/flows/src/peermanagerflowimpl.cpp @@ -71,7 +71,11 @@ PeerManagerFlowImpl::~PeerManagerFlowImpl() inbound_request_dispatcher_->unset_callback(); inbound_request_dispatcher_->unset_callback(); inbound_request_dispatcher_->unset_callback(); - stop_impl(); + + if (state() == State::RUNNING) + { + stop_impl(); + } } void PeerManagerFlowImpl::start() @@ -119,7 +123,7 @@ void PeerManagerFlowImpl::start() LOG(WARNING) << "No peers were preloaded. Maybe later some will be " "available."; } - else if (peers_.size() != size_t(initial_peer_count_)) + else if (peers_.size() < size_t(initial_peer_count_)) { LOG(INFO) << "Preloaded peer list with " << peers_.size() << " addresses, less than the configured amount (" @@ -478,6 +482,13 @@ void PeerManagerFlowImpl::register_to_dnl_loop(const std::shared_ptr(); + bye->request_id = rng_.next(); + protocol_message_handler_->send(to, std::move(bye)); +} + void PeerManagerFlowImpl::say_bye_to_peers() { if (state_ != State::STOPPING) @@ -489,9 +500,13 @@ void PeerManagerFlowImpl::say_bye_to_peers() for (network::IPv4Address addr : peers_) { - auto bye = std::make_unique(); - bye->request_id = rng_.next(); - protocol_message_handler_->send(addr, std::move(bye)); + say_bye(addr); + } + + // Say bye to a DNL node + if (auto dnl_addr = dnl_config_->random_pick(); dnl_addr) + { + say_bye(dnl_addr); } } diff --git a/sand/libsand/storage/src/filestoragemetadataimpl.cpp b/sand/libsand/storage/src/filestoragemetadataimpl.cpp index 47382ce..c113606 100644 --- a/sand/libsand/storage/src/filestoragemetadataimpl.cpp +++ b/sand/libsand/storage/src/filestoragemetadataimpl.cpp @@ -144,7 +144,8 @@ void FileStorageMetadataImpl::parse_metadata_file() void FileStorageMetadataImpl::write_metadata_file() const { - nlohmann::json json_root; + auto json_root = nlohmann::json::array(); + for (const auto &[hash, name] : hash_to_name_map_) { auto &e = json_root.emplace_back(); diff --git a/sand/libsand/test/fakenet/inc/public/fakenet.hpp b/sand/libsand/test/fakenet/inc/public/fakenet.hpp index be4b8d8..2a3d20b 100644 --- a/sand/libsand/test/fakenet/inc/public/fakenet.hpp +++ b/sand/libsand/test/fakenet/inc/public/fakenet.hpp @@ -28,6 +28,7 @@ class FakeNet Address set_sender_ptr(SenderPtr sender); Address set_server_ptr(ServerPtr server); ServerPtr get_server_ptr(Address addr) const; + void remove_node(Address addr); private: struct Node diff --git a/sand/libsand/test/fakenet/inc/public/tcpsenderimpl.hpp b/sand/libsand/test/fakenet/inc/public/tcpsenderimpl.hpp index bbd7781..c05a9d0 100644 --- a/sand/libsand/test/fakenet/inc/public/tcpsenderimpl.hpp +++ b/sand/libsand/test/fakenet/inc/public/tcpsenderimpl.hpp @@ -22,6 +22,8 @@ class TCPSenderImpl : public TCPSender , my_address_ {fake_net_.set_sender_ptr(this)} {} + ~TCPSenderImpl() override; + std::future send( IPv4Address to, unsigned short port, const uint8_t *data, size_t len) override; diff --git a/sand/libsand/test/fakenet/inc/public/tcpserverimpl.hpp b/sand/libsand/test/fakenet/inc/public/tcpserverimpl.hpp index 97d50d7..0a21557 100644 --- a/sand/libsand/test/fakenet/inc/public/tcpserverimpl.hpp +++ b/sand/libsand/test/fakenet/inc/public/tcpserverimpl.hpp @@ -7,6 +7,7 @@ #include "singleton.hpp" #include "tcpmessagelistener.hpp" #include "tcpserver.hpp" +#include "threadpool.hpp" namespace sand::network { @@ -16,9 +17,11 @@ class TCPServerImpl : public TCPServer template explicit TCPServerImpl(Ts &&...) : fake_net_ {Singleton::get()} - { - fake_net_.set_server_ptr(this); - } + , my_address_ {fake_net_.set_server_ptr(this)} + , thread_pool_ {1} + {} + + ~TCPServerImpl() override; bool register_listener(std::shared_ptr listener) override; bool unregister_listener(std::shared_ptr listener) override; @@ -27,7 +30,9 @@ class TCPServerImpl : public TCPServer private: FakeNet & fake_net_; + network::IPv4Address my_address_; utils::ListenerGroup listener_group_; + utils::ThreadPool thread_pool_; }; } // namespace sand::network diff --git a/sand/libsand/test/fakenet/src/fakenet.cpp b/sand/libsand/test/fakenet/src/fakenet.cpp index 346f8da..f0f5593 100644 --- a/sand/libsand/test/fakenet/src/fakenet.cpp +++ b/sand/libsand/test/fakenet/src/fakenet.cpp @@ -47,3 +47,9 @@ FakeNet::ServerPtr FakeNet::get_server_ptr(FakeNet::Address addr) const } return nullptr; } + +void FakeNet::remove_node(Address addr) +{ + std::lock_guard lock {mutex_}; + network_map_.erase(addr); +} diff --git a/sand/libsand/test/fakenet/src/tcpsenderimpl.cpp b/sand/libsand/test/fakenet/src/tcpsenderimpl.cpp index 0432627..de3ad4c 100644 --- a/sand/libsand/test/fakenet/src/tcpsenderimpl.cpp +++ b/sand/libsand/test/fakenet/src/tcpsenderimpl.cpp @@ -4,6 +4,11 @@ namespace sand::network { +TCPSenderImpl::~TCPSenderImpl() +{ + fake_net_.remove_node(my_address_); +} + std::future TCPSenderImpl::send( IPv4Address to, unsigned short /*port*/, const uint8_t *data, size_t len) { diff --git a/sand/libsand/test/fakenet/src/tcpserverimpl.cpp b/sand/libsand/test/fakenet/src/tcpserverimpl.cpp index 75a5391..069d316 100644 --- a/sand/libsand/test/fakenet/src/tcpserverimpl.cpp +++ b/sand/libsand/test/fakenet/src/tcpserverimpl.cpp @@ -1,7 +1,21 @@ #include "tcpserverimpl.hpp" +#include +#include +#include + namespace sand::network { +namespace +{ +constexpr long network_time_ms {25}; +} // namespace + +TCPServerImpl::~TCPServerImpl() +{ + fake_net_.remove_node(my_address_); +} + bool TCPServerImpl::register_listener(std::shared_ptr listener) { return listener_group_.add(listener); @@ -14,6 +28,15 @@ bool TCPServerImpl::unregister_listener(std::shared_ptr list void TCPServerImpl::inject_message(sand::network::IPv4Address from, const uint8_t *data, size_t len) { - listener_group_.notify(&TCPMessageListener::on_message_received, from, data, len); + /* + * Offload to another thread to allow further processing in the node before a reply will be + * received. + */ + thread_pool_.add_job( + [this, from, msg = std::vector(data, data + len)](const utils::CompletionToken &) { + std::this_thread::sleep_for(std::chrono::milliseconds {network_time_ms}); + listener_group_.notify( + &TCPMessageListener::on_message_received, from, msg.data(), msg.size()); + }); } } // namespace sand::network diff --git a/sand/libsand/test/integration_test_fnet/src/sandnode_test.cpp b/sand/libsand/test/integration_test_fnet/src/sandnode_test.cpp index fb91377..6d138e8 100644 --- a/sand/libsand/test/integration_test_fnet/src/sandnode_test.cpp +++ b/sand/libsand/test/integration_test_fnet/src/sandnode_test.cpp @@ -6,14 +6,16 @@ #include #include #include +#include +#include #include #include #include #include #include "fakenet.hpp" -#include "iothreadpool.hpp" #include "messages.hpp" +#include "random.hpp" #include "sandnode.hpp" #include "singleton.hpp" #include "tcpmessagelistener.hpp" @@ -25,9 +27,8 @@ using namespace ::sand; using namespace ::sand::network; using namespace ::sand::protocol; using namespace ::sand::utils; -using namespace std::literals::chrono_literals; - -namespace fs = std::filesystem; +using namespace ::std::chrono_literals; +namespace fs = ::std::filesystem; namespace { @@ -65,6 +66,8 @@ class SandNodeTest : public TestWithParam> void TearDown() override { + dnl_node_sender_.reset(); + dnl_node_server_.reset(); fs_cleanup(); Singleton::reset(); } @@ -107,37 +110,78 @@ class SandNodeTest : public TestWithParam> dnl_node_server_->register_listener(dnl_node_message_listener_); } - void handle_message_as_dnl_node(IPv4Address from, const uint8_t *data, size_t len) + void handle_message_as_dnl_node(IPv4Address from, const uint8_t *data, size_t /*len*/) { - /* - * Offload to another thread to allow further processing in the node before a reply will be - * received. - */ - thread_pool_.add_job([this, from, request = std::vector(data, data + len)]( - const CompletionToken &) { - std::cout << "Got request of size " << request.size() << " from " - << conversion::to_string(from) << '\n'; - - std::this_thread::sleep_for(10ms); - - if (request[0] == uint8_t(MessageCode::PUSH)) + if (data[0] == uint8_t(MessageCode::PUSH)) + { + // Add to node address list { - // Add to node address list + std::lock_guard lock {mutex_}; node_addresses_.push_back(from); + } - // Construct reply - std::vector reply(11); - reply[0] = uint8_t(MessageCode::REPLY); - std::copy_n(request.data() + 1, 8, reply.data() + 1); // Request ID - reply[9] = uint8_t(StatusCode::OK); - reply[10] = uint8_t(MessageCode::PUSH); + // Construct reply + std::vector reply(11); + reply[0] = uint8_t(MessageCode::REPLY); + std::copy_n(data + 1, 8, reply.data() + 1); // Request ID + reply[9] = uint8_t(StatusCode::OK); + reply[10] = uint8_t(MessageCode::PUSH); - // Send reply - ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get()); + // Send reply + ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get()); + } + else if (data[0] == uint8_t(MessageCode::PULL)) + { + size_t addr_count = data[9]; + std::vector addrs; + + // Pick some nodes + { + std::lock_guard lock {mutex_}; + addrs = pick_addresses(addr_count, from); } - else if (request[0] == uint8_t(MessageCode::PULL)) - {} - }); + + // Construct reply + std::vector reply(12 + 4 * addrs.size()); + reply[0] = uint8_t(MessageCode::REPLY); + std::copy_n(data + 1, 8, reply.data() + 1); // Request ID + reply[9] = uint8_t(StatusCode::OK); + reply[10] = uint8_t(MessageCode::PULL); + reply[11] = uint8_t(addrs.size()); + for (size_t i = 0; i != addrs.size(); ++i) + { + std::copy_n( + reinterpret_cast(&addrs[i]), 4, reply.data() + 12 + i * 4); + } + + // Send reply + ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get()); + } + else if (data[0] == uint8_t(MessageCode::BYE)) + { + std::lock_guard lock {mutex_}; + ++bye_msg_count_; + } + else + { + FAIL(); + } + } + + std::vector pick_addresses(size_t cnt, IPv4Address exclude) + { + std::set result; + cnt = std::min(cnt, node_addresses_.size() - 1); + for (size_t i = 0; i != cnt; ++i) + { + IPv4Address a; + do + { + a = node_addresses_[rng_.next(node_addresses_.size() - 1)]; + } while (result.count(a) != 0 || a == exclude); + result.insert(a); + } + return std::vector(result.cbegin(), result.cend()); } static size_t number_of_nodes() @@ -158,7 +202,9 @@ class SandNodeTest : public TestWithParam> IPv4Address dnl_address_; std::vector> nodes_; std::vector node_addresses_; - IOThreadPool thread_pool_; + size_t bye_msg_count_ = 0; + Random rng_; + std::mutex mutex_; static constexpr char const *config_file_name_ = "config.json"; @@ -201,7 +247,7 @@ class SandNodeTest : public TestWithParam> }; } // namespace -TEST_P(SandNodeTest, Init_Transfer_Uninit) +TEST_P(SandNodeTest, Start_Transfer_Stop) { // Init nodes for (size_t i = 0; i != number_of_nodes(); ++i) @@ -212,12 +258,20 @@ TEST_P(SandNodeTest, Init_Transfer_Uninit) nodes_.push_back(std::move(node)); } + // Wait for all nodes to fill their peer list + std::this_thread::sleep_for(1s); + // Stop nodes - for (const auto &node : nodes_) + for (auto &node : nodes_) { ASSERT_TRUE(node->stop()); } nodes_.clear(); + + // Wait for all BYE messages to arrive + std::this_thread::sleep_for(1s); + + ASSERT_EQ(number_of_nodes(), bye_msg_count_); } INSTANTIATE_TEST_SUITE_P(SandNodeTests, SandNodeTest, Values(std::make_tuple(20, 512)));