diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 7348cfc328b2..bee7650aebd3 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST mergelimitertest.cpp mergeoperationtest.cpp multi_thread_stripe_access_guard_test.cpp + node_supported_features_repo_test.cpp nodeinfotest.cpp nodemaintenancestatstrackertest.cpp operation_sequencer_test.cpp diff --git a/storage/src/tests/distributor/blockingoperationstartertest.cpp b/storage/src/tests/distributor/blockingoperationstartertest.cpp index 15aada37c9b3..7c97c962a97a 100644 --- a/storage/src/tests/distributor/blockingoperationstartertest.cpp +++ b/storage/src/tests/distributor/blockingoperationstartertest.cpp @@ -100,6 +100,9 @@ struct FakeDistributorStripeOperationContext : public DistributorStripeOperation const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const override { abort(); } + const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override { + abort(); + } }; struct BlockingOperationStarterTest : Test { diff --git a/storage/src/tests/distributor/distributor_stripe_test.cpp b/storage/src/tests/distributor/distributor_stripe_test.cpp index 902dd6454f11..8c2ebc983fa6 100644 --- a/storage/src/tests/distributor/distributor_stripe_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test.cpp @@ -185,6 +185,12 @@ struct DistributorStripeTest : Test, DistributorStripeTestUtil { configure_stripe(builder); } + void configure_use_unordered_merge_chaining(bool use_unordered) { + ConfigBuilder builder; + builder.useUnorderedMergeChaining = use_unordered; + configure_stripe(builder); + } + bool scheduler_has_implicitly_clear_priority_on_schedule_set() const noexcept { return _stripe->_scheduler->implicitly_clear_priority_on_schedule(); } @@ -982,4 +988,15 @@ TEST_F(DistributorStripeTest, closing_aborts_gets_started_outside_stripe_thread) EXPECT_EQ(api::ReturnCode::ABORTED, _sender.reply(0)->getResult().getResult()); } +TEST_F(DistributorStripeTest, use_unordered_merge_chaining_config_is_propagated_to_internal_config) +{ + setup_stripe(Redundancy(1), NodeCount(1), "distributor:1 storage:1"); + + configure_use_unordered_merge_chaining(true); + EXPECT_TRUE(getConfig().use_unordered_merge_chaining()); + + configure_use_unordered_merge_chaining(false); + EXPECT_FALSE(getConfig().use_unordered_merge_chaining()); +} + } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index c5c51e64e687..b96b2dda1cb0 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -9,8 +9,10 @@ #include #include #include +#include #include #include +#include using document::test::makeBucketSpace; using document::test::makeDocumentBucket; @@ -526,6 +528,13 @@ DistributorStripeTestUtil::db_memory_sample_interval() const noexcept { return _stripe->db_memory_sample_interval(); } +void +DistributorStripeTestUtil::set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features) { + vespalib::hash_map new_features; + new_features[node] = features; + _stripe->update_node_supported_features_repo(_stripe->node_supported_features_repo().make_union_of(new_features)); +} + const lib::Distribution& DistributorStripeTestUtil::getDistribution() const { return 
getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index b1e90821e3b0..3226c16aba39 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -26,6 +26,7 @@ class DocumentSelectionParser; class ExternalOperationHandler; class IdealStateManager; class IdealStateMetricSet; +class NodeSupportedFeatures; class Operation; class StripeBucketDBUpdater; @@ -150,6 +151,7 @@ class DistributorStripeTestUtil : public DoneInitializeHandler, [[nodiscard]] const PendingMessageTracker& pending_message_tracker() const noexcept; [[nodiscard]] PendingMessageTracker& pending_message_tracker() noexcept; [[nodiscard]] std::chrono::steady_clock::duration db_memory_sample_interval() const noexcept; + void set_node_supported_features(uint16_t node, const NodeSupportedFeatures& features); const lib::Distribution& getDistribution() const; diff --git a/storage/src/tests/distributor/mergeoperationtest.cpp b/storage/src/tests/distributor/mergeoperationtest.cpp index 65ee52541935..54bd06c98e0d 100644 --- a/storage/src/tests/distributor/mergeoperationtest.cpp +++ b/storage/src/tests/distributor/mergeoperationtest.cpp @@ -1,5 +1,5 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include + #include #include #include @@ -12,6 +12,7 @@ #include #include #include +#include using document::test::makeDocumentBucket; using document::test::makeBucketSpace; @@ -37,6 +38,7 @@ struct MergeOperationTest : Test, DistributorStripeTestUtil { } std::shared_ptr setup_minimal_merge_op(); + std::shared_ptr setup_simple_merge_op(const std::vector& nodes); std::shared_ptr setup_simple_merge_op(); void assert_simple_merge_bucket_command(); void assert_simple_delete_bucket_command(); @@ -47,13 +49,13 @@ std::shared_ptr MergeOperationTest::setup_minimal_merge_op() { document::BucketId bucket_id(16, 1); - auto op = std::make_shared(BucketAndNodes(makeDocumentBucket(bucket_id), toVector(0, 1, 2))); + auto op = std::make_shared(BucketAndNodes(makeDocumentBucket(bucket_id), {0, 1, 2})); op->setIdealStateManager(&getIdealStateManager()); return op; } std::shared_ptr -MergeOperationTest::setup_simple_merge_op() +MergeOperationTest::setup_simple_merge_op(const std::vector& nodes) { getClock().setAbsoluteTimeInSeconds(10); @@ -64,12 +66,18 @@ MergeOperationTest::setup_simple_merge_op() enable_cluster_state("distributor:1 storage:3"); - auto op = std::make_shared(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), toVector(0, 1, 2))); + auto op = std::make_shared(BucketAndNodes(makeDocumentBucket(document::BucketId(16, 1)), nodes)); op->setIdealStateManager(&getIdealStateManager()); op->start(_sender, framework::MilliSecTime(0)); return op; } +std::shared_ptr +MergeOperationTest::setup_simple_merge_op() +{ + return setup_simple_merge_op({0, 1, 2}); +} + void MergeOperationTest::assert_simple_merge_bucket_command() { @@ -150,8 +158,10 @@ std::string getNodeList(std::string state, uint32_t redundancy, std::string exis num.erase(pos); trusted = true; } - bucketDB[i] = BucketCopy(0, atoi(num.c_str()), - api::BucketInfo(1, 2, 3)); + uint16_t node; + [[maybe_unused]] auto [ptr, ec] = std::from_chars(num.data(), num.data() + num.size(), node); + assert(ec == std::errc{}); + bucketDB[i] = BucketCopy(0, node, api::BucketInfo(1, 2, 3)); 
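        // (Editor's note on the atoi() -> std::from_chars change above, not part of the patch.)
        // std::from_chars reports parse failures through the returned error code instead of silently
        // yielding 0, which is why the result is asserted to be std::errc{} (success); it requires the
        // <charconv> header. For example:
        //   uint16_t v = 0;
        //   auto ok  = std::from_chars("42", "42" + 2, v);  // ok.ec == std::errc{}, v == 42
        //   auto bad = std::from_chars("no", "no" + 2, v);  // bad.ec == std::errc::invalid_argument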
bucketDB[i].setTrusted(trusted); } std::vector nodes(st.size()); @@ -553,4 +563,44 @@ TEST_F(MergeOperationTest, on_throttled_updates_metrics) EXPECT_EQ(1, metrics->throttled.getValue()); } +TEST_F(MergeOperationTest, unordered_merges_only_sent_iff_config_enabled_and_all_nodes_support_feature) { + setup_stripe(Redundancy(4), NodeCount(4), "distributor:1 storage:4"); + NodeSupportedFeatures with_unordered; + with_unordered.unordered_merge_chaining = true; + + set_node_supported_features(1, with_unordered); + set_node_supported_features(2, with_unordered); + + auto config = make_config(); + config->set_use_unordered_merge_chaining(true); + configure_stripe(std::move(config)); + + // Only nodes {1, 2} support unordered merging; merges should be ordered (sent to lowest index node 1). + setup_simple_merge_op({1, 2, 3}); // Note: these will be re-ordered in ideal state order internally + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000000, " + "cluster state version: 0, nodes: [2, 1, 3], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); + + // All involved nodes support unordered merging; merges should be unordered (sent to ideal node 2) + setup_simple_merge_op({1, 2}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000001, " + "cluster state version: 0, nodes: [2, 1], chain: [] (unordered forwarding), " + "reasons to start: ) => 2", + _sender.getLastCommand(true)); + + _sender.clear(); + + config = make_config(); + config->set_use_unordered_merge_chaining(false); + configure_stripe(std::move(config)); + + // If config is not enabled, should send ordered even if nodes support the feature. + setup_simple_merge_op({2, 1}); + ASSERT_EQ("MergeBucketCommand(BucketId(0x4000000000000001), to time 10000002, " + "cluster state version: 0, nodes: [2, 1], chain: [], " + "reasons to start: ) => 1", + _sender.getLastCommand(true)); +} + } // storage::distributor diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h index 38fc0c599a2a..ec2f978c0293 100644 --- a/storage/src/tests/distributor/mock_tickable_stripe.h +++ b/storage/src/tests/distributor/mock_tickable_stripe.h @@ -33,6 +33,10 @@ struct MockTickableStripe : TickableStripe { void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); } void clear_read_only_bucket_repo_databases() override { abort(); } + void update_node_supported_features_repo(std::shared_ptr) override { + abort(); + } + void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); } StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); } void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); } diff --git a/storage/src/tests/distributor/node_supported_features_repo_test.cpp b/storage/src/tests/distributor/node_supported_features_repo_test.cpp new file mode 100644 index 000000000000..990e0fc50a39 --- /dev/null +++ b/storage/src/tests/distributor/node_supported_features_repo_test.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include +#include +#include + +using namespace ::testing; + +namespace storage::distributor { + +struct NodeSupportedFeaturesRepoTest : Test { + using FeatureMap = vespalib::hash_map; + NodeSupportedFeaturesRepo _repo; + + static NodeSupportedFeatures set_features() noexcept { + NodeSupportedFeatures f; + f.unordered_merge_chaining = true; + return f; + } + + static NodeSupportedFeatures unset_features() noexcept { + return {}; + } +}; + +TEST_F(NodeSupportedFeaturesRepoTest, feature_set_is_empty_by_default) { + EXPECT_EQ(_repo.node_supported_features(0), unset_features()); + EXPECT_EQ(_repo.node_supported_features(12345), unset_features()); +} + +TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_can_add_new_feature_mapping) { + FeatureMap fm; + fm[1] = set_features(); + fm[60] = set_features(); + auto new_repo = _repo.make_union_of(fm); + EXPECT_EQ(new_repo->node_supported_features(0), unset_features()); + EXPECT_EQ(new_repo->node_supported_features(1), set_features()); + EXPECT_EQ(new_repo->node_supported_features(60), set_features()); +} + +TEST_F(NodeSupportedFeaturesRepoTest, make_union_of_updates_existing_feature_mappings) { + FeatureMap fm; + fm[1] = set_features(); + fm[60] = set_features(); + auto new_repo = _repo.make_union_of(fm); + fm[1] = unset_features(); + new_repo = new_repo->make_union_of(fm); + EXPECT_EQ(new_repo->node_supported_features(1), unset_features()); + EXPECT_EQ(new_repo->node_supported_features(60), set_features()); +} + +} diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index fe8a607c9ae4..3ed5e9f4a8df 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -119,6 +120,21 @@ class TopLevelBucketDBUpdaterTest : public Test, invalid_bucket_count)); } + void fake_bucket_reply(const lib::ClusterState &state, + const api::StorageCommand &cmd, + uint32_t bucket_count, + const std::function& reply_decorator) + { + ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO); + const api::StorageMessageAddress& address(*cmd.getAddress()); + auto reply = make_fake_bucket_reply(state, + dynamic_cast(cmd), + address.getIndex(), + bucket_count, 0); + reply_decorator(*reply); + bucket_db_updater().onRequestBucketInfoReply(reply); + } + void send_fake_reply_for_single_bucket_request( const api::RequestBucketInfoCommand& rbi) { @@ -232,7 +248,7 @@ class TopLevelBucketDBUpdaterTest : public Test, } } - api::StorageMessageAddress storage_address(uint16_t node) { + static api::StorageMessageAddress storage_address(uint16_t node) { static vespalib::string _storage("storage"); return api::StorageMessageAddress(&_storage, lib::NodeType::STORAGE, node); } @@ -1299,7 +1315,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, merge_reply_node_down_after_request_sent) { add_nodes_to_stripe_bucket_db(bucket_id, "0=1234,1=1234,2=1234"); for (uint32_t i = 0; i < 3; ++i) { - nodes.push_back(api::MergeBucketCommand::Node(i)); + nodes.emplace_back(i); } api::MergeBucketCommand cmd(makeDocumentBucket(bucket_id), nodes, 0); @@ -2662,4 +2678,37 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl EXPECT_FALSE(def_rs.is_routable()); } +TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) { + lib::ClusterState state("distributor:1 storage:3"); + 
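    // (Editor's note, not part of the patch.) The fake_bucket_reply() overload added above takes a
    // reply_decorator callback so a test can mutate each faked RequestBucketInfoReply before it is
    // handed to the bucket DB updater; below it is used to mark which nodes advertise
    // unordered_merge_chaining support in their replies.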
set_cluster_state(state); + uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1; + + // Known feature sets are initially empty. + auto stripes = distributor_stripes(); + for (auto* s : stripes) { + for (uint16_t i : {0, 1, 2}) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); + } + } + + ASSERT_EQ(expected_msgs, _sender.commands().size()); + for (uint32_t i = 0; i < _sender.commands().size(); i++) { + ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i), + dummy_buckets_to_return, [i](auto& reply) noexcept { + // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported. + // Node 0 does not support the fanciness. + if (i > 0) { + reply.supported_node_features().unordered_merge_chaining = true; + } + })); + } + + // Node features should be propagated to all stripes + for (auto* s : stripes) { + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); + } +} + } diff --git a/storage/src/tests/distributor/top_level_distributor_test_util.cpp b/storage/src/tests/distributor/top_level_distributor_test_util.cpp index 636a09d1f6e9..2a61141865aa 100644 --- a/storage/src/tests/distributor/top_level_distributor_test_util.cpp +++ b/storage/src/tests/distributor/top_level_distributor_test_util.cpp @@ -115,13 +115,13 @@ TopLevelDistributorTestUtil::handle_top_level_message(const std::shared_ptrstop_and_join(); // Must be tagged as stopped prior to onClose _distributor->onClose(); } _sender.clear(); - _node.reset(0); + _node.reset(); _config = getStandardConfig(false); } diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index e8f8e425af4f..0f844ab6b4fa 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -52,15 +52,18 @@ struct MergeBuilder { ~MergeBuilder(); MergeBuilder& nodes(uint16_t n0) { + _nodes.clear(); _nodes.push_back(n0); return *this; } MergeBuilder& nodes(uint16_t n0, uint16_t n1) { + _nodes.clear(); _nodes.push_back(n0); _nodes.push_back(n1); return *this; } MergeBuilder& nodes(uint16_t n0, uint16_t n1, uint16_t n2) { + _nodes.clear(); _nodes.push_back(n0); _nodes.push_back(n1); _nodes.push_back(n2); @@ -146,7 +149,8 @@ struct MergeThrottlerTest : Test { api::ReturnCode::Result expectedResultCode); void fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count); - void receive_chained_merge_with_full_queue(bool disable_queue_limits); + void fill_up_throttler_active_window_and_queue(uint16_t node_idx); + void receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd = false); std::shared_ptr peek_throttler_queue_top(size_t throttler_idx) { auto& queue = _throttlers[throttler_idx]->getMergeQueue(); @@ -1197,7 +1201,7 @@ TEST_F(MergeThrottlerTest, busy_returned_on_full_queue_for_merges_sent_from_dist } void -MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits) +MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_limits, bool unordered_fwd) { // Note: uses node with index 1 to not be the first node in chain 
_throttlers[1]->set_disable_queue_limits_for_chained_merges(disable_queue_limits); @@ -1218,10 +1222,15 @@ MergeThrottlerTest::receive_chained_merge_with_full_queue(bool disable_queue_lim // Send down another merge with non-empty chain. It should _not_ be busy bounced // (if limits disabled) as it has already been accepted into another node's merge window. { - std::vector nodes({{0}, {1}, {2}}); + std::vector nodes({{2}, {1}, {0}}); auto cmd = std::make_shared( makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); - cmd->setChain(std::vector({0})); // Forwarded from node 0 + if (!unordered_fwd) { + cmd->setChain(std::vector({0})); // Forwarded from node 0 + } else { + cmd->setChain(std::vector({2})); // Forwarded from node 2, i.e. _not_ the lowest index + } + cmd->set_use_unordered_forwarding(unordered_fwd); _topLinks[1]->sendDown(cmd); } } @@ -1249,11 +1258,34 @@ TEST_F(MergeThrottlerTest, forwarded_merge_has_higher_pri_when_chain_limits_disa EXPECT_FALSE(highest_pri_merge->getChain().empty()); // Should be the forwarded merge } +TEST_F(MergeThrottlerTest, forwarded_unordered_merge_is_directly_accepted_into_active_window) { + // Unordered forwarding is orthogonal to disabled chain limits config, so we implicitly test that too. + ASSERT_NO_FATAL_FAILURE(receive_chained_merge_with_full_queue(true, true)); + + // Unordered merge is immediately forwarded to the next node + _topLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); + auto fwd = std::dynamic_pointer_cast( + _topLinks[1]->getAndRemoveMessage(MessageType::MERGEBUCKET)); + ASSERT_TRUE(fwd); + EXPECT_TRUE(fwd->use_unordered_forwarding()); + EXPECT_EQ(fwd->getChain(), std::vector({2, 1})); +} + +TEST_F(MergeThrottlerTest, non_forwarded_unordered_merge_is_enqueued_if_active_window_full) +{ + fill_throttler_queue_with_n_commands(1, 0); // Fill active window entirely + { + std::vector nodes({{1}, {2}, {0}}); + auto cmd = std::make_shared( + makeDocumentBucket(BucketId(32, 0xf000baaa)), nodes, 1234, 1); + cmd->set_use_unordered_forwarding(true); + _topLinks[1]->sendDown(cmd); + } + waitUntilMergeQueueIs(*_throttlers[1], 1, _messageWaitTime); // Should be in queue, not active window +} + TEST_F(MergeThrottlerTest, broken_cycle) { - std::vector nodes; - nodes.push_back(1); - nodes.push_back(0); - nodes.push_back(2); + std::vector nodes({1, 0, 2}); { std::vector chain; chain.push_back(0); @@ -1268,10 +1300,7 @@ TEST_F(MergeThrottlerTest, broken_cycle) { // Send cycled merge which will be executed { - std::vector chain; - chain.push_back(0); - chain.push_back(1); - chain.push_back(2); + std::vector chain({0, 1, 2}); auto cmd = std::make_shared( makeDocumentBucket(BucketId(32, 0xfeef00)), nodes, 1234, 1, chain); _topLinks[1]->sendDown(cmd); @@ -1425,9 +1454,10 @@ TEST_F(MergeThrottlerTest, source_only_merges_are_not_affected_by_backpressure) void MergeThrottlerTest::fill_throttler_queue_with_n_commands(uint16_t throttler_index, size_t queued_count) { size_t max_pending = _throttlers[throttler_index]->getThrottlePolicy().getMaxPendingCount(); for (size_t i = 0; i < max_pending + queued_count; ++i) { - _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)).create()); + _topLinks[throttler_index]->sendDown(MergeBuilder(document::BucketId(16, i)) + .nodes(throttler_index, throttler_index + 1) + .create()); } - // Wait till we have max_pending merge forwards and queued_count enqueued. 
_topLinks[throttler_index]->waitForMessages(max_pending, _messageWaitTime); waitUntilMergeQueueIs(*_throttlers[throttler_index], queued_count, _messageWaitTime); diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index a23d00ee6a34..8a40899165f1 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -50,6 +50,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _prioritize_global_bucket_merges(true), _enable_revert(true), _implicitly_clear_priority_on_schedule(false), + _use_unordered_merge_chaining(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -171,6 +172,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _max_activation_inhibited_out_of_sync_groups = config.maxActivationInhibitedOutOfSyncGroups; _enable_revert = config.enableRevert; _implicitly_clear_priority_on_schedule = config.implicitlyClearBucketPriorityOnSchedule; + _use_unordered_merge_chaining = config.useUnorderedMergeChaining; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 7b4e082d1ed8..ea1aca171167 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -267,6 +267,12 @@ class DistributorConfiguration { [[nodiscard]] bool implicitly_clear_priority_on_schedule() const noexcept { return _implicitly_clear_priority_on_schedule; } + void set_use_unordered_merge_chaining(bool unordered) noexcept { + _use_unordered_merge_chaining = unordered; + } + [[nodiscard]] bool use_unordered_merge_chaining() const noexcept { + return _use_unordered_merge_chaining; + } uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } @@ -324,6 +330,7 @@ class DistributorConfiguration { bool _prioritize_global_bucket_merges; bool _enable_revert; bool _implicitly_clear_priority_on_schedule; + bool _use_unordered_merge_chaining; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 8a9fdf748026..8021075faa39 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -286,3 +286,10 @@ num_distributor_stripes int default=0 restart ## bucket due to being blocked by concurrent operations. This avoids potential head-of-line ## blocking of later buckets in the priority database. implicitly_clear_bucket_priority_on_schedule bool default=false + +## Enables sending merges that are forwarded between content nodes in ideal state node key +## order, instead of strictly increasing node key order (which is the default). +## Even if this config is set to true, unordered merges will only be sent if _all_ nodes +## involved in a given merge have previously reported (as part of bucket info fetching) +## that they support the unordered merge feature. 
+use_unordered_merge_chaining bool default=false diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 52171406ebf5..470bfb69abb5 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -32,6 +32,7 @@ vespa_add_library(storage_distributor messagetracker.cpp min_replica_provider.cpp multi_threaded_stripe_access_guard.cpp + node_supported_features_repo.cpp nodeinfo.cpp operation_routing_snapshot.cpp operation_sequencer.cpp diff --git a/storage/src/vespa/storage/distributor/distributor_operation_context.h b/storage/src/vespa/storage/distributor/distributor_operation_context.h index 934c5e364d82..bceb4ed13773 100644 --- a/storage/src/vespa/storage/distributor/distributor_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_operation_context.h @@ -17,7 +17,7 @@ class DistributorBucketSpaceRepo; */ class DistributorOperationContext { public: - virtual ~DistributorOperationContext() {} + virtual ~DistributorOperationContext() = default; virtual api::Timestamp generate_unique_timestamp() = 0; virtual const BucketSpaceStateMap& bucket_space_states() const noexcept = 0; virtual BucketSpaceStateMap& bucket_space_states() noexcept = 0; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 9f565686216c..50c70306d924 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -6,6 +6,7 @@ #include "distributor_stripe.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" +#include "node_supported_features_repo.h" #include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" #include "storage_node_up_states.h" @@ -68,6 +69,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _recoveryTimeStarted(_component.getClock()), _tickResult(framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN), _bucketIdHasher(std::make_unique()), + _node_supported_features_repo(std::make_shared()), _metricLock(), _maintenanceStats(), _bucketSpacesStats(), @@ -871,6 +873,12 @@ DistributorStripe::clear_read_only_bucket_repo_databases() bucket_db_updater().clearReadOnlyBucketRepoDatabases(); } +void +DistributorStripe::update_node_supported_features_repo(std::shared_ptr features_repo) +{ + _node_supported_features_repo = std::move(features_repo); +} + void DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const { @@ -889,4 +897,10 @@ DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutpu bucket_db_updater().report_delayed_single_bucket_requests(xos); } +const NodeSupportedFeaturesRepo& +DistributorStripe::node_supported_features_repo() const noexcept +{ + return *_node_supported_features_repo; +} + } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 5ba682d46e3e..ce6a2071efd2 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -160,6 +160,8 @@ class DistributorStripe final return *_bucketIdHasher; } + const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override; + StripeBucketDBUpdater& bucket_db_updater() { return _bucketDBUpdater; } const 
StripeBucketDBUpdater& bucket_db_updater() const { return _bucketDBUpdater; } IdealStateManager& ideal_state_manager() { return _idealStateManager; } @@ -283,6 +285,7 @@ class DistributorStripe final void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override; void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; void clear_read_only_bucket_repo_databases() override; + void update_node_supported_features_repo(std::shared_ptr features_repo) override; void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override; void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; @@ -338,6 +341,7 @@ class DistributorStripe final framework::ThreadWaitInfo _tickResult; BucketDBMetricUpdater _bucketDBMetricUpdater; std::unique_ptr _bucketIdHasher; + std::shared_ptr _node_supported_features_repo; mutable std::mutex _metricLock; /** * Maintenance stats for last completed database scan iteration. diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp index f2d2afb8fee4..aa0a22897270 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.cpp @@ -277,6 +277,12 @@ DistributorStripeComponent::storage_node_is_up(document::BucketSpace bucket_spac return ns.getState().oneOf(storage_node_up_states()); } +const NodeSupportedFeaturesRepo& +DistributorStripeComponent::node_supported_features_repo() const noexcept +{ + return _distributor.node_supported_features_repo(); +} + std::unique_ptr DistributorStripeComponent::parse_selection(const vespalib::string& selection) const { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_component.h b/storage/src/vespa/storage/distributor/distributor_stripe_component.h index b274e21ac7c8..5bcf9eec76d9 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_component.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_component.h @@ -70,7 +70,7 @@ class DistributorStripeComponent : public storage::DistributorComponent, */ void update_bucket_database(const document::Bucket& bucket, const BucketCopy& changed_node, - uint32_t update_flags = 0) override { + uint32_t update_flags) override { update_bucket_database(bucket, toVector(changed_node), update_flags); @@ -79,9 +79,9 @@ class DistributorStripeComponent : public storage::DistributorComponent, /** * Adds the given copies to the bucket database. */ - virtual void update_bucket_database(const document::Bucket& bucket, - const std::vector& changed_nodes, - uint32_t update_flags = 0) override; + void update_bucket_database(const document::Bucket& bucket, + const std::vector& changed_nodes, + uint32_t update_flags) override; /** * Removes a copy from the given bucket from the bucket database. 
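For illustration, a minimal sketch (not part of the patch) of how operation-side code can consult the per-node feature set through the accessor chain added above; the free function, its name and its parameters are hypothetical:

bool all_support_unordered_merges(const DistributorStripeOperationContext& ctx,
                                  const std::vector<uint16_t>& nodes) {
    // Nodes that have never reported their features fall back to an all-unset NodeSupportedFeatures.
    const auto& repo = ctx.node_supported_features_repo();
    for (uint16_t node : nodes) {
        if (!repo.node_supported_features(node).unordered_merge_chaining) {
            return false;
        }
    }
    return true;
}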
@@ -165,6 +165,8 @@ class DistributorStripeComponent : public storage::DistributorComponent, return getDistributor().getBucketIdHasher(); } + const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept override; + // Implements DocumentSelectionParser std::unique_ptr parse_selection(const vespalib::string& selection) const override; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h index 4f39dd7e5bc2..dfed59499c6e 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_interface.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_interface.h @@ -16,6 +16,7 @@ namespace storage { namespace storage::distributor { class DistributorMetricSet; +class NodeSupportedFeaturesRepo; class PendingMessageTracker; /** @@ -61,6 +62,7 @@ class DistributorStripeInterface : public DistributorStripeMessageSender virtual const DistributorConfiguration& getConfig() const = 0; virtual ChainedMessageSender& getMessageSender() = 0; virtual const BucketGcTimeCalculator::BucketIdHasher& getBucketIdHasher() const = 0; + virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h index 5919261ab434..d6f4e5694f64 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_operation_context.h @@ -16,6 +16,7 @@ namespace storage::lib { class ClusterStateBundle; } namespace storage::distributor { class PendingMessageTracker; +class NodeSupportedFeaturesRepo; /** * Interface with functionality that is used when handling distributor stripe operations. 
@@ -57,6 +58,7 @@ class DistributorStripeOperationContext { virtual const lib::ClusterStateBundle& cluster_state_bundle() const = 0; virtual bool storage_node_is_up(document::BucketSpace bucket_space, uint32_t node_index) const = 0; virtual const BucketGcTimeCalculator::BucketIdHasher& bucket_id_hasher() const = 0; + virtual const NodeSupportedFeaturesRepo& node_supported_features_repo() const noexcept = 0; }; } diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 1a44b79ac3a4..b00e4ce3cbae 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -132,6 +132,14 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { }); } +void MultiThreadedStripeAccessGuard::update_node_supported_features_repo( + std::shared_ptr features_repo) +{ + for_each_stripe([&](TickableStripe& stripe) { + stripe.update_node_supported_features_repo(features_repo); + }); +} + void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const { for_each_stripe([&](TickableStripe& stripe) { stripe.report_bucket_db_status(bucket_space, out); diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index 53799fa338b1..c52a01fdded2 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -54,6 +54,8 @@ class MultiThreadedStripeAccessGuard : public StripeAccessGuard { void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; void clear_read_only_bucket_repo_databases() override; + void update_node_supported_features_repo(std::shared_ptr features_repo) override; + void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override; PendingOperationStats pending_operation_stats() const override; void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h new file mode 100644 index 000000000000..fb9cc68e9707 --- /dev/null +++ b/storage/src/vespa/storage/distributor/node_supported_features.h @@ -0,0 +1,19 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +namespace storage::distributor { + +/** + * Collection of distinct features supported by a particular content node. + * + * Communicated to a distributor via bucket info exchanges. All features + * are initially expected to be unsupported. + */ +struct NodeSupportedFeatures { + bool unordered_merge_chaining = false; + + bool operator==(const NodeSupportedFeatures&) const noexcept = default; +}; + +} diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp new file mode 100644 index 000000000000..e125f360cec2 --- /dev/null +++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.cpp @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "node_supported_features_repo.h" +#include + +namespace storage::distributor { + +NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo() = default; + +NodeSupportedFeaturesRepo::NodeSupportedFeaturesRepo( + vespalib::hash_map features, + PrivateCtorTag) + : _node_features(std::move(features)) +{} + +NodeSupportedFeaturesRepo::~NodeSupportedFeaturesRepo() = default; + +const NodeSupportedFeatures& +NodeSupportedFeaturesRepo::node_supported_features(uint16_t node_idx) const noexcept +{ + static const NodeSupportedFeatures default_features; + const auto iter = _node_features.find(node_idx); + return (iter != _node_features.end() ? iter->second : default_features); +} + +std::shared_ptr +NodeSupportedFeaturesRepo::make_union_of(const vespalib::hash_map& node_features) const +{ + auto new_features = _node_features; // Must be by copy. + // We always let the _new_ features update any existing mapping. + for (const auto& nf : node_features) { + new_features[nf.first] = nf.second; + } + return std::make_shared(std::move(new_features), PrivateCtorTag{}); +} + +} diff --git a/storage/src/vespa/storage/distributor/node_supported_features_repo.h b/storage/src/vespa/storage/distributor/node_supported_features_repo.h new file mode 100644 index 000000000000..cc40c27b8e2e --- /dev/null +++ b/storage/src/vespa/storage/distributor/node_supported_features_repo.h @@ -0,0 +1,37 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "node_supported_features.h" +#include +#include + +namespace storage::distributor { + +/** + * Repo of known mappings from node distribution key to feature set supported by + * the content node with the given distribution key. + * + * Entirely immutable; copy-on-write via make_union_of(). + */ +class NodeSupportedFeaturesRepo { + const vespalib::hash_map _node_features; + struct PrivateCtorTag {}; +public: + NodeSupportedFeaturesRepo(); + + NodeSupportedFeaturesRepo(vespalib::hash_map features, PrivateCtorTag); + ~NodeSupportedFeaturesRepo(); + + // Returns supported node features for node with distribution key node_idx, or a default feature set + // with all features unset if node has no known mapping. + [[nodiscard]] const NodeSupportedFeatures& node_supported_features(uint16_t node_idx) const noexcept; + + // Returns a new repo instance containing the union key->features set of self and node_features. + // If there is a duplicate mapping between the two, the features in node_features take precedence + // and will be stored in the new repo. 
+ [[nodiscard]] std::shared_ptr + make_union_of(const vespalib::hash_map& node_features) const; +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp index f951a880e5d2..d220a71966fb 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -137,9 +138,8 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) getBucketId(), _limiter, nodes); - for (uint32_t i=0; i 1) { @@ -148,11 +148,16 @@ MergeOperation::onStart(DistributorStripeMessageSender& sender) _mnodes, _manager->operation_context().generate_unique_timestamp(), clusterState.getVersion()); - - // Due to merge forwarding/chaining semantics, we must always send - // the merge command to the lowest indexed storage node involved in - // the merge in order to avoid deadlocks. - std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator()); + const bool may_send_unordered = (_manager->operation_context().distributor_config().use_unordered_merge_chaining() + && all_involved_nodes_support_unordered_merge_chaining()); + if (!may_send_unordered) { + // Due to merge forwarding/chaining semantics, we must always send + // the merge command to the lowest indexed storage node involved in + // the merge in order to avoid deadlocks. + std::sort(_mnodes.begin(), _mnodes.end(), NodeIndexComparator()); + } else { + msg->set_use_unordered_forwarding(true); + } LOG(debug, "Sending %s to storage node %u", msg->toString().c_str(), _mnodes[0].index); @@ -262,7 +267,7 @@ void MergeOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr & msg) { - if (_removeOperation.get()) { + if (_removeOperation) { if (_removeOperation->onReceiveInternal(msg)) { _ok = _removeOperation->ok(); if (!_ok) { @@ -277,7 +282,7 @@ MergeOperation::onReceive(DistributorStripeMessageSender& sender, return; } - api::MergeBucketReply& reply(dynamic_cast(*msg)); + auto& reply = dynamic_cast(*msg); LOG(debug, "Merge operation for bucket %s finished", getBucketId().toString().c_str()); @@ -367,6 +372,16 @@ bool MergeOperation::is_global_bucket_merge() const noexcept { return getBucket().getBucketSpace() == document::FixedBucketSpaces::global_space(); } +bool MergeOperation::all_involved_nodes_support_unordered_merge_chaining() const noexcept { + const auto& features_repo = _manager->operation_context().node_supported_features_repo(); + for (uint16_t node : getNodes()) { + if (!features_repo.node_supported_features(node).unordered_merge_chaining) { + return false; + } + } + return true; +} + MergeBucketMetricSet* MergeOperation::get_merge_metrics() { diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h index 832c0f996817..014bae842fa2 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/idealstate/mergeoperation.h @@ -64,6 +64,7 @@ class MergeOperation : public IdealStateOperation void deleteSourceOnlyNodes(const BucketDatabase::Entry& currentState, DistributorStripeMessageSender& sender); bool is_global_bucket_merge() const noexcept; + bool all_involved_nodes_support_unordered_merge_chaining() const noexcept; 
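    // (Editor's summary of the onStart() change above, not part of the patch.) Unordered forwarding is
    // only used when distributor_config().use_unordered_merge_chaining() is set AND
    // all_involved_nodes_support_unordered_merge_chaining() holds; in that case the node list keeps
    // its ideal-state order and the command goes to the first node in that order with
    // set_use_unordered_forwarding(true). Otherwise the nodes are sorted ascending and the command is
    // sent to the lowest-indexed node, exactly as before.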
MergeBucketMetricSet* get_merge_metrics(); }; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 1c1c9f4a4312..8183b0136688 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -44,7 +45,8 @@ PendingClusterState::PendingClusterState( _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()), _isVersionedTransition(true), _bucketOwnershipTransfer(false), - _pendingTransitions() + _pendingTransitions(), + _node_features() { logConstructionInformation(); initializeBucketSpaceTransitions(false, outdatedNodesMap); @@ -67,7 +69,8 @@ PendingClusterState::PendingClusterState( _clusterStateVersion(0), _isVersionedTransition(false), _bucketOwnershipTransfer(true), - _pendingTransitions() + _pendingTransitions(), + _node_features() { logConstructionInformation(); initializeBucketSpaceTransitions(true, OutdatedNodesMap()); @@ -287,6 +290,9 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptrsecond->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node); + + update_node_supported_features_from_reply(iter->second.node, *reply); + _sentMessages.erase(iter); return true; @@ -304,21 +310,6 @@ PendingClusterState::resendDelayedMessages() { } } -std::string -PendingClusterState::requestNodesToString() const -{ - std::ostringstream ost; - for (uint32_t i = 0; i < _requestedNodes.size(); ++i) { - if (_requestedNodes[i]) { - if (ost.str().length() > 0) { - ost << ","; - } - ost << i; - } - } - return ost.str(); -} - void PendingClusterState::merge_into_bucket_databases(StripeAccessGuard& guard) { @@ -366,4 +357,14 @@ PendingClusterState::getPrevClusterStateBundleString() const { return _prevClusterStateBundle.getBaselineClusterState()->toString(); } +void +PendingClusterState::update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply) +{ + const auto& src_feat = reply.supported_node_features(); + NodeSupportedFeatures dest_feat; + dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining; + // This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space. + _node_features.insert(std::make_pair(node, dest_feat)); +} + } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index 0d07730d9ee2..1a2f8901b479 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "node_supported_features.h" #include "pending_bucket_space_db_transition_entry.h" #include "clusterinformation.h" #include @@ -9,6 +10,7 @@ #include #include #include +#include #include "outdated_nodes_map.h" #include #include @@ -151,9 +153,14 @@ class PendingClusterState : public vespalib::XmlSerializable { // Get pending transition for a specific bucket space. Only used by unit test. PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); + // May be a subset of the nodes in the cluster, depending on how many nodes were consulted + // as part of the pending cluster state. Caller must take care to aggregate features. 
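    // (Editor's illustration, not part of the patch.) "Aggregate" here means folding the gathered
    // subset into the previously known mapping rather than replacing it, which is what the top-level
    // bucket DB updater does at cluster state activation (quoted from the change further below):
    //   _node_supported_features_repo = _node_supported_features_repo->make_union_of(
    //           _pending_cluster_state->gathered_node_supported_features());
    //   guard.update_node_supported_features_repo(_node_supported_features_repo);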
+ const vespalib::hash_map& gathered_node_supported_features() const noexcept { + return _node_features; + } + void printXml(vespalib::XmlOutputStream&) const override; Summary getSummary() const; - std::string requestNodesToString() const; private: // With 100ms resend timeout, this requires a particular node to have failed @@ -170,7 +177,7 @@ class PendingClusterState : public vespalib::XmlSerializable { DistributorMessageSender& sender, const BucketSpaceStateMap& bucket_space_states, const std::shared_ptr& newStateCmd, - const OutdatedNodesMap &outdatedNodesMap, + const OutdatedNodesMap& outdatedNodesMap, api::Timestamp creationTimestamp); /** @@ -213,6 +220,7 @@ class PendingClusterState : public vespalib::XmlSerializable { std::string getNewClusterStateBundleString() const; std::string getPrevClusterStateBundleString() const; void update_reply_failure_statistics(const api::ReturnCode& result, const BucketSpaceAndNode& source); + void update_node_supported_features_from_reply(uint16_t node, const api::RequestBucketInfoReply& reply); std::shared_ptr _cmd; @@ -233,6 +241,7 @@ class PendingClusterState : public vespalib::XmlSerializable { bool _isVersionedTransition; bool _bucketOwnershipTransfer; std::unordered_map, document::BucketSpace::hash> _pendingTransitions; + vespalib::hash_map _node_features; }; } diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index bfc53c0ed821..2ed40cfcf2ee 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -20,6 +20,8 @@ namespace vespalib::xml { class XmlOutputStream; } namespace storage::distributor { +class NodeSupportedFeaturesRepo; + /** * A stripe access guard guarantees that the holder of a guard can access underlying * stripes via it in a thread safe manner. In particular, while any access guard is @@ -57,6 +59,8 @@ class StripeAccessGuard { virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0; virtual void clear_read_only_bucket_repo_databases() = 0; + virtual void update_node_supported_features_repo(std::shared_ptr features_repo) = 0; + struct PendingOperationStats { size_t external_load_operations; size_t maintenance_operations; diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h index d58b1e2e6aa3..e458043ac641 100644 --- a/storage/src/vespa/storage/distributor/tickable_stripe.h +++ b/storage/src/vespa/storage/distributor/tickable_stripe.h @@ -15,6 +15,8 @@ namespace vespalib::xml { class XmlOutputStream; } namespace storage::distributor { +class NodeSupportedFeaturesRepo; + /** * A tickable stripe is the minimal binding glue between the stripe's worker thread and * the actual implementation. 
Primarily allows for easier testing without having to @@ -58,6 +60,8 @@ class TickableStripe { virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0; virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0; virtual void clear_read_only_bucket_repo_databases() = 0; + virtual void update_node_supported_features_repo(std::shared_ptr features_repo) = 0; + // Functions used for state reporting virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0; virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0; diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp index 8fc6d7576c95..613f0f6ce097 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp @@ -7,6 +7,7 @@ #include "top_level_distributor.h" #include "distributor_bucket_space.h" #include "distributormetricsset.h" +#include "node_supported_features_repo.h" #include "simpleclusterinformation.h" #include "stripe_access_guard.h" #include @@ -47,11 +48,12 @@ TopLevelBucketDBUpdater::TopLevelBucketDBUpdater(const DistributorNodeContext& n _chained_sender(chained_sender), _outdated_nodes_map(), _transition_timer(_node_ctx.clock()), + _node_supported_features_repo(std::make_shared()), _stale_reads_enabled(false) { // FIXME STRIPE top-level Distributor needs a proper way to track the current cluster state bundle! propagate_active_state_bundle_internally(true); // We're just starting up so assume ownership transfer. - bootstrap_distribution_config(bootstrap_distribution); + bootstrap_distribution_config(std::move(bootstrap_distribution)); } TopLevelBucketDBUpdater::~TopLevelBucketDBUpdater() = default; @@ -393,6 +395,10 @@ TopLevelBucketDBUpdater::activate_pending_cluster_state(StripeAccessGuard& guard guard.notify_distribution_change_enabled(); } + _node_supported_features_repo = _node_supported_features_repo->make_union_of( + _pending_cluster_state->gathered_node_supported_features()); + guard.update_node_supported_features_repo(_node_supported_features_repo); + guard.update_read_snapshot_after_activation(_pending_cluster_state->getNewClusterStateBundle()); _pending_cluster_state.reset(); _outdated_nodes_map.clear(); diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h index f35991c20f3d..b1065e708a43 100644 --- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h @@ -30,6 +30,7 @@ struct BucketSpaceDistributionConfigs; class BucketSpaceDistributionContext; class ClusterStateBundleActivationListener; class DistributorInterface; +class NodeSupportedFeaturesRepo; class StripeAccessor; class StripeAccessGuard; @@ -122,6 +123,7 @@ class TopLevelBucketDBUpdater : public framework::StatusReporter, ChainedMessageSender& _chained_sender; OutdatedNodesMap _outdated_nodes_map; framework::MilliSecTimer _transition_timer; + std::shared_ptr _node_supported_features_repo; std::atomic _stale_reads_enabled; }; diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 05e504922064..bc2f54e5a507 100644 --- 
a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -113,30 +113,40 @@ MergeThrottler::MergeOperationMetrics::MergeOperationMetrics(const std::string& } MergeThrottler::MergeOperationMetrics::~MergeOperationMetrics() = default; -MergeThrottler::MergeNodeSequence::MergeNodeSequence( - const api::MergeBucketCommand& cmd, - uint16_t thisIndex) +MergeThrottler::MergeNodeSequence::MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex) : _cmd(cmd), _sortedNodes(cmd.getNodes()), - _sortedIndex(std::numeric_limits::max()), - _thisIndex(thisIndex) + _sortedIndex(UINT16_MAX), + _unordered_index(UINT16_MAX), + _thisIndex(thisIndex), + _use_unordered_forwarding(cmd.use_unordered_forwarding()) { // Sort the node vector so that we can find out if we're the // last node in the chain or if we should forward the merge std::sort(_sortedNodes.begin(), _sortedNodes.end(), NodeComparator()); - assert(!_sortedNodes.empty()); - for (std::size_t i = 0; i < _sortedNodes.size(); ++i) { + assert(!_sortedNodes.empty() && (_sortedNodes.size() < UINT16_MAX)); + for (uint16_t i = 0; i < static_cast(_sortedNodes.size()); ++i) { if (_sortedNodes[i].index == _thisIndex) { _sortedIndex = i; break; } } + const auto& nodes = unordered_nodes(); + for (uint16_t i = 0; i < static_cast(nodes.size()); ++i) { + if (nodes[i].index == _thisIndex) { + _unordered_index = i; + break; + } + } } uint16_t MergeThrottler::MergeNodeSequence::getNextNodeInChain() const { assert(_cmd.getChain().size() < _sortedNodes.size()); + if (_use_unordered_forwarding) { + return unordered_nodes()[_cmd.getChain().size() + 1].index; + } // assert(_sortedNodes[_cmd.getChain().size()].index == _thisIndex); if (_sortedNodes[_cmd.getChain().size()].index != _thisIndex) { // Some added paranoia output @@ -153,7 +163,11 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const { if (_cmd.getChain().size() != _sortedNodes.size()) return false; - for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) { + if (_use_unordered_forwarding) { + return true; // Expect chain to be correct if size matches node sequence size. TODO can't we always do this? 
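        // (Editor's note, not part of the patch.) The chain lists the nodes that have already accepted
        // the merge, in visit order. With unordered forwarding the visit order is the command's own
        // node order (ideal-state order) rather than ascending node index: for nodes [2, 1, 0], node 2
        // forwards to node 1 with chain [2], node 1 forwards to node 0 with chain [2, 1], and node 0,
        // being last in the sequence, hands the merge to its local persistence layer instead of
        // forwarding it further.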
+ } + + for (size_t i = 0; i < _cmd.getChain().size(); ++i) { if (_cmd.getChain()[i] != _sortedNodes[i].index) { return false; } @@ -162,10 +176,10 @@ MergeThrottler::MergeNodeSequence::isChainCompleted() const } bool -MergeThrottler::MergeNodeSequence::chainContainsIndex(uint16_t idx) const +MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept { - for (std::size_t i = 0; i < _cmd.getChain().size(); ++i) { - if (_cmd.getChain()[i] == idx) { + for (size_t i = 0; i < _cmd.getChain().size(); ++i) { + if (_cmd.getChain()[i] == _thisIndex) { return true; } } @@ -358,6 +372,7 @@ MergeThrottler::forwardCommandToNode( fwdMerge->setSourceIndex(mergeCmd.getSourceIndex()); fwdMerge->setPriority(mergeCmd.getPriority()); fwdMerge->setTimeout(mergeCmd.getTimeout()); + fwdMerge->set_use_unordered_forwarding(mergeCmd.use_unordered_forwarding()); msgGuard.sendUp(fwdMerge); } @@ -374,7 +389,7 @@ api::StorageMessage::SP MergeThrottler::getNextQueuedMerge() { if (_queue.empty()) { - return api::StorageMessage::SP(); + return {}; } auto iter = _queue.begin(); @@ -385,7 +400,7 @@ MergeThrottler::getNextQueuedMerge() } void -MergeThrottler::enqueueMerge( +MergeThrottler::enqueue_merge_for_later_processing( const api::StorageMessage::SP& msg, MessageGuard& msgGuard) { @@ -395,9 +410,10 @@ MergeThrottler::enqueueMerge( if (!validateNewMerge(mergeCmd, nodeSeq, msgGuard)) { return; } - const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty(); + // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued + const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor(); _queue.emplace(msg, _queueSequence++, is_forwarded_merge); - _metrics->queueSize.set(_queue.size()); + _metrics->queueSize.set(static_cast(_queue.size())); } bool @@ -682,11 +698,30 @@ bool MergeThrottler::backpressure_mode_active() const { return backpressure_mode_active_no_lock(); } -bool MergeThrottler::allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept { - // We let any merge through that has already passed through at least one other node's merge - // window, as that has already taken up a logical resource slot on all those nodes. Busy-bouncing - // a merge at that point would undo a great amount of thumb-twiddling and waiting. - return (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty()); +bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept { + // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. + // See comment in may_allow_into_queue() for rationale. + return (cmd.use_unordered_forwarding() && !cmd.from_distributor()); +} + +bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept { + // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. + // Consider the following scenario, with two nodes C0 and C1, each with a low window size of 1 (low + // limit chosen for demonstration purposes, but is entirely generalizable): + // 1. Node 0 receives merge M_x for nodes [0, 1], places in active window, forwards to node 1 + // 2. Node 1 receives merge M_y for nodes [1, 0], places in active window, forwards to node 0 + // 3. Node 0 receives merge M_y from node 1. Active window is full, so places in queue + // 4. Node 1 receives merge M_x from node 0. 
Active window is full, so places in queue + // 5. Neither M_x nor M_y will ever complete since they're waiting for resources that cannot be + // freed up before they themselves complete. Classic deadlock(tm). + // + // We do, however, allow enqueueing unordered merges that come straight from the distributor, as + // those cannot cause a deadlock at that point in time. + if (cmd.use_unordered_forwarding()) { + return cmd.from_distributor(); + } + return ((_queue.size() < _maxQueueSize) + || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor())); } // Must be run from worker thread @@ -716,10 +751,10 @@ MergeThrottler::handleMessageDown( if (isMergeAlreadyKnown(msg)) { processCycledMergeCommand(msg, msgGuard); - } else if (canProcessNewMerge()) { + } else if (canProcessNewMerge() || allow_merge_despite_full_window(mergeCmd)) { processNewMergeCommand(msg, msgGuard); - } else if ((_queue.size() < _maxQueueSize) || allow_merge_with_queue_full(mergeCmd)) { - enqueueMerge(msg, msgGuard); // Queue for later processing + } else if (may_allow_into_queue(mergeCmd)) { + enqueue_merge_for_later_processing(msg, msgGuard); } else { // No more room at the inn. Return BUSY so that the // distributor will wait a bit before retrying @@ -773,7 +808,7 @@ MergeThrottler::validateNewMerge( << _component.getIndex() << ", which is not in its forwarding chain"; LOG(error, "%s", oss.str().data()); - } else if (mergeCmd.getChain().size() >= nodeSeq.getSortedNodes().size()) { + } else if (mergeCmd.getChain().size() >= nodeSeq.unordered_nodes().size()) { // Chain is full but we haven't seen the merge! This means // the node has probably gone down with a merge it previously // forwarded only now coming back to haunt it. @@ -781,7 +816,7 @@ MergeThrottler::validateNewMerge( << " is not in node's internal state, but has a " << "full chain, meaning it cannot be forwarded."; LOG(debug, "%s", oss.str().data()); - } else if (nodeSeq.chainContainsIndex(nodeSeq.getThisNodeIndex())) { + } else if (nodeSeq.chain_contains_this_node()) { oss << mergeCmd.toString() << " is not in node's internal state, but contains " << "this node in its non-full chain. This should not happen!"; @@ -831,7 +866,9 @@ MergeThrottler::processNewMergeCommand( // If chain is empty and this node is not the lowest // index in the nodeset, immediately execute. Required for // backwards compatibility with older distributor versions. 
-    if (mergeCmd.getChain().empty()
+    // TODO remove this
+    if (mergeCmd.from_distributor()
+        && !mergeCmd.use_unordered_forwarding()
         && (nodeSeq.getSortedNodes()[0].index != _component.getIndex()))
     {
         LOG(debug, "%s has empty chain and was sent to node that "
@@ -1039,7 +1076,6 @@
 bool
 MergeThrottler::onSetSystemState(
         const std::shared_ptr<api::SetSystemStateCommand>& stateCmd)
 {
-
     LOG(debug, "New cluster state arrived with version %u, flushing "
                "all outdated queued merges",
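// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the patch): the admission rules above,
// restated as a tiny standalone model. The type and member names are made-up
// stand-ins, not the real MergeThrottler/MergeBucketCommand API; only the
// decision order mirrors handleMessageDown(), allow_merge_despite_full_window()
// and may_allow_into_queue().
// ---------------------------------------------------------------------------
#include <cstddef>
#include <cstdio>

enum class Admission { ProcessNow, Enqueue, BounceBusy };

struct CmdModel {
    bool unordered_forwarding; // models use_unordered_forwarding()
    bool chain_empty;          // models from_distributor() (empty chain)
};

struct ThrottlerModel {
    bool   window_has_room;    // models canProcessNewMerge()
    size_t queue_size;
    size_t max_queue_size;
    bool   disable_queue_limits_for_chained_merges;

    bool may_allow_into_queue(const CmdModel& cmd) const {
        if (cmd.unordered_forwarding) {
            // Forwarded unordered merges must never wait in the queue (deadlock risk);
            // unordered merges straight from the distributor may be queued safely.
            return cmd.chain_empty;
        }
        return (queue_size < max_queue_size)
               || (disable_queue_limits_for_chained_merges && !cmd.chain_empty);
    }

    Admission admit(const CmdModel& cmd) const {
        // Forwarded unordered merges bypass a full window instead of queueing.
        if (window_has_room || (cmd.unordered_forwarding && !cmd.chain_empty)) {
            return Admission::ProcessNow;
        }
        return may_allow_into_queue(cmd) ? Admission::Enqueue : Admission::BounceBusy;
    }
};

int main() {
    ThrottlerModel t{false, 10, 10, false};                    // full window, full queue
    std::printf("%d\n", (int)t.admit(CmdModel{true, false}));  // forwarded unordered       -> 0 (ProcessNow)
    std::printf("%d\n", (int)t.admit(CmdModel{true, true}));   // unordered, from distributor -> 1 (Enqueue)
    std::printf("%d\n", (int)t.admit(CmdModel{false, false})); // legacy forwarded merge    -> 2 (BounceBusy)
    return 0;
}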
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index da301172a3aa..c115d36ad898 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -161,7 +161,7 @@ class MergeThrottler : public framework::Runnable,
     ActiveMergeMap _merges;
     MergePriorityQueue _queue;
-    std::size_t _maxQueueSize;
+    size_t _maxQueueSize;
     mbus::StaticThrottlePolicy::UP _throttlePolicy;
     uint64_t _queueSequence; // TODO: move into a stable priority queue class
     mutable std::mutex _messageLock;
@@ -220,7 +220,7 @@ class MergeThrottler : public framework::Runnable,
     std::mutex& getStateLock() { return _stateLock; }
 
     Metrics& getMetrics() { return *_metrics; }
-    std::size_t getMaxQueueSize() const { return _maxQueueSize; }
+    size_t getMaxQueueSize() const { return _maxQueueSize; }
     void print(std::ostream& out, bool verbose, const std::string& indent) const override;
     void reportHtmlStatus(std::ostream&, const framework::HttpUrlPath&) const override;
 private:
@@ -230,17 +230,18 @@ class MergeThrottler : public framework::Runnable,
     struct MergeNodeSequence {
         const api::MergeBucketCommand& _cmd;
         std::vector<api::MergeBucketCommand::Node> _sortedNodes;
-        std::size_t _sortedIndex; // Index of current storage node in the sorted node sequence
+        uint16_t _sortedIndex; // Index of current storage node in the sorted node sequence
+        uint16_t _unordered_index;
         const uint16_t _thisIndex; // Index of the current storage node
+        bool _use_unordered_forwarding;
 
         MergeNodeSequence(const api::MergeBucketCommand& cmd, uint16_t thisIndex);
 
-        std::size_t getSortedIndex() const { return _sortedIndex; }
         const std::vector<api::MergeBucketCommand::Node>& getSortedNodes() const { return _sortedNodes; }
         bool isIndexUnknown() const {
-            return (_sortedIndex == std::numeric_limits<std::size_t>::max());
+            return (_sortedIndex == UINT16_MAX);
         }
         /**
          * This node is the merge executor if it's the first element in the
@@ -252,11 +253,17 @@ class MergeThrottler : public framework::Runnable,
         uint16_t getExecutorNodeIndex() const{ return _cmd.getNodes()[0].index; }
-        bool isLastNode() const {
-            return (_sortedIndex == _sortedNodes.size() - 1);
+        const std::vector<api::MergeBucketCommand::Node>& unordered_nodes() const noexcept {
+            return _cmd.getNodes();
         }
-        bool chainContainsIndex(uint16_t idx) const;
-        uint16_t getThisNodeIndex() const { return _thisIndex; }
+        [[nodiscard]] bool isLastNode() const {
+            if (!_use_unordered_forwarding) {
+                return (_sortedIndex == _sortedNodes.size() - 1);
+            } else {
+                return (_unordered_index == (unordered_nodes().size() - 1));
+            }
+        }
+        [[nodiscard]] bool chain_contains_this_node() const noexcept;
         /**
          * Gets node to forward to in strictly increasing order.
         */
@@ -339,7 +346,7 @@ class MergeThrottler : public framework::Runnable,
     * @return Highest priority waiting merge or null SP if queue is empty
     */
    api::StorageMessage::SP getNextQueuedMerge();
-    void enqueueMerge(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
+    void enqueue_merge_for_later_processing(const api::StorageMessage::SP& msg, MessageGuard& msgGuard);
 
    /**
     * @return true if throttle policy says at least one additional
@@ -347,12 +354,13 @@ class MergeThrottler : public framework::Runnable,
     */
    bool canProcessNewMerge() const;
 
-    bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
+    [[nodiscard]] bool merge_is_backpressure_throttled(const api::MergeBucketCommand& cmd) const;
    void bounce_backpressure_throttled_merge(const api::MergeBucketCommand& cmd, MessageGuard& guard);
-    bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
-    bool backpressure_mode_active_no_lock() const;
+    [[nodiscard]] bool merge_has_this_node_as_source_only_node(const api::MergeBucketCommand& cmd) const;
+    [[nodiscard]] bool backpressure_mode_active_no_lock() const;
    void backpressure_bounce_all_queued_merges(MessageGuard& guard);
-    bool allow_merge_with_queue_full(const api::MergeBucketCommand& cmd) const noexcept;
+    [[nodiscard]] static bool allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept;
+    [[nodiscard]] bool may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept;
 
    void sendReply(const api::MergeBucketCommand& cmd,
                   const api::ReturnCode& result,
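// ---------------------------------------------------------------------------
// Illustrative walk-through (not part of the patch): how the forwarding order
// differs between legacy sorted chaining and unordered forwarding. Only the
// indexing rule "next hop = nodes[chain.size() + 1], in the order supplied by
// the distributor" is taken from getNextNodeInChain()/isLastNode() above; the
// node ids are made up.
// ---------------------------------------------------------------------------
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
    const std::vector<unsigned> nodes{7, 3, 5};  // order chosen by the distributor
    std::vector<unsigned> sorted(nodes);
    std::sort(sorted.begin(), sorted.end());     // legacy chaining follows this order

    // Unordered forwarding: each node appends itself to the chain and forwards
    // to nodes[chain.size() + 1], so the chain follows the distributor's order.
    std::printf("unordered chain:");
    for (size_t chain_len = 0; chain_len < nodes.size(); ++chain_len) {
        std::printf(" %u%s", nodes[chain_len],
                    (chain_len + 1 < nodes.size()) ? " ->" : " (last node)\n");
    }

    // Legacy chaining: nodes are visited in strictly increasing index order.
    std::printf("sorted chain:   ");
    for (size_t i = 0; i < sorted.size(); ++i) {
        std::printf(" %u%s", sorted[i], (i + 1 < sorted.size()) ? " ->" : " (last node)\n");
    }
    return 0;
}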
diff --git a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
index a6021a7cfd2e..6a00ddc8d8ee 100644
--- a/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
+++ b/storageapi/src/tests/mbusprot/storageprotocoltest.cpp
@@ -410,6 +410,12 @@ TEST_P(StorageProtocolTest, request_bucket_info) {
         // "Last modified" not counted by operator== for some reason. Testing
         // separately until we can figure out if this is by design or not.
         EXPECT_EQ(lastMod, entries[0]._info.getLastModified());
+
+        if (GetParam().getMajor() >= 7) {
+            EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining);
+        } else {
+            EXPECT_FALSE(reply2->supported_node_features().unordered_merge_chaining);
+        }
     }
 }
 
@@ -471,12 +477,18 @@ TEST_P(StorageProtocolTest, merge_bucket) {
     chain.push_back(14);
 
     auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
+    cmd->set_use_unordered_forwarding(true);
     auto cmd2 = copyCommand(cmd);
     EXPECT_EQ(_bucket, cmd2->getBucket());
     EXPECT_EQ(nodes, cmd2->getNodes());
     EXPECT_EQ(Timestamp(1234), cmd2->getMaxTimestamp());
     EXPECT_EQ(uint32_t(567), cmd2->getClusterStateVersion());
     EXPECT_EQ(chain, cmd2->getChain());
+    if (GetParam().getMajor() >= 7) {
+        EXPECT_EQ(cmd2->use_unordered_forwarding(), cmd->use_unordered_forwarding());
+    } else {
+        EXPECT_FALSE(cmd2->use_unordered_forwarding());
+    }
 
     auto reply = std::make_shared<MergeBucketReply>(*cmd);
     auto reply2 = copyReply(reply);
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
index 34d67fdc00c7..7f7ab1d7c0b6 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
+++ b/storageapi/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto
@@ -38,6 +38,7 @@ message MergeBucketRequest {
     uint64 max_timestamp = 3;
     repeated MergeNode nodes = 4;
     repeated uint32 node_chain = 5;
+    bool unordered_forwarding = 6;
 }
 
 message MergeBucketResponse {
@@ -108,8 +109,14 @@ message BucketAndBucketInfo {
     BucketInfo bucket_info = 2;
 }
 
+message SupportedNodeFeatures {
+    bool unordered_merge_chaining = 1;
+}
+
 message RequestBucketInfoResponse {
     repeated BucketAndBucketInfo bucket_infos = 1;
+    // Only present for full bucket info fetches (not for explicit buckets)
+    SupportedNodeFeatures supported_node_features = 2;
 }
 
 message NotifyBucketChangeRequest {
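// ---------------------------------------------------------------------------
// Sketch (not part of the patch): how a distributor-side check might gate
// unordered forwarding on the feature advertised above. The plain map below is
// an illustrative stand-in for the NodeSupportedFeaturesRepo used elsewhere in
// this change; only the field name mirrors SupportedNodeFeatures.
// ---------------------------------------------------------------------------
#include <cstdint>
#include <unordered_map>
#include <vector>

struct NodeFeatures {
    bool unordered_merge_chaining = false; // mirrors SupportedNodeFeatures.unordered_merge_chaining
};

// Unordered forwarding is only safe if every node taking part in the merge
// understands it; unknown or older nodes force a fallback to sorted chaining.
bool all_nodes_support_unordered_merges(const std::unordered_map<uint16_t, NodeFeatures>& repo,
                                        const std::vector<uint16_t>& merge_nodes)
{
    for (uint16_t node : merge_nodes) {
        auto it = repo.find(node);
        if (it == repo.end() || !it->second.unordered_merge_chaining) {
            return false;
        }
    }
    return true;
}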
diff --git a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index bb4cb6e24a3c..8425294cbbd8 100644
--- a/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storageapi/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -766,6 +766,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::MergeBucketCommand&
         set_merge_nodes(*req.mutable_nodes(), msg.getNodes());
         req.set_max_timestamp(msg.getMaxTimestamp());
         req.set_cluster_state_version(msg.getClusterStateVersion());
+        req.set_unordered_forwarding(msg.use_unordered_forwarding());
         for (uint16_t chain_node : msg.getChain()) {
             req.add_node_chain(chain_node);
         }
@@ -787,6 +788,7 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeMergeBucketCommand(BBuf&
             chain.emplace_back(node);
         }
         cmd->setChain(std::move(chain));
+        cmd->set_use_unordered_forwarding(req.unordered_forwarding());
         return cmd;
     });
 }
@@ -999,6 +1001,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe
             bucket_and_info->set_raw_bucket_id(entry._bucketId.getRawId());
             set_bucket_info(*bucket_and_info->mutable_bucket_info(), entry._info);
         }
+        // We mark features as available at protocol level. Only included for full bucket fetch responses.
+        if (msg.full_bucket_fetch()) {
+            res.mutable_supported_node_features()->set_unordered_merge_chaining(true);
+        }
     });
 }
@@ -1035,6 +1041,11 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con
             dest_entries[i]._bucketId = document::BucketId(proto_entry.raw_bucket_id());
             dest_entries[i]._info = get_bucket_info(proto_entry.bucket_info());
         }
+        if (res.has_supported_node_features()) {
+            const auto& src_features = res.supported_node_features();
+            auto& dest_features = reply->supported_node_features();
+            dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining();
+        }
         return reply;
     });
 }
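// ---------------------------------------------------------------------------
// Minimal model (not part of the patch) of the decode-side behaviour above:
// both additions are optional on the wire, so replies from older peers (or
// explicit-bucket fetches, which skip the feature block) leave the defaults
// untouched. The structs are stand-ins for the generated protobuf types.
// ---------------------------------------------------------------------------
#include <optional>

struct WireFeatures { bool unordered_merge_chaining = false; };

struct WireReply {
    std::optional<WireFeatures> supported_node_features; // absent unless a full fetch from a new node
};

struct DecodedFeatures { bool unordered_merge_chaining = false; };

DecodedFeatures decode_features(const WireReply& res) {
    DecodedFeatures out;                    // default: assume the feature is unsupported
    if (res.supported_node_features) {      // mirrors res.has_supported_node_features()
        out.unordered_merge_chaining = res.supported_node_features->unordered_merge_chaining;
    }
    return out;
}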
diff --git a/storageapi/src/vespa/storageapi/message/bucket.cpp b/storageapi/src/vespa/storageapi/message/bucket.cpp
index 360db5ea3d79..04a40fbc8856 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.cpp
+++ b/storageapi/src/vespa/storageapi/message/bucket.cpp
@@ -107,7 +107,8 @@ MergeBucketCommand::MergeBucketCommand(
       _nodes(nodes),
       _maxTimestamp(maxTimestamp),
       _clusterStateVersion(clusterStateVersion),
-      _chain(chain)
+      _chain(chain),
+      _use_unordered_forwarding(false)
 {}
 
 MergeBucketCommand::~MergeBucketCommand() = default;
@@ -128,6 +129,9 @@ MergeBucketCommand::print(std::ostream& out, bool verbose, const std::string& in
         out << _chain[i];
     }
     out << "]";
+    if (_use_unordered_forwarding) {
+        out << " (unordered forwarding)";
+    }
     out << ", reasons to start: " << _reason;
     out << ")";
     if (verbose) {
diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h
index c24ed55d7a80..5fd79ffffea8 100644
--- a/storageapi/src/vespa/storageapi/message/bucket.h
+++ b/storageapi/src/vespa/storageapi/message/bucket.h
@@ -118,6 +118,7 @@ class MergeBucketCommand : public MaintenanceCommand {
     Timestamp _maxTimestamp;
     uint32_t _clusterStateVersion;
     std::vector<uint16_t> _chain;
+    bool _use_unordered_forwarding;
 
 public:
     MergeBucketCommand(const document::Bucket &bucket,
@@ -133,6 +134,11 @@ class MergeBucketCommand : public MaintenanceCommand {
     uint32_t getClusterStateVersion() const { return _clusterStateVersion; }
     void setClusterStateVersion(uint32_t version) { _clusterStateVersion = version; }
     void setChain(const std::vector<uint16_t>& chain) { _chain = chain; }
+    void set_use_unordered_forwarding(bool unordered_forwarding) noexcept {
+        _use_unordered_forwarding = unordered_forwarding;
+    }
+    [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
+    [[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
     void print(std::ostream& out, bool verbose, const std::string& indent) const override;
     DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
 };
@@ -385,19 +391,30 @@ class RequestBucketInfoReply : public StorageReply {
             : _bucketId(id), _info(info) {}
         friend std::ostream& operator<<(std::ostream& os, const Entry&);
     };
-    typedef vespalib::Array<Entry> EntryVector;
+    struct SupportedNodeFeatures {
+        bool unordered_merge_chaining = false;
+    };
+    using EntryVector = vespalib::Array<Entry>;
 private:
-    EntryVector _buckets;
-    bool _full_bucket_fetch;
-    document::BucketId _super_bucket_id;
+    EntryVector           _buckets;
+    bool                  _full_bucket_fetch;
+    document::BucketId    _super_bucket_id;
+    SupportedNodeFeatures _supported_node_features;
 public:
     explicit RequestBucketInfoReply(const RequestBucketInfoCommand& cmd);
-    ~RequestBucketInfoReply();
+    ~RequestBucketInfoReply() override;
     const EntryVector & getBucketInfo() const { return _buckets; }
     EntryVector & getBucketInfo() { return _buckets; }
     [[nodiscard]] bool full_bucket_fetch() const noexcept { return _full_bucket_fetch; }
+    // Only contains useful information if full_bucket_fetch() == true
+    [[nodiscard]] const SupportedNodeFeatures& supported_node_features() const noexcept {
+        return _supported_node_features;
+    }
+    [[nodiscard]] SupportedNodeFeatures& supported_node_features() noexcept {
+        return _supported_node_features;
+    }
     const document::BucketId& super_bucket_id() const { return _super_bucket_id; }
     void print(std::ostream& out, bool verbose, const std::string& indent) const override;
     DECLARE_STORAGEREPLY(RequestBucketInfoReply, onRequestBucketInfoReply)
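// ---------------------------------------------------------------------------
// Hypothetical follow-up assertions (not part of the patch), written in the
// style of the merge_bucket test earlier in this change and reusing that
// fixture's _bucket, nodes and chain variables. They exercise only the API
// added above: set_use_unordered_forwarding(), use_unordered_forwarding(),
// from_distributor() and the extended print() output.
// ---------------------------------------------------------------------------
auto cmd = std::make_shared<MergeBucketCommand>(_bucket, nodes, Timestamp(1234), 567, chain);
EXPECT_FALSE(cmd->use_unordered_forwarding()); // defaults to legacy (ordered) chaining
EXPECT_FALSE(cmd->from_distributor());         // non-empty chain => already forwarded at least once

cmd->setChain({});                             // a merge with an empty chain...
EXPECT_TRUE(cmd->from_distributor());          // ...is by definition fresh from the distributor

cmd->set_use_unordered_forwarding(true);
std::ostringstream oss;                        // requires <sstream>
cmd->print(oss, false, "");
EXPECT_NE(oss.str().find("(unordered forwarding)"), std::string::npos);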