diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 990a0530ecd6..8021075faa39 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -287,8 +287,9 @@ num_distributor_stripes int default=0 restart ## blocking of later buckets in the priority database. implicitly_clear_bucket_priority_on_schedule bool default=false -## Enables sending merges that are not forwarded between content nodes in strictly -## increasing node key order. Even if this config is set to true, unordered merges -## will only be sent if _all_ nodes involved in a given merge have previously -## reported (as part of bucket info fetching) that they support the unordered merge feature. +## Enables sending merges that are forwarded between content nodes in ideal state node key +## order, instead of strictly increasing node key order (which is the default). +## Even if this config is set to true, unordered merges will only be sent if _all_ nodes +## involved in a given merge have previously reported (as part of bucket info fetching) +## that they support the unordered merge feature. use_unordered_merge_chaining bool default=false diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 07ddfd82d0b1..bc2f54e5a507 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -411,7 +411,7 @@ MergeThrottler::enqueue_merge_for_later_processing( return; } // TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued - const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty(); + const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor(); _queue.emplace(msg, _queueSequence++, is_forwarded_merge); _metrics->queueSize.set(static_cast(_queue.size())); } @@ -701,7 +701,7 @@ bool MergeThrottler::backpressure_mode_active() const { bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept { // We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock. // See comment in may_allow_into_queue() for rationale. - return (cmd.use_unordered_forwarding() && !cmd.getChain().empty()); + return (cmd.use_unordered_forwarding() && !cmd.from_distributor()); } bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept { @@ -717,10 +717,11 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co // // We do, however, allow enqueueing unordered merges that come straight from the distributor, as // those cannot cause a deadlock at that point in time. - return (((_queue.size() < _maxQueueSize) - || (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty())) - && (!cmd.use_unordered_forwarding() - || (cmd.use_unordered_forwarding() && cmd.getChain().empty()))); + if (cmd.use_unordered_forwarding()) { + return cmd.from_distributor(); + } + return ((_queue.size() < _maxQueueSize) + || (_disable_queue_limits_for_chained_merges && !cmd.from_distributor())); } // Must be run from worker thread @@ -866,7 +867,7 @@ MergeThrottler::processNewMergeCommand( // index in the nodeset, immediately execute. Required for // backwards compatibility with older distributor versions. // TODO remove this - if (mergeCmd.getChain().empty() + if (mergeCmd.from_distributor() && !mergeCmd.use_unordered_forwarding() && (nodeSeq.getSortedNodes()[0].index != _component.getIndex())) { diff --git a/storageapi/src/vespa/storageapi/message/bucket.h b/storageapi/src/vespa/storageapi/message/bucket.h index d62888e0527e..5fd79ffffea8 100644 --- a/storageapi/src/vespa/storageapi/message/bucket.h +++ b/storageapi/src/vespa/storageapi/message/bucket.h @@ -138,6 +138,7 @@ class MergeBucketCommand : public MaintenanceCommand { _use_unordered_forwarding = unordered_forwarding; } [[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; } + [[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); } void print(std::ostream& out, bool verbose, const std::string& indent) const override; DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket) };