Skip to content

Commit

Permalink
timeout_end -> deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
baldersheim committed Mar 1, 2022
1 parent c0e949b commit 4796fb9
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class FileStorHandler : public MessageSender {
*
* @param stripe The stripe to get messages for
*/
virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) = 0;
virtual LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) = 0;

/** Only used for testing, should be removed */
LockedMessage getNextMessage(uint32_t stripeId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,13 @@ FileStorHandlerImpl::makeQueueTimeoutReply(api::StorageMessage& msg)
}

FileStorHandler::LockedMessage
FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end)
FileStorHandlerImpl::getNextMessage(uint32_t stripeId, vespalib::steady_time deadline)
{
if (!tryHandlePause()) {
return {}; // Still paused, return to allow tick.
}

return _stripes[stripeId].getNextMessage(timeout_end);
return _stripes[stripeId].getNextMessage(deadline);
}

std::shared_ptr<FileStorHandler::BucketLockInterface>
Expand Down Expand Up @@ -917,7 +917,7 @@ FileStorHandlerImpl::Stripe::operation_type_should_be_throttled(api::MessageType
}

FileStorHandler::LockedMessage
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time timeout_end)
FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time deadline)
{
std::unique_lock guard(*_lock);
ThrottleToken throttle_token;
Expand Down Expand Up @@ -953,12 +953,12 @@ FileStorHandlerImpl::Stripe::getNextMessage(vespalib::steady_time timeout_end)
// Depending on whether we were blocked due to no usable ops in queue or throttling,
// wait for either the queue or throttler to (hopefully) have some fresh stuff for us.
if (!was_throttled) {
_cond->wait_until(guard, timeout_end);
_cond->wait_until(guard, deadline);
} else {
// Have to release lock before doing a blocking throttle token fetch, since it
// prevents RPC threads from pushing onto the queue.
guard.unlock();
throttle_token = _owner.operation_throttler().blocking_acquire_one(timeout_end);
throttle_token = _owner.operation_throttler().blocking_acquire_one(deadline);
guard.lock();
if (!throttle_token.valid()) {
_metrics->timeouts_waiting_for_throttle_token.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class FileStorHandlerImpl final
std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq);
void failOperations(const document::Bucket & bucket, const api::ReturnCode & code);

FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time timeout_end);
FileStorHandler::LockedMessage getNextMessage(vespalib::steady_time deadline);
void dumpQueue(std::ostream & os) const;
void dumpActiveHtml(std::ostream & os) const;
void dumpQueueHtml(std::ostream & os) const;
Expand Down Expand Up @@ -203,7 +203,7 @@ class FileStorHandlerImpl final
bool schedule(const std::shared_ptr<api::StorageMessage>&) override;
ScheduleAsyncResult schedule_and_get_next_async_message(const std::shared_ptr<api::StorageMessage>& msg) override;

FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time timeout_end) override;
FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, vespalib::steady_time deadline) override;

void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override;
void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override;
Expand Down
4 changes: 2 additions & 2 deletions storage/src/vespa/storage/persistence/persistencethread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ PersistenceThread::run(framework::ThreadHandle& thread)
vespalib::steady_time now = vespalib::steady_clock::now();
thread.registerTick(framework::UNKNOWN_CYCLE, now);

vespalib::steady_time doom = now + max_wait_time;
FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, doom));
vespalib::steady_time deadline = now + max_wait_time;
FileStorHandler::LockedMessage lock(_fileStorHandler.getNextMessage(_stripeId, deadline));

if (lock.lock) {
_persistenceHandler.processLockedMessage(std::move(lock));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ class DynamicOperationThrottler final : public SharedOperationThrottler {
~DynamicOperationThrottler() override;

Token blocking_acquire_one() noexcept override;
Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept override;
Token blocking_acquire_one(vespalib::steady_time deadline) noexcept override;
Token try_acquire_one() noexcept override;
uint32_t current_window_size() const noexcept override;
uint32_t current_active_token_count() const noexcept override;
Expand Down Expand Up @@ -334,12 +334,12 @@ DynamicOperationThrottler::blocking_acquire_one() noexcept
}

DynamicOperationThrottler::Token
DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time timeout_end) noexcept
DynamicOperationThrottler::blocking_acquire_one(vespalib::steady_time deadline) noexcept
{
std::unique_lock lock(_mutex);
if (!has_spare_capacity_in_active_window()) {
++_waiting_threads;
const bool accepted = _cond.wait_until(lock, timeout_end, [&] {
const bool accepted = _cond.wait_until(lock, deadline, [&] {
return has_spare_capacity_in_active_window();
});
--_waiting_threads;
Expand Down
4 changes: 2 additions & 2 deletions vespalib/src/vespa/vespalib/util/shared_operation_throttler.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ class SharedOperationThrottler {
// Acquire a valid throttling token, uninterruptedly blocking until one can be obtained.
[[nodiscard]] virtual Token blocking_acquire_one() noexcept = 0;
// Attempt to acquire a valid throttling token, waiting up to `timeout` for one to be
// available. If the timeout is exceeded without any tokens becoming available, an
// available. If the deadline is reached without any tokens becoming available, an
// invalid token will be returned.
[[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time timeout_end) noexcept = 0;
[[nodiscard]] virtual Token blocking_acquire_one(vespalib::steady_time deadline) noexcept = 0;
// Attempt to acquire a valid throttling token if one is immediately available.
// An invalid token will be returned if none is available. Never blocks (other than
// when contending for the internal throttler mutex).
Expand Down

0 comments on commit 4796fb9

Please sign in to comment.