Skip to content

Commit

Permalink
Merge pull request #20000 from vespa-engine/vekterli/support-unordere…
Browse files Browse the repository at this point in the history
…d-merge-chaining

Add configurable support for unordered merge forwarding [run-systemtest]
  • Loading branch information
vekterli authored Nov 15, 2021
2 parents db66334 + 31d7bb1 commit c33b0e4
Show file tree
Hide file tree
Showing 42 changed files with 611 additions and 107 deletions.
1 change: 1 addition & 0 deletions storage/src/tests/distributor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
mergelimitertest.cpp
mergeoperationtest.cpp
multi_thread_stripe_access_guard_test.cpp
node_supported_features_repo_test.cpp
nodeinfotest.cpp
nodemaintenancestatstrackertest.cpp
operation_sequencer_test.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation
const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override {
abort();
}
const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override {
abort();
}
};

struct BlockingOperationStarterTest : Test {
Expand Down
17 changes: 17 additions & 0 deletions storage/src/tests/distributor/distributor_stripe_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil {
configure_stripe(builder);
}

void configure_use_unordered_merge_chaining(bool use_unordered) {
ConfigBuilder builder;
builder.useUnorderedMergeChaining = use_unordered;
configure_stripe(builder);
}

bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept {
return _stripe->_scheduler->implicitly_clear_priority_on_schedule();
}
Expand Down Expand Up @@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread)
EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult());
}

TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config)
{
setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1");

configure_use_unordered_merge_chaining(true);
EXPECT_TRUE(getConfig().use_unordered_merge_chaining());

configure_use_unordered_merge_chaining(false);
EXPECT_FALSE(getConfig().use_unordered_merge_chaining());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
#include <vespa/storage/distributor/distributor_stripe_component.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/ideal_state_total_metrics.h>
#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/stllike/hash_map.hpp>

using document::test::makeBucketSpace;
using document::test::makeDocumentBucket;
Expand Down Expand Up @@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept {
return _stripe->db_memory_sample_interval();
}

void
DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) {
vespalib::hash_map<uint16_t, NodeSupportedFeatures> new_features;
new_features[node] = features;
_stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features));
}

const lib::Distribution&
DistributorStripeTestUtil::getDistribution() const {
return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution();
Expand Down
2 changes: 2 additions & 0 deletions storage/src/tests/distributor/distributor_stripe_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DocumentSelectionParser;
class ExternalOperationHandler;
class IdealStateManager;
class IdealStateMetricSet;
class NodeSupportedFeatures;
class Operation;
class StripeBucketDBUpdater;

Expand Down Expand Up @@ -150,6 +151,7 @@ class DistributorStripeTestUtil : public DoneInitializeHandler,
[[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept;
[[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept;
[[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept;
void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features);

const lib::Distribution& getDistribution() const;

Expand Down
62 changes: 56 additions & 6 deletions storage/src/tests/distributor/mergeoperationtest.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>

#include <tests/distributor/distributor_stripe_test_util.h>
#include <vespa/document/test/make_bucket_space.h>
#include <vespa/document/test/make_document_bucket.h>
Expand All @@ -12,6 +12,7 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <charconv>

using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
Expand All @@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil {
}

std::shared_ptr<MergeOperation> setup_minimal_merge_op();
std::shared_ptr<MergeOperation> setup_simple_merge_op(const std::vector<uint16_t>& nodes);
std::shared_ptr<MergeOperation> setup_simple_merge_op();
void assert_simple_merge_bucket_command();
void assert_simple_delete_bucket_command();
Expand All @@ -47,13 +49,13 @@ std::shared_ptr<MergeOperation>
MergeOperationTest::setup_minimal_merge_op()
{
document::BucketId bucket_id(16, 1);
auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), toVector<uint16_t>(0, 1, 2)));
auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2}));
op->setIdealStateManager(&getIdealStateManager());
return op;
}

std::shared_ptr<MergeOperation>
MergeOperationTest::setup_simple_merge_op()
MergeOperationTest::setup_simple_merge_op(const std::vector<uint16_t>& nodes)
{
getClock().setAbsoluteTimeInSeconds(10);

Expand All @@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op()

enable_cluster_state("distributor:1 storage:3");

auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector<uint16_t>(0, 1, 2)));
auto op = std::make_shared<MergeOperation>(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes));
op->setIdealStateManager(&getIdealStateManager());
op->start(_sender, framework::MilliSecTime(0));
return op;
}

std::shared_ptr<MergeOperation>
MergeOperationTest::setup_simple_merge_op()
{
return setup_simple_merge_op({0, 1, 2});
}

void
MergeOperationTest::assert_simple_merge_bucket_command()
{
Expand Down Expand Up @@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis
num.erase(pos);
trusted = true;
}
bucketDB[i] = BucketCopy(0, atoi(num.c_str()),
api::BucketInfo(1, 2, 3));
uint16_t node;
[[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node);
assert(ec == std::errc{});
bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3));
bucketDB[i].setTrusted(trusted);
}
std::vector<MergeMetaData> nodes(st.size());
Expand Down Expand Up @@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics)
EXPECT_EQ(1, metrics->throttled.getValue());
}

TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) {
setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4");
NodeSupportedFeatures with_unordered;
with_unordered.unordered_merge_chaining = true;

set_node_supported_features(1, with_unordered);
set_node_supported_features(2, with_unordered);

auto config = make_config();
config->set_use_unordered_merge_chaining(true);
configure_stripe(std::move(config));

// Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1).
setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, "
"cluster state version: 0, nodes: [2, 1, 3], chain: [], "
"reasons to start: ) => 1",
_sender.getLastCommand(true));

// All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2)
setup_simple_merge_op({1, 2});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, "
"cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), "
"reasons to start: ) => 2",
_sender.getLastCommand(true));

_sender.clear();

config = make_config();
config->set_use_unordered_merge_chaining(false);
configure_stripe(std::move(config));

// If config is not enabled, should send ordered even if nodes support the feature.
setup_simple_merge_op({2, 1});
ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, "
"cluster state version: 0, nodes: [2, 1], chain: [], "
"reasons to start: ) => 1",
_sender.getLastCommand(true));
}

} // storage::distributor
4 changes: 4 additions & 0 deletions storage/src/tests/distributor/mock_tickable_stripe.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe {
void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
void clear_read_only_bucket_repo_databases() override { abort(); }

void update_node_supported_features_repo(std::shared_ptr<const NodeSupportedFeaturesRepo>) override {
abort();
}

void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.

#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/vespalib/gtest/gtest.h>

using namespace ::testing;

namespace storage::distributor {

struct NodeSupportedFeaturesRepoTest : Test {
using FeatureMap = vespalib::hash_map<uint16_t, NodeSupportedFeatures>;
NodeSupportedFeaturesRepo _repo;

static NodeSupportedFeatures set_features() noexcept {
NodeSupportedFeatures f;
f.unordered_merge_chaining = true;
return f;
}

static NodeSupportedFeatures unset_features() noexcept {
return {};
}
};

TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) {
EXPECT_EQ(_repo.node_supported_features(0), unset_features());
EXPECT_EQ(_repo.node_supported_features(12345), unset_features());
}

TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) {
FeatureMap fm;
fm[1] = set_features();
fm[60] = set_features();
auto new_repo = _repo.make_union_of(fm);
EXPECT_EQ(new_repo->node_supported_features(0), unset_features());
EXPECT_EQ(new_repo->node_supported_features(1), set_features());
EXPECT_EQ(new_repo->node_supported_features(60), set_features());
}

TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) {
FeatureMap fm;
fm[1] = set_features();
fm[60] = set_features();
auto new_repo = _repo.make_union_of(fm);
fm[1] = unset_features();
new_repo = new_repo->make_union_of(fm);
EXPECT_EQ(new_repo->node_supported_features(1), unset_features());
EXPECT_EQ(new_repo->node_supported_features(60), set_features());
}

}
53 changes: 51 additions & 2 deletions storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <vespa/storage/distributor/top_level_bucket_db_updater.h>
#include <vespa/storage/distributor/bucket_space_distribution_context.h>
#include <vespa/storage/distributor/distributormetricsset.h>
#include <vespa/storage/distributor/node_supported_features_repo.h>
#include <vespa/storage/distributor/pending_bucket_space_db_transition.h>
#include <vespa/storage/distributor/outdated_nodes_map.h>
#include <vespa/storage/storageutil/distributorstatecache.h>
Expand Down Expand Up @@ -119,6 +120,21 @@ class TopLevelBucketDBUpdaterTest : public Test,
invalid_bucket_count));
}

void fake_bucket_reply(const lib::ClusterState &state,
const api::StorageCommand &cmd,
uint32_t bucket_count,
const std::function<void(api::RequestBucketInfoReply&)>& reply_decorator)
{
ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO);
const api::StorageMessageAddress& address(*cmd.getAddress());
auto reply = make_fake_bucket_reply(state,
dynamic_cast<const RequestBucketInfoCommand &>(cmd),
address.getIndex(),
bucket_count, 0);
reply_decorator(*reply);
bucket_db_updater().onRequestBucketInfoReply(reply);
}

void send_fake_reply_for_single_bucket_request(
const api::RequestBucketInfoCommand& rbi)
{
Expand Down Expand Up @@ -232,7 +248,7 @@ class TopLevelBucketDBUpdaterTest : public Test,
}
}

api::StorageMessageAddress storage_address(uint16_t node) {
static api::StorageMessageAddress storage_address(uint16_t node) {
static vespalib::string _storage("storage");
return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node);
}
Expand Down Expand Up @@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) {
add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234");

for (uint32_t i = 0; i < 3; ++i) {
nodes.push_back(api::MergeBucketCommand::Node(i));
nodes.emplace_back(i);
}

api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0);
Expand Down Expand Up @@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}

TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
lib::ClusterState state("distributor:1 storage:3");
set_cluster_state(state);
uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;

// Known feature sets are initially empty.
auto stripes = distributor_stripes();
for (auto* s : stripes) {
for (uint16_t i : {0, 1, 2}) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
}
}

ASSERT_EQ(expected_msgs, _sender.commands().size());
for (uint32_t i = 0; i < _sender.commands().size(); i++) {
ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
dummy_buckets_to_return, [i](auto& reply) noexcept {
// Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
// Node 0 does not support the fanciness.
if (i > 0) {
reply.supported_node_features().unordered_merge_chaining = true;
}
}));
}

// Node features should be propagated to all stripes
for (auto* s : stripes) {
EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptr<api:
void
TopLevelDistributorTestUtil::close()
{
_component.reset(0);
if (_distributor.get()) {
_component.reset();
if (_distributor) {
_stripe_pool->stop_and_join(); // Must be tagged as stopped prior to onClose
_distributor->onClose();
}
_sender.clear();
_node.reset(0);
_node.reset();
_config = getStandardConfig(false);
}

Expand Down
Loading

0 comments on commit c33b0e4

Please sign in to comment.