Skip to content

Commit

Permalink
Integration test: Start/stop nodes works
Browse files Browse the repository at this point in the history
  • Loading branch information
davidharabagiu committed Jul 30, 2021
1 parent a84a40e commit 9d2e402
Show file tree
Hide file tree
Showing 11 changed files with 156 additions and 42 deletions.
1 change: 1 addition & 0 deletions sand/libsand/api/src/sandnodeimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ bool SANDNodeImpl::start()
return false;
}

set_state(State::RUNNING);
return true;
}

Expand Down
1 change: 1 addition & 0 deletions sand/libsand/flows/inc/private/peermanagerflowimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class PeerManagerFlowImpl : public PeerManagerFlow
void wait_for_reply_confirmation(std::future<bool> future, protocol::RequestId msg_id);
std::future<bool> register_to_dnl();
void register_to_dnl_loop(const std::shared_ptr<std::promise<bool>> &promise);
void say_bye(network::IPv4Address to);
void say_bye_to_peers();
std::future<void> ping_peers();
std::vector<network::IPv4Address> pick_peers(
Expand Down
25 changes: 20 additions & 5 deletions sand/libsand/flows/src/peermanagerflowimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,11 @@ PeerManagerFlowImpl::~PeerManagerFlowImpl()
inbound_request_dispatcher_->unset_callback<protocol::PushMessage>();
inbound_request_dispatcher_->unset_callback<protocol::ByeMessage>();
inbound_request_dispatcher_->unset_callback<protocol::PingMessage>();
stop_impl();

if (state() == State::RUNNING)
{
stop_impl();
}
}

void PeerManagerFlowImpl::start()
Expand Down Expand Up @@ -119,7 +123,7 @@ void PeerManagerFlowImpl::start()
LOG(WARNING) << "No peers were preloaded. Maybe later some will be "
"available.";
}
else if (peers_.size() != size_t(initial_peer_count_))
else if (peers_.size() < size_t(initial_peer_count_))
{
LOG(INFO) << "Preloaded peer list with " << peers_.size()
<< " addresses, less than the configured amount ("
Expand Down Expand Up @@ -478,6 +482,13 @@ void PeerManagerFlowImpl::register_to_dnl_loop(const std::shared_ptr<std::promis
true);
}

void PeerManagerFlowImpl::say_bye(network::IPv4Address to)
{
auto bye = std::make_unique<protocol::ByeMessage>();
bye->request_id = rng_.next<protocol::RequestId>();
protocol_message_handler_->send(to, std::move(bye));
}

void PeerManagerFlowImpl::say_bye_to_peers()
{
if (state_ != State::STOPPING)
Expand All @@ -489,9 +500,13 @@ void PeerManagerFlowImpl::say_bye_to_peers()

for (network::IPv4Address addr : peers_)
{
auto bye = std::make_unique<protocol::ByeMessage>();
bye->request_id = rng_.next<protocol::RequestId>();
protocol_message_handler_->send(addr, std::move(bye));
say_bye(addr);
}

// Say bye to a DNL node
if (auto dnl_addr = dnl_config_->random_pick(); dnl_addr)
{
say_bye(dnl_addr);
}
}

Expand Down
3 changes: 2 additions & 1 deletion sand/libsand/storage/src/filestoragemetadataimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ void FileStorageMetadataImpl::parse_metadata_file()

void FileStorageMetadataImpl::write_metadata_file() const
{
nlohmann::json json_root;
auto json_root = nlohmann::json::array();

for (const auto &[hash, name] : hash_to_name_map_)
{
auto &e = json_root.emplace_back();
Expand Down
1 change: 1 addition & 0 deletions sand/libsand/test/fakenet/inc/public/fakenet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class FakeNet
Address set_sender_ptr(SenderPtr sender);
Address set_server_ptr(ServerPtr server);
ServerPtr get_server_ptr(Address addr) const;
void remove_node(Address addr);

private:
struct Node
Expand Down
2 changes: 2 additions & 0 deletions sand/libsand/test/fakenet/inc/public/tcpsenderimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class TCPSenderImpl : public TCPSender
, my_address_ {fake_net_.set_sender_ptr(this)}
{}

~TCPSenderImpl() override;

std::future<bool> send(
IPv4Address to, unsigned short port, const uint8_t *data, size_t len) override;

Expand Down
11 changes: 8 additions & 3 deletions sand/libsand/test/fakenet/inc/public/tcpserverimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "singleton.hpp"
#include "tcpmessagelistener.hpp"
#include "tcpserver.hpp"
#include "threadpool.hpp"

namespace sand::network
{
Expand All @@ -16,9 +17,11 @@ class TCPServerImpl : public TCPServer
template<typename... Ts>
explicit TCPServerImpl(Ts &&...)
: fake_net_ {Singleton<FakeNet>::get()}
{
fake_net_.set_server_ptr(this);
}
, my_address_ {fake_net_.set_server_ptr(this)}
, thread_pool_ {1}
{}

~TCPServerImpl() override;

bool register_listener(std::shared_ptr<TCPMessageListener> listener) override;
bool unregister_listener(std::shared_ptr<TCPMessageListener> listener) override;
Expand All @@ -27,7 +30,9 @@ class TCPServerImpl : public TCPServer

private:
FakeNet & fake_net_;
network::IPv4Address my_address_;
utils::ListenerGroup<TCPMessageListener> listener_group_;
utils::ThreadPool thread_pool_;
};
} // namespace sand::network

Expand Down
6 changes: 6 additions & 0 deletions sand/libsand/test/fakenet/src/fakenet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,9 @@ FakeNet::ServerPtr FakeNet::get_server_ptr(FakeNet::Address addr) const
}
return nullptr;
}

void FakeNet::remove_node(Address addr)
{
std::lock_guard lock {mutex_};
network_map_.erase(addr);
}
5 changes: 5 additions & 0 deletions sand/libsand/test/fakenet/src/tcpsenderimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

namespace sand::network
{
TCPSenderImpl::~TCPSenderImpl()
{
fake_net_.remove_node(my_address_);
}

std::future<bool> TCPSenderImpl::send(
IPv4Address to, unsigned short /*port*/, const uint8_t *data, size_t len)
{
Expand Down
25 changes: 24 additions & 1 deletion sand/libsand/test/fakenet/src/tcpserverimpl.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
#include "tcpserverimpl.hpp"

#include <chrono>
#include <thread>
#include <vector>

namespace sand::network
{
namespace
{
constexpr long network_time_ms {25};
} // namespace

TCPServerImpl::~TCPServerImpl()
{
fake_net_.remove_node(my_address_);
}

bool TCPServerImpl::register_listener(std::shared_ptr<TCPMessageListener> listener)
{
return listener_group_.add(listener);
Expand All @@ -14,6 +28,15 @@ bool TCPServerImpl::unregister_listener(std::shared_ptr<TCPMessageListener> list

void TCPServerImpl::inject_message(sand::network::IPv4Address from, const uint8_t *data, size_t len)
{
listener_group_.notify(&TCPMessageListener::on_message_received, from, data, len);
/*
* Offload to another thread to allow further processing in the node before a reply will be
* received.
*/
thread_pool_.add_job(
[this, from, msg = std::vector<uint8_t>(data, data + len)](const utils::CompletionToken &) {
std::this_thread::sleep_for(std::chrono::milliseconds {network_time_ms});
listener_group_.notify(
&TCPMessageListener::on_message_received, from, msg.data(), msg.size());
});
}
} // namespace sand::network
118 changes: 86 additions & 32 deletions sand/libsand/test/integration_test_fnet/src/sandnode_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
#include <fstream>
#include <iostream>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <thread>
#include <tuple>
#include <vector>

#include "fakenet.hpp"
#include "iothreadpool.hpp"
#include "messages.hpp"
#include "random.hpp"
#include "sandnode.hpp"
#include "singleton.hpp"
#include "tcpmessagelistener.hpp"
Expand All @@ -25,9 +27,8 @@ using namespace ::sand;
using namespace ::sand::network;
using namespace ::sand::protocol;
using namespace ::sand::utils;
using namespace std::literals::chrono_literals;

namespace fs = std::filesystem;
using namespace ::std::chrono_literals;
namespace fs = ::std::filesystem;

namespace
{
Expand Down Expand Up @@ -65,6 +66,8 @@ class SandNodeTest : public TestWithParam<std::tuple<size_t, size_t>>

void TearDown() override
{
dnl_node_sender_.reset();
dnl_node_server_.reset();
fs_cleanup();
Singleton<FakeNet>::reset();
}
Expand Down Expand Up @@ -107,37 +110,78 @@ class SandNodeTest : public TestWithParam<std::tuple<size_t, size_t>>
dnl_node_server_->register_listener(dnl_node_message_listener_);
}

void handle_message_as_dnl_node(IPv4Address from, const uint8_t *data, size_t len)
void handle_message_as_dnl_node(IPv4Address from, const uint8_t *data, size_t /*len*/)
{
/*
* Offload to another thread to allow further processing in the node before a reply will be
* received.
*/
thread_pool_.add_job([this, from, request = std::vector<uint8_t>(data, data + len)](
const CompletionToken &) {
std::cout << "Got request of size " << request.size() << " from "
<< conversion::to_string(from) << '\n';

std::this_thread::sleep_for(10ms);

if (request[0] == uint8_t(MessageCode::PUSH))
if (data[0] == uint8_t(MessageCode::PUSH))
{
// Add to node address list
{
// Add to node address list
std::lock_guard lock {mutex_};
node_addresses_.push_back(from);
}

// Construct reply
std::vector<uint8_t> reply(11);
reply[0] = uint8_t(MessageCode::REPLY);
std::copy_n(request.data() + 1, 8, reply.data() + 1); // Request ID
reply[9] = uint8_t(StatusCode::OK);
reply[10] = uint8_t(MessageCode::PUSH);
// Construct reply
std::vector<uint8_t> reply(11);
reply[0] = uint8_t(MessageCode::REPLY);
std::copy_n(data + 1, 8, reply.data() + 1); // Request ID
reply[9] = uint8_t(StatusCode::OK);
reply[10] = uint8_t(MessageCode::PUSH);

// Send reply
ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get());
// Send reply
ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get());
}
else if (data[0] == uint8_t(MessageCode::PULL))
{
size_t addr_count = data[9];
std::vector<IPv4Address> addrs;

// Pick some nodes
{
std::lock_guard lock {mutex_};
addrs = pick_addresses(addr_count, from);
}
else if (request[0] == uint8_t(MessageCode::PULL))
{}
});

// Construct reply
std::vector<uint8_t> reply(12 + 4 * addrs.size());
reply[0] = uint8_t(MessageCode::REPLY);
std::copy_n(data + 1, 8, reply.data() + 1); // Request ID
reply[9] = uint8_t(StatusCode::OK);
reply[10] = uint8_t(MessageCode::PULL);
reply[11] = uint8_t(addrs.size());
for (size_t i = 0; i != addrs.size(); ++i)
{
std::copy_n(
reinterpret_cast<const uint8_t *>(&addrs[i]), 4, reply.data() + 12 + i * 4);
}

// Send reply
ASSERT_TRUE(dnl_node_sender_->send(from, 0, reply.data(), reply.size()).get());
}
else if (data[0] == uint8_t(MessageCode::BYE))
{
std::lock_guard lock {mutex_};
++bye_msg_count_;
}
else
{
FAIL();
}
}

std::vector<IPv4Address> pick_addresses(size_t cnt, IPv4Address exclude)
{
std::set<IPv4Address> result;
cnt = std::min(cnt, node_addresses_.size() - 1);
for (size_t i = 0; i != cnt; ++i)
{
IPv4Address a;
do
{
a = node_addresses_[rng_.next(node_addresses_.size() - 1)];
} while (result.count(a) != 0 || a == exclude);
result.insert(a);
}
return std::vector<IPv4Address>(result.cbegin(), result.cend());
}

static size_t number_of_nodes()
Expand All @@ -158,7 +202,9 @@ class SandNodeTest : public TestWithParam<std::tuple<size_t, size_t>>
IPv4Address dnl_address_;
std::vector<std::unique_ptr<SANDNode>> nodes_;
std::vector<IPv4Address> node_addresses_;
IOThreadPool thread_pool_;
size_t bye_msg_count_ = 0;
Random rng_;
std::mutex mutex_;

static constexpr char const *config_file_name_ = "config.json";

Expand Down Expand Up @@ -201,7 +247,7 @@ class SandNodeTest : public TestWithParam<std::tuple<size_t, size_t>>
};
} // namespace

TEST_P(SandNodeTest, Init_Transfer_Uninit)
TEST_P(SandNodeTest, Start_Transfer_Stop)
{
// Init nodes
for (size_t i = 0; i != number_of_nodes(); ++i)
Expand All @@ -212,12 +258,20 @@ TEST_P(SandNodeTest, Init_Transfer_Uninit)
nodes_.push_back(std::move(node));
}

// Wait for all nodes to fill their peer list
std::this_thread::sleep_for(1s);

// Stop nodes
for (const auto &node : nodes_)
for (auto &node : nodes_)
{
ASSERT_TRUE(node->stop());
}
nodes_.clear();

// Wait for all BYE messages to arrive
std::this_thread::sleep_for(1s);

ASSERT_EQ(number_of_nodes(), bye_msg_count_);
}

INSTANTIATE_TEST_SUITE_P(SandNodeTests, SandNodeTest, Values(std::make_tuple(20, 512)));

0 comments on commit 9d2e402

Please sign in to comment.