Merge pull request ceph#56677 from athanatos/sjust/for-review/wip-replica-read

osd,crimson/osd: rework of replica read and related state

Reviewed-by: Matan Breizman <[email protected]>
athanatos authored Nov 4, 2024
2 parents 0a586d2 + dda683b commit 048ce81
Showing 38 changed files with 887 additions and 192 deletions.

8 changes: 8 additions & 0 deletions qa/tasks/rados.py
@@ -36,6 +36,8 @@ def task(ctx, config):
write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED.
This means the data won't be accessed in the near future,
so the OSD backend need not keep it in cache.
pct_update_delay: delay before the primary propagates pct (pg_committed_to)
on write pause; defaults to 5s if balance_reads is set
For example::
@@ -139,6 +141,7 @@ def task(ctx, config):
object_size = int(config.get('object_size', 4000000))
op_weights = config.get('op_weights', {})
testdir = teuthology.get_testdir(ctx)
pct_update_delay = None
args = [
'adjust-ulimits',
'ceph-coverage',
@@ -166,6 +169,7 @@ def task(ctx, config):
args.extend(['--pool-snaps'])
if config.get('balance_reads', False):
args.extend(['--balance-reads'])
pct_update_delay = config.get('pct_update_delay', 5);
if config.get('localize_reads', False):
args.extend(['--localize-reads'])
if config.get('max_attr_len', None):
@@ -274,6 +278,10 @@ def thread():
if config.get('fast_read', False):
manager.raw_cluster_cmd(
'osd', 'pool', 'set', pool, 'fast_read', 'true')
if pct_update_delay:
manager.raw_cluster_cmd(
'osd', 'pool', 'set', pool,
'pct_update_delay', str(pct_update_delay));
min_size = config.get('min_size', None);
if min_size is not None:
manager.raw_cluster_cmd(
222 changes: 222 additions & 0 deletions src/common/intrusive_timer.h
@@ -0,0 +1,222 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#pragma once

#include <mutex>
#include <condition_variable>

#include <boost/intrusive/set.hpp>

#include "common/ceph_time.h"

namespace ceph::common {

/**
* intrusive_timer
*
* SafeTimer (common/Timer.h) isn't well suited to heavily used
* pathways for a few reasons:
* - Usage generally requires allocation of a fresh context for each
* scheduled operation. One could override Context::complete to avoid
* destroying the instance, but actually reusing the instance is tricky
* as SafeTimer doesn't guarantee cancellation if safe_callbacks is false.
* - SafeTimer only guarantees cancellation if safe_callbacks is true, which
* it generally won't be if the user needs to call into SafeTimer while
* holding locks taken by callbacks.
*
* This implementation allows the user to repeatedly schedule and cancel
* an object inheriting from the callback_t interface below while
* guaranteeing cancellation provided that the user holds the lock
* associated with a particular callback while calling into intrusive_timer.
*/
class intrusive_timer {
using clock_t = ceph::coarse_real_clock;

public:
/**
* callback_t
*
* Objects inheriting from callback_t can be scheduled
* via intrusive_timer.
*/
class callback_t : public boost::intrusive::set_base_hook<> {
friend class intrusive_timer;
clock_t::time_point schedule_point;
unsigned incarnation = 0;

public:
/**
* add_ref, dec_ref
*
* callback_t must remain live and all methods must remain
* safe to call as long as calls to add_ref() outnumber calls
* to dec_ref().
*/
virtual void add_ref() = 0;
virtual void dec_ref() = 0;

/**
* lock, unlock
*
* For any specific callback_t instance, these must lock/unlock the
* lock that the caller holds while invoking intrusive_timer's
* public methods on that instance.
*/
virtual void lock() = 0;
virtual void unlock() = 0;

/// Invokes callback, will be called with lock held
virtual void invoke() = 0;

/**
* is_scheduled
*
* Return true iff callback is scheduled to be invoked.
* May only be validly invoked while lock associated with
* callback_t instance is held.
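* Internally, incarnation is even while idle and odd while
* scheduled; schedule_after(), cancel(), and a completed
* invocation each increment it.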
*/
bool is_scheduled() const { return incarnation % 2 == 1; }
virtual ~callback_t() = default;

/// Order callback_t by schedule_point, breaking ties by address
auto operator<=>(const callback_t &rhs) const {
return std::make_pair(schedule_point, this) <=>
std::make_pair(rhs.schedule_point, &rhs);
}
};

private:
/// protects events, stopping
std::mutex lock;

/// stopping, cv used to signal that t should halt
std::condition_variable cv;
bool stopping = false;

/// queued events ordered by callback_t::schedule_point
boost::intrusive::set<callback_t> events;

/// thread responsible for calling scheduled callbacks
std::thread t;

/// peek front of queue, null if empty
callback_t *peek() {
return events.empty() ? nullptr : &*(events.begin());
}

/// entry point for t
void _run() {
std::unique_lock l(lock);
while (true) {
if (stopping) {
return;
}

auto next = peek();
if (!next) {
cv.wait(l);
continue;
}

if (next->schedule_point > clock_t::now()) {
cv.wait_until(l, next->schedule_point);
continue;
}

// we release the reference below
events.erase(*next);

/* cancel() and schedule_after() both hold both intrusive_timer::lock
* and the callback_t lock (precondition of both) while mutating
* next->incarnation, so this read is safe. We're relying on the
* fact that only this method in this thread will access
* next->incarnation under only one of the two. */
auto incarnation = next->incarnation;
l.unlock();
{
/* Note that intrusive_timer::cancel may observe that
* callback_t::is_scheduled() returns true while
* callback_t::is_linked() is false since we drop
* intrusive_timer::lock between removing next from the
* queue and incrementing callback_t::incarnation here
* under the callback_t lock. In that case, cancel()
* increments incarnation logically canceling the callback
* but leaves the reference for us to drop.
*/
std::unique_lock m(*next);
if (next->incarnation == incarnation) {
/* As above, cancel() and schedule_after() hold both locks so this
* mutation and read are safe. */
++next->incarnation;
next->invoke();
}
/* else, next was canceled between l.unlock() and next->lock().
* Note that if incarnation does not match, we do nothing to next
* other than drop our reference -- it might well have been
* rescheduled already! */
}
next->dec_ref();
l.lock();
}
}

public:
intrusive_timer() : t([this] { _run(); }) {}

/**
* schedule_after
*
* Schedule cb to run after the specified period.
* The lock associated with cb must be held.
* cb must not already be scheduled.
*
* @param cb [in] callback to schedule
* @param after [in] period after which to schedule cb
*/
template <typename T>
void schedule_after(callback_t &cb, T after) {
ceph_assert(!cb.is_scheduled());
std::unique_lock l(lock);
ceph_assert(!cb.is_linked());

++cb.incarnation;
cb.schedule_point = clock_t::now() + after;

cb.add_ref();
events.insert(cb);

cv.notify_one();
}

/**
* cancel
*
* Cancel already scheduled cb.
* The lock associated with cb must be held.
*
* @param cb [in] callback to cancel
*/
void cancel(callback_t &cb) {
ceph_assert(cb.is_scheduled());
std::unique_lock l(lock);
++cb.incarnation;

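/* If cb is still linked, the queue's reference is dropped here.
 * Otherwise _run() has already dequeued cb; it will observe the
 * incarnation bump above and drop its reference without invoking. */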
if (cb.is_linked()) {
events.erase(cb);
cb.dec_ref();
}
}

/// Stop intrusive_timer
void stop() {
{
std::unique_lock l(lock);
stopping = true;
cv.notify_one();
}
t.join();
}
};

}
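
For orientation, here is a minimal sketch of one way to satisfy the callback_t contract above. The class name HeartbeatCb, the std::mutex-based locking, and the manual refcounting are illustrative assumptions, not code from this change:

  // Illustrative sketch only; HeartbeatCb is hypothetical, not part of this PR.
  #include <atomic>
  #include <cassert>
  #include <chrono>
  #include <iostream>
  #include <mutex>

  #include "common/intrusive_timer.h"

  class HeartbeatCb : public ceph::common::intrusive_timer::callback_t {
    std::mutex m;                  // the lock associated with this callback
    std::atomic<unsigned> refs{1}; // creator holds the initial reference

  public:
    void add_ref() override { ++refs; }
    void dec_ref() override { if (--refs == 0) delete this; }
    void lock() override { m.lock(); }
    void unlock() override { m.unlock(); }

    // invoked by the timer thread with lock() held
    void invoke() override { std::cout << "tick" << std::endl; }
  };

  int main() {
    ceph::common::intrusive_timer timer;
    auto *cb = new HeartbeatCb;

    {
      // callback_t's lock()/unlock() make it BasicLockable
      std::unique_lock l(*cb);
      timer.schedule_after(*cb, std::chrono::milliseconds(10));
      assert(cb->is_scheduled());
      timer.cancel(*cb);   // reliable: we hold the callback's lock
      assert(!cb->is_scheduled());
    }

    timer.stop();
    cb->dec_ref();         // drop the creator's reference
    return 0;
  }

Note that schedule_after() and cancel() are both entered with the callback's own lock held, which is the precondition that makes cancellation reliable here.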
2 changes: 1 addition & 1 deletion src/crimson/osd/ops_executer.cc
@@ -828,7 +828,7 @@ void OpsExecuter::fill_op_params(OpsExecuter::modified_by m)
osd_op_params->mtime = msg->get_mtime();
osd_op_params->at_version = pg->get_next_version();
osd_op_params->pg_trim_to = pg->get_pg_trim_to();
osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk();
osd_op_params->pg_committed_to = pg->get_pg_committed_to();
osd_op_params->last_complete = pg->get_info().last_complete;
osd_op_params->user_modify = (m == modified_by::user);
}
15 changes: 13 additions & 2 deletions src/crimson/osd/osd_operations/client_request.cc
@@ -14,6 +14,7 @@
#include "crimson/osd/osd_operations/client_request.h"
#include "crimson/osd/osd_connection_priv.h"
#include "osd/object_state_fmt.h"
#include "osd/osd_perf_counters.h"

SET_SUBSYS(osd);

@@ -190,15 +191,25 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib
DEBUGDPP("{}.{}: dropping misdirected op",
pg, *this, this_instance_id);
co_return;
} else if (const hobject_t& hoid = m->get_hobj();
!pg.get_peering_state().can_serve_replica_read(hoid)) {
}

pg.get_perf_logger().inc(l_osd_replica_read);
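// The op targets a replica: serve it locally only if the object is
// readable here; otherwise bounce the client to the primary (-EAGAIN).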
if (pg.is_unreadable_object(m->get_hobj())) {
DEBUGDPP("{}.{}: {} missing on replica, bouncing to primary",
pg, *this, this_instance_id, m->get_hobj());
pg.get_perf_logger().inc(l_osd_replica_read_redirect_missing);
co_await reply_op_error(pgref, -EAGAIN);
co_return;
} else if (!pg.get_peering_state().can_serve_replica_read(m->get_hobj())) {
DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary",
pg, *this, this_instance_id);
pg.get_perf_logger().inc(l_osd_replica_read_redirect_conflict);
co_await reply_op_error(pgref, -EAGAIN);
co_return;
} else {
DEBUGDPP("{}.{}: serving replica read on oid {}",
pg, *this, this_instance_id, m->get_hobj());
pg.get_perf_logger().inc(l_osd_replica_read_served);
}
}

2 changes: 1 addition & 1 deletion src/crimson/osd/osd_operations/osdop_params.h
@@ -12,7 +12,7 @@ struct osd_op_params_t {
utime_t mtime;
eversion_t at_version;
eversion_t pg_trim_to;
eversion_t min_last_complete_ondisk;
eversion_t pg_committed_to;
eversion_t last_complete;
bool user_modify = false;
ObjectCleanRegions clean_regions;
3 changes: 2 additions & 1 deletion src/crimson/osd/osd_operations/peering_event.cc
@@ -166,7 +166,8 @@ void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services)
ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
q.query.epoch_sent,
map_epoch, empty,
PastIntervals{}});
PastIntervals{},
PG_FEATURE_CRIMSON_ALL});
}
}
}