Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
* Add `from_distributor()` utility function to `MergeBucketCommand`
* Simplify boolean expression by moving sub-expression to own statement
* Improve wording of config parameter
  • Loading branch information
vekterli committed Nov 15, 2021
1 parent 88aef24 commit 31d7bb1
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
9 changes: 5 additions & 4 deletions storage/src/vespa/storage/config/stor-distributormanager.def
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,9 @@ num_distributor_stripes int default=0 restart
## blocking of later buckets in the priority database.
implicitly_clear_bucket_priority_on_schedule bool default=false

## Enables sending merges that are not forwarded between content nodes in strictly
## increasing node key order. Even if this config is set to true, unordered merges
## will only be sent if _all_ nodes involved in a given merge have previously
## reported (as part of bucket info fetching) that they support the unordered merge feature.
## Enables sending merges that are forwarded between content nodes in ideal state node key
## order, instead of strictly increasing node key order (which is the default).
## Even if this config is set to true, unordered merges will only be sent if _all_ nodes
## involved in a given merge have previously reported (as part of bucket info fetching)
## that they support the unordered merge feature.
use_unordered_merge_chaining bool default=false
15 changes: 8 additions & 7 deletions storage/src/vespa/storage/storageserver/mergethrottler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ MergeThrottler::enqueue_merge_for_later_processing(
return;
}
// TODO remove once unordered merges are default, since forwarded unordered merges are never enqueued
const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.getChain().empty();
const bool is_forwarded_merge = _disable_queue_limits_for_chained_merges && !mergeCmd.from_distributor();
_queue.emplace(msg, _queueSequence++, is_forwarded_merge);
_metrics->queueSize.set(static_cast<int64_t>(_queue.size()));
}
Expand Down Expand Up @@ -701,7 +701,7 @@ bool MergeThrottler::backpressure_mode_active() const {
bool MergeThrottler::allow_merge_despite_full_window(const api::MergeBucketCommand& cmd) noexcept {
// We cannot let forwarded unordered merges fall into the queue, as that might lead to a deadlock.
// See comment in may_allow_into_queue() for rationale.
return (cmd.use_unordered_forwarding() && !cmd.getChain().empty());
return (cmd.use_unordered_forwarding() && !cmd.from_distributor());
}

bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) const noexcept {
Expand All @@ -717,10 +717,11 @@ bool MergeThrottler::may_allow_into_queue(const api::MergeBucketCommand& cmd) co
//
// We do, however, allow enqueueing unordered merges that come straight from the distributor, as
// those cannot cause a deadlock at that point in time.
return (((_queue.size() < _maxQueueSize)
|| (_disable_queue_limits_for_chained_merges && !cmd.getChain().empty()))
&& (!cmd.use_unordered_forwarding()
|| (cmd.use_unordered_forwarding() && cmd.getChain().empty())));
if (cmd.use_unordered_forwarding()) {
return cmd.from_distributor();
}
return ((_queue.size() < _maxQueueSize)
|| (_disable_queue_limits_for_chained_merges && !cmd.from_distributor()));
}

// Must be run from worker thread
Expand Down Expand Up @@ -866,7 +867,7 @@ MergeThrottler::processNewMergeCommand(
// index in the nodeset, immediately execute. Required for
// backwards compatibility with older distributor versions.
// TODO remove this
if (mergeCmd.getChain().empty()
if (mergeCmd.from_distributor()
&& !mergeCmd.use_unordered_forwarding()
&& (nodeSeq.getSortedNodes()[0].index != _component.getIndex()))
{
Expand Down
1 change: 1 addition & 0 deletions storageapi/src/vespa/storageapi/message/bucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class MergeBucketCommand : public MaintenanceCommand {
_use_unordered_forwarding = unordered_forwarding;
}
[[nodiscard]] bool use_unordered_forwarding() const noexcept { return _use_unordered_forwarding; }
[[nodiscard]] bool from_distributor() const noexcept { return _chain.empty(); }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
DECLARE_STORAGECOMMAND(MergeBucketCommand, onMergeBucket)
};
Expand Down

0 comments on commit 31d7bb1

Please sign in to comment.