diff --git a/qa/suites/rados/thrash-erasure-code/workloads/ec-small-objects-balanced.yaml b/qa/suites/rados/thrash-erasure-code/workloads/ec-small-objects-balanced.yaml deleted file mode 100644 index af0ac39310e73..0000000000000 --- a/qa/suites/rados/thrash-erasure-code/workloads/ec-small-objects-balanced.yaml +++ /dev/null @@ -1,21 +0,0 @@ -tasks: -- rados: - clients: [client.0] - ops: 400000 - max_seconds: 600 - max_in_flight: 64 - objects: 1024 - size: 16384 - ec_pool: true - balanced_reads: true - op_weights: - read: 100 - write: 0 - append: 100 - delete: 50 - snap_create: 50 - snap_remove: 50 - rollback: 50 - copy_from: 50 - setattr: 25 - rmattr: 25 diff --git a/qa/tasks/rados.py b/qa/tasks/rados.py index d8eac5d886fe8..96bcc770511a1 100644 --- a/qa/tasks/rados.py +++ b/qa/tasks/rados.py @@ -36,6 +36,8 @@ def task(ctx, config): write_fadvise_dontneed: write behavior like with LIBRADOS_OP_FLAG_FADVISE_DONTNEED. This mean data don't access in the near future. Let osd backend don't keep data in cache. + pct_update_delay: delay before primary propogates pct on write pause, + defaults to 5s if balance_reads is set For example:: @@ -139,6 +141,7 @@ def task(ctx, config): object_size = int(config.get('object_size', 4000000)) op_weights = config.get('op_weights', {}) testdir = teuthology.get_testdir(ctx) + pct_update_delay = None args = [ 'adjust-ulimits', 'ceph-coverage', @@ -166,6 +169,7 @@ def task(ctx, config): args.extend(['--pool-snaps']) if config.get('balance_reads', False): args.extend(['--balance-reads']) + pct_update_delay = config.get('pct_update_delay', 5); if config.get('localize_reads', False): args.extend(['--localize-reads']) if config.get('max_attr_len', None): @@ -274,6 +278,10 @@ def thread(): if config.get('fast_read', False): manager.raw_cluster_cmd( 'osd', 'pool', 'set', pool, 'fast_read', 'true') + if pct_update_delay: + manager.raw_cluster_cmd( + 'osd', 'pool', 'set', pool, + 'pct_update_delay', str(pct_update_delay)); min_size = config.get('min_size', None); if min_size is not None: manager.raw_cluster_cmd( diff --git a/src/common/intrusive_timer.h b/src/common/intrusive_timer.h new file mode 100644 index 0000000000000..b32286a209633 --- /dev/null +++ b/src/common/intrusive_timer.h @@ -0,0 +1,222 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include + +#include + +#include "common/ceph_time.h" + +namespace ceph::common { + +/** + * intrusive_timer + * + * SafeTimer (common/Timer.h) isn't well suited to usage in high + * usage pathways for a few reasons: + * - Usage generally requires allocation of a fresh context for each + * scheduled operation. One could override Context::complete to avoid + * destroying the instance, but actually reusing the instance is tricky + * as SafeTimer doesn't guarrantee cancelation if safe_callbacks is false. + * - SafeTimer only guarrantees cancelation if safe_timer is true, which + * it generally won't be if the user needs to call into SafeTimer while + * holding locks taken by callbacks. + * + * This implementation allows the user to repeatedly schedule and cancel + * an object inheriting from the callback_t interface below while + * guarranteeing cancelation provided that the user holds the lock + * associated with a particular callback while calling into intrusive_timer. + */ +class intrusive_timer { + using clock_t = ceph::coarse_real_clock; + +public: + /** + * callback_t + * + * Objects inheriting from callback_t can be scheduled + * via intrusive_timer. + */ + class callback_t : public boost::intrusive::set_base_hook<> { + friend class intrusive_timer; + clock_t::time_point schedule_point; + unsigned incarnation = 0; + + public: + /** + * add_ref, dec_ref + * + * callback_t must remain live and all methods must remain + * safe to call as long as calls to add_ref() outnumber calls + * to dec_ref(). + */ + virtual void add_ref() = 0; + virtual void dec_ref() = 0; + + /** + * lock, unlock + * + * For any specific callback_t, must lock/unlock a lock held while + * accessing intrusive_timer public methods for that callback_t + * instance. + */ + virtual void lock() = 0; + virtual void unlock() = 0; + + /// Invokes callback, will be called with lock held + virtual void invoke() = 0; + + /** + * is_scheduled + * + * Return true iff callback is scheduled to be invoked. + * May only be validly invoked while lock associated with + * callback_t instance is held. + */ + bool is_scheduled() const { return incarnation % 2 == 1; } + virtual ~callback_t() = default; + + /// Order callback_t by schedule_point + auto operator<=>(const callback_t &rhs) const { + return std::make_pair(schedule_point, this) <=> + std::make_pair(rhs.schedule_point, &rhs); + } + }; + +private: + /// protects events, stopping + std::mutex lock; + + /// stopping, cv used to signal that t should halt + std::condition_variable cv; + bool stopping = false; + + /// queued events ordered by callback_t::schedule_point + boost::intrusive::set events; + + /// thread responsible for calling scheduled callbacks + std::thread t; + + /// peek front of queue, null if empty + callback_t *peek() { + return events.empty() ? nullptr : &*(events.begin()); + } + + /// entry point for t + void _run() { + std::unique_lock l(lock); + while (true) { + if (stopping) { + return; + } + + auto next = peek(); + if (!next) { + cv.wait(l); + continue; + } + + if (next->schedule_point > clock_t::now()) { + cv.wait_until(l, next->schedule_point); + continue; + } + + // we release the reference below + events.erase(*next); + + /* cancel() and schedule_after() both hold both intrusive_timer::lock + * and the callback_t lock (precondition of both) while mutating + * next->incarnation, so this read is safe. We're relying on the + * fact that only this method in this thread will access + * next->incarnation under only one of the two. */ + auto incarnation = next->incarnation; + l.unlock(); + { + /* Note that intrusive_timer::cancel may observe that + * callback_t::is_scheduled() returns true while + * callback_t::is_linked() is false since we drop + * intrusive_timer::lock between removing next from the + * queue and incrementing callback_t::incarnation here + * under the callback_t lock. In that case, cancel() + * increments incarnation logically canceling the callback + * but leaves the reference for us to drop. + */ + std::unique_lock m(*next); + if (next->incarnation == incarnation) { + /* As above, cancel() and schedule_after() hold both locks so this + * mutation and read are safe. */ + ++next->incarnation; + next->invoke(); + } + /* else, next was canceled between l.unlock() and next->lock(). + * Note that if incarnation does not match, we do nothing to next + * other than drop our reference -- it might well have been + * rescheduled already! */ + } + next->dec_ref(); + l.lock(); + } + } + +public: + intrusive_timer() : t([this] { _run(); }) {} + + /** + * schedule_after + * + * Schedule cb to run after the specified period. + * The lock associated with cb must be held. + * cb must not already be scheduled. + * + * @param cb [in] callback to schedule + * @param after [in] period after which to schedule cb + */ + template + void schedule_after(callback_t &cb, T after) { + ceph_assert(!cb.is_scheduled()); + std::unique_lock l(lock); + ceph_assert(!cb.is_linked()); + + ++cb.incarnation; + cb.schedule_point = clock_t::now() + after; + + cb.add_ref(); + events.insert(cb); + + cv.notify_one(); + } + + /** + * cancel + * + * Cancel already scheduled cb. + * The lock associated with cb must be held. + * + * @param cb [in] callback to cancel + */ + void cancel(callback_t &cb) { + ceph_assert(cb.is_scheduled()); + std::unique_lock l(lock); + ++cb.incarnation; + + if (cb.is_linked()) { + events.erase(cb); + cb.dec_ref(); + } + } + + /// Stop intrusive_timer + void stop() { + { + std::unique_lock l(lock); + stopping = true; + cv.notify_one(); + } + t.join(); + } +}; + +} diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 9bf60140374c8..4e735c3b4cb96 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -828,7 +828,7 @@ void OpsExecuter::fill_op_params(OpsExecuter::modified_by m) osd_op_params->mtime = msg->get_mtime(); osd_op_params->at_version = pg->get_next_version(); osd_op_params->pg_trim_to = pg->get_pg_trim_to(); - osd_op_params->min_last_complete_ondisk = pg->get_min_last_complete_ondisk(); + osd_op_params->pg_committed_to = pg->get_pg_committed_to(); osd_op_params->last_complete = pg->get_info().last_complete; osd_op_params->user_modify = (m == modified_by::user); } diff --git a/src/crimson/osd/osd_operations/client_request.cc b/src/crimson/osd/osd_operations/client_request.cc index 7c79795f224cb..a40db28f05381 100644 --- a/src/crimson/osd/osd_operations/client_request.cc +++ b/src/crimson/osd/osd_operations/client_request.cc @@ -14,6 +14,7 @@ #include "crimson/osd/osd_operations/client_request.h" #include "crimson/osd/osd_connection_priv.h" #include "osd/object_state_fmt.h" +#include "osd/osd_perf_counters.h" SET_SUBSYS(osd); @@ -190,15 +191,25 @@ ClientRequest::interruptible_future<> ClientRequest::with_pg_process_interruptib DEBUGDPP("{}.{}: dropping misdirected op", pg, *this, this_instance_id); co_return; - } else if (const hobject_t& hoid = m->get_hobj(); - !pg.get_peering_state().can_serve_replica_read(hoid)) { + } + + pg.get_perf_logger().inc(l_osd_replica_read); + if (pg.is_unreadable_object(m->get_hobj())) { + DEBUGDPP("{}.{}: {} missing on replica, bouncing to primary", + pg, *this, this_instance_id, m->get_hobj()); + pg.get_perf_logger().inc(l_osd_replica_read_redirect_missing); + co_await reply_op_error(pgref, -EAGAIN); + co_return; + } else if (!pg.get_peering_state().can_serve_replica_read(m->get_hobj())) { DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary", pg, *this, this_instance_id); + pg.get_perf_logger().inc(l_osd_replica_read_redirect_conflict); co_await reply_op_error(pgref, -EAGAIN); co_return; } else { DEBUGDPP("{}.{}: serving replica read on oid {}", pg, *this, this_instance_id, m->get_hobj()); + pg.get_perf_logger().inc(l_osd_replica_read_served); } } diff --git a/src/crimson/osd/osd_operations/osdop_params.h b/src/crimson/osd/osd_operations/osdop_params.h index 102cb7fff6b61..14202582100b3 100644 --- a/src/crimson/osd/osd_operations/osdop_params.h +++ b/src/crimson/osd/osd_operations/osdop_params.h @@ -12,7 +12,7 @@ struct osd_op_params_t { utime_t mtime; eversion_t at_version; eversion_t pg_trim_to; - eversion_t min_last_complete_ondisk; + eversion_t pg_committed_to; eversion_t last_complete; bool user_modify = false; ObjectCleanRegions clean_regions; diff --git a/src/crimson/osd/osd_operations/peering_event.cc b/src/crimson/osd/osd_operations/peering_event.cc index a8d9fce69b611..fb5696b0a9e3e 100644 --- a/src/crimson/osd/osd_operations/peering_event.cc +++ b/src/crimson/osd/osd_operations/peering_event.cc @@ -166,7 +166,8 @@ void RemotePeeringEvent::on_pg_absent(ShardServices &shard_services) ctx.send_notify(q.from.osd, {q.query.from, q.query.to, q.query.epoch_sent, map_epoch, empty, - PastIntervals{}}); + PastIntervals{}, + PG_FEATURE_CRIMSON_ALL}); } } } diff --git a/src/crimson/osd/pg.cc b/src/crimson/osd/pg.cc index 5492e05f25ede..af4a147bf4151 100644 --- a/src/crimson/osd/pg.cc +++ b/src/crimson/osd/pg.cc @@ -132,6 +132,7 @@ PG::PG( pool, name), osdmap, + PG_FEATURE_CRIMSON_ALL, this, this), scrubber(*this), @@ -1040,7 +1041,7 @@ PG::interruptible_future PG::submit_error_log( ceph::os::Transaction t; peering_state.merge_new_log_entries( log_entries, t, peering_state.get_pg_trim_to(), - peering_state.get_min_last_complete_ondisk()); + peering_state.get_pg_committed_to()); return seastar::do_with(log_entries, set{}, [this, t=std::move(t), rep_tid](auto& log_entries, auto& waiting_on) mutable { @@ -1061,7 +1062,7 @@ PG::interruptible_future PG::submit_error_log( get_last_peering_reset(), rep_tid, peering_state.get_pg_trim_to(), - peering_state.get_min_last_complete_ondisk()); + peering_state.get_pg_committed_to()); waiting_on.insert(peer); logger().debug("submit_error_log: sending log" "missing_request (rep_tid: {} entries: {})" @@ -1278,7 +1279,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref req) log_operation(std::move(log_entries), req->pg_trim_to, req->version, - req->min_last_complete_ondisk, + req->pg_committed_to, !txn.empty(), txn, false); @@ -1324,13 +1325,13 @@ void PG::log_operation( std::vector&& logv, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, bool transaction_applied, ObjectStore::Transaction &txn, bool async) { logger().debug("{}", __func__); if (is_primary()) { - ceph_assert(trim_to <= peering_state.get_last_update_ondisk()); + ceph_assert(trim_to <= peering_state.get_pg_committed_to()); } /* TODO: when we add snap mapper and projected log support, * we'll likely want to update them here. @@ -1354,7 +1355,7 @@ void PG::log_operation( peering_state.append_log(std::move(logv), trim_to, roll_forward_to, - min_last_complete_ondisk, + pg_committed_to, txn, !txn.empty(), false); @@ -1393,17 +1394,17 @@ PG::interruptible_future<> PG::do_update_log_missing( ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); ObjectStore::Transaction t; - std::optional op_trim_to, op_roll_forward_to; + std::optional op_trim_to, op_pg_committed_to; if (m->pg_trim_to != eversion_t()) op_trim_to = m->pg_trim_to; - if (m->pg_roll_forward_to != eversion_t()) - op_roll_forward_to = m->pg_roll_forward_to; - logger().debug("op_trim_to = {}, op_roll_forward_to = {}", + if (m->pg_committed_to != eversion_t()) + op_pg_committed_to = m->pg_committed_to; + logger().debug("op_trim_to = {}, op_pg_committed_to = {}", op_trim_to.has_value() ? *op_trim_to : eversion_t(), - op_roll_forward_to.has_value() ? *op_roll_forward_to : eversion_t()); + op_pg_committed_to.has_value() ? *op_pg_committed_to : eversion_t()); peering_state.append_log_entries_update_missing( - m->entries, t, op_trim_to, op_roll_forward_to); + m->entries, t, op_trim_to, op_pg_committed_to); return interruptor::make_interruptible(shard_services.get_store().do_transaction( coll_ref, std::move(t))).then_interruptible( diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index 604f49005ff04..c5e24a6c21d47 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -129,8 +129,8 @@ class PG : public boost::intrusive_ref_counter< return peering_state.get_pg_trim_to(); } - eversion_t get_min_last_complete_ondisk() const { - return peering_state.get_min_last_complete_ondisk(); + eversion_t get_pg_committed_to() const { + return peering_state.get_pg_committed_to(); } const pg_info_t& get_info() const final { @@ -603,7 +603,7 @@ class PG : public boost::intrusive_ref_counter< std::vector&& logv, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_commited_to, bool transaction_applied, ObjectStore::Transaction &txn, bool async = false); diff --git a/src/crimson/osd/replicated_backend.cc b/src/crimson/osd/replicated_backend.cc index cbb8c883e0752..12ee38b437054 100644 --- a/src/crimson/osd/replicated_backend.cc +++ b/src/crimson/osd/replicated_backend.cc @@ -84,7 +84,7 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}}); encode(log_entries, m->logbl); m->pg_trim_to = osd_op_p.pg_trim_to; - m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk; + m->pg_committed_to = osd_op_p.pg_committed_to; m->pg_stats = pg.get_info().stats; // TODO: set more stuff. e.g., pg_states sends->emplace_back( @@ -99,7 +99,7 @@ ReplicatedBackend::submit_transaction(const std::set& pg_shards, std::move(log_entries), osd_op_p.pg_trim_to, osd_op_p.at_version, - osd_op_p.min_last_complete_ondisk, + osd_op_p.pg_committed_to, true, txn, false); diff --git a/src/messages/MOSDPGPCT.h b/src/messages/MOSDPGPCT.h new file mode 100644 index 0000000000000..b3f88314ec3e3 --- /dev/null +++ b/src/messages/MOSDPGPCT.h @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2024 IBM, Red Hat + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + + +#include "MOSDFastDispatchOp.h" + +class MOSDPGPCT final : public MOSDFastDispatchOp { +private: + static constexpr int HEAD_VERSION = 1; + static constexpr int COMPAT_VERSION = 1; + +public: + /// epoch at which the message was sent + epoch_t map_epoch = 0; + + /// start epoch of the interval in which the message was sent + epoch_t min_epoch = 0; + + /// target pg + spg_t pgid; + + /** + * pg_committed_to + * + * Propagates PeeringState::pg_committed_to to replicas as with + * MOSDRepOp, ECSubWrite, MOSDPGPCT. + */ + eversion_t pg_committed_to; + + epoch_t get_map_epoch() const override { + return map_epoch; + } + epoch_t get_min_epoch() const override { + return min_epoch; + } + spg_t get_spg() const override { + return pgid; + } + + MOSDPGPCT() + : MOSDFastDispatchOp{MSG_OSD_PG_PCT, HEAD_VERSION, + COMPAT_VERSION} {} + MOSDPGPCT( + spg_t pgid, + epoch_t epoch, + epoch_t min_epoch, + eversion_t pg_committed_to) + : MOSDFastDispatchOp{MSG_OSD_PG_PCT, HEAD_VERSION, + COMPAT_VERSION}, + map_epoch(epoch), + min_epoch(min_epoch), + pgid(pgid), + pg_committed_to(pg_committed_to) + {} + +private: + ~MOSDPGPCT() final {} + +public: + std::string_view get_type_name() const override { return "PGPCT"; } + void print(std::ostream& out) const override { + out << "pg_pct(" << pgid << " epoch " << map_epoch + << "/" << min_epoch + << " pg_committed_to " << pg_committed_to + << ")"; + } + + void encode_payload(uint64_t features) override { + using ceph::encode; + encode(map_epoch, payload); + encode(min_epoch, payload); + encode(pgid, payload); + encode(pg_committed_to, payload); + } + void decode_payload() override { + using ceph::decode; + auto p = payload.cbegin(); + decode(map_epoch, p); + decode(min_epoch, p); + decode(pgid, p); + decode(pg_committed_to, p); + } +private: + template + friend boost::intrusive_ptr ceph::make_message(Args&&... args); +}; diff --git a/src/messages/MOSDPGUpdateLogMissing.h b/src/messages/MOSDPGUpdateLogMissing.h index 2a0011e8fb7c8..ebe678c6c313f 100644 --- a/src/messages/MOSDPGUpdateLogMissing.h +++ b/src/messages/MOSDPGUpdateLogMissing.h @@ -31,7 +31,23 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { mempool::osd_pglog::list entries; // piggybacked osd/pg state eversion_t pg_trim_to; // primary->replica: trim to here - eversion_t pg_roll_forward_to; // primary->replica: trim rollback info to here + + /** + * pg_committed_to + * + * Propagates PeeringState::pg_committed_to to replicas as with + * MOSDRepOp, ECSubWrite + * + * Historical Note: Prior to early 2024, this field was named + * pg_roll_forward_to. pg_committed_to is a safe value to rollforward to as + * it is a conservative bound on versions that can become divergent. Switching + * it to be populated by pg_committed_to rather than mlcod mirrors MOSDRepOp + * and upgrade cases in both directions should be safe as mlcod is <= pct + * and replicas (both ec and replicated) only actually rely on versions <= this + * field being non-divergent. This note may be removed in main after U is + * released. + */ + eversion_t pg_committed_to; epoch_t get_epoch() const { return map_epoch; } spg_t get_pgid() const { return pgid; } @@ -59,7 +75,7 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { epoch_t min_epoch, ceph_tid_t rep_tid, eversion_t pg_trim_to, - eversion_t pg_roll_forward_to) + eversion_t pg_committed_to) : MOSDFastDispatchOp{MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION, COMPAT_VERSION}, map_epoch(epoch), @@ -69,7 +85,7 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { rep_tid(rep_tid), entries(entries), pg_trim_to(pg_trim_to), - pg_roll_forward_to(pg_roll_forward_to) + pg_committed_to(pg_committed_to) {} private: @@ -83,7 +99,7 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { << " rep_tid " << rep_tid << " entries " << entries << " trim_to " << pg_trim_to - << " roll_forward_to " << pg_roll_forward_to + << " pg_committed_to " << pg_committed_to << ")"; } @@ -96,7 +112,7 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { encode(entries, payload); encode(min_epoch, payload); encode(pg_trim_to, payload); - encode(pg_roll_forward_to, payload); + encode(pg_committed_to, payload); } void decode_payload() override { using ceph::decode; @@ -113,7 +129,7 @@ class MOSDPGUpdateLogMissing final : public MOSDFastDispatchOp { } if (header.version >= 3) { decode(pg_trim_to, p); - decode(pg_roll_forward_to, p); + decode(pg_committed_to, p); } } private: diff --git a/src/messages/MOSDRepOp.h b/src/messages/MOSDRepOp.h index ecfe3294d1c73..5e8b386ba0a51 100644 --- a/src/messages/MOSDRepOp.h +++ b/src/messages/MOSDRepOp.h @@ -54,7 +54,30 @@ class MOSDRepOp final : public MOSDFastDispatchOp { // piggybacked osd/og state eversion_t pg_trim_to; // primary->replica: trim to here - eversion_t min_last_complete_ondisk; // lower bound on committed version + + /** + * pg_committed_to + * + * Used by the primary to propagate pg_committed_to to replicas for use in + * serving replica reads. + * + * Because updates <= pg_committed_to cannot become divergent, replicas + * may safely serve reads on objects which do not have more recent updates. + * + * See PeeringState::pg_committed_to, PeeringState::can_serve_replica_read + * + * Historical note: Prior to early 2024, this field was named + * min_last_complete_ondisk. The replica, however, only actually relied on + * a single property of this field -- that any objects not modified since + * mlcod couldn't have uncommitted state. Weakening the field to the condition + * above is therefore safe -- mlcod is always <= pg_committed_to and + * sending pg_committed_to to a replica expecting mlcod will work correctly + * as it only actually uses mlcod to check replica reads. The primary difference + * between mlcod and pg_committed_to is simply that mlcod doesn't advance past + * objects missing on replicas, but we check for that anyway. This note may be + * removed in main after U is released. + */ + eversion_t pg_committed_to; hobject_t new_temp_oid; ///< new temp object that we must now start tracking hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking @@ -110,14 +133,8 @@ class MOSDRepOp final : public MOSDFastDispatchOp { decode(from, p); decode(updated_hit_set_history, p); - if (header.version >= 3) { - decode(min_last_complete_ondisk, p); - } else { - /* This field used to mean pg_roll_foward_to, but ReplicatedBackend - * simply assumes that we're rolling foward to version. */ - eversion_t pg_roll_forward_to; - decode(pg_roll_forward_to, p); - } + ceph_assert(header.version >= 3); + decode(pg_committed_to, p); final_decode_needed = false; } @@ -141,7 +158,7 @@ class MOSDRepOp final : public MOSDFastDispatchOp { encode(discard_temp_oid, payload); encode(from, payload); encode(updated_hit_set_history, payload); - encode(min_last_complete_ondisk, payload); + encode(pg_committed_to, payload); } MOSDRepOp() @@ -164,10 +181,6 @@ class MOSDRepOp final : public MOSDFastDispatchOp { set_tid(rtid); } - void set_rollback_to(const eversion_t &rollback_to) { - header.version = 2; - min_last_complete_ondisk = rollback_to; - } private: ~MOSDRepOp() final {} @@ -180,11 +193,7 @@ class MOSDRepOp final : public MOSDFastDispatchOp { out << " " << poid << " v " << version; if (updated_hit_set_history) out << ", has_updated_hit_set_history"; - if (header.version < 3) { - out << ", rollback_to(legacy)=" << min_last_complete_ondisk; - } else { - out << ", mlcod=" << min_last_complete_ondisk; - } + out << ", pct=" << pg_committed_to; } out << ")"; } diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 3cc3c8abd1ea5..1a5d1ebd73762 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1161,11 +1161,11 @@ COMMAND("osd pool rename " "rename to ", "osd", "rw") COMMAND("osd pool get " "name=pool,type=CephPoolname " - "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_objects|target_max_bytes|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|erasure_code_profile|min_read_recency_for_promote|all|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio", + "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_objects|target_max_bytes|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|erasure_code_profile|min_read_recency_for_promote|all|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio|pct_update_delay", "get pool parameter ", "osd", "r") COMMAND("osd pool set " "name=pool,type=CephPoolname " - "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|pgp_num_actual|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_bytes|target_max_objects|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|min_read_recency_for_promote|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio " + "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|pgp_num_actual|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_bytes|target_max_objects|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|min_read_recency_for_promote|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio|pct_update_delay " "name=val,type=CephString " "name=yes_i_really_mean_it,type=CephBool,req=false", "set pool parameter to ", "osd", "rw") diff --git a/src/msg/Message.cc b/src/msg/Message.cc index f649e0f3d3ee2..50af00db28dd6 100644 --- a/src/msg/Message.cc +++ b/src/msg/Message.cc @@ -219,6 +219,8 @@ #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MOSDPGPCT.h" + #include "messages/MNVMeofGwBeacon.h" #include "messages/MNVMeofGwMap.h" @@ -549,6 +551,9 @@ Message *decode_message(CephContext *cct, case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: m = make_message(); break; + case MSG_OSD_PG_PCT: + m = make_message(); + break; case CEPH_MSG_OSD_BACKOFF: m = make_message(); break; diff --git a/src/msg/Message.h b/src/msg/Message.h index bb67ff3eef5d6..80d2295c89f69 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -136,6 +136,8 @@ #define MSG_OSD_PG_UPDATE_LOG_MISSING 114 #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115 +#define MSG_OSD_PG_PCT 136 + #define MSG_OSD_PG_CREATED 116 #define MSG_OSD_REP_SCRUBMAP 117 #define MSG_OSD_PG_RECOVERY_DELETE 118 diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index beb9eacfd2a53..fa2570aba42af 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -983,8 +983,8 @@ void ECBackend::handle_sub_write( std::move(op.log_entries), op.updated_hit_set_history, op.trim_to, - op.roll_forward_to, - op.roll_forward_to, + op.pg_committed_to, + op.pg_committed_to, !op.backfill_or_async_recovery, localt, async); @@ -1470,7 +1470,7 @@ void ECBackend::submit_transaction( const eversion_t &at_version, PGTransactionUPtr &&t, const eversion_t &trim_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, vector&& log_entries, std::optional &hset_history, Context *on_all_commit, @@ -1485,7 +1485,15 @@ void ECBackend::submit_transaction( op->delta_stats = delta_stats; op->version = at_version; op->trim_to = trim_to; - op->roll_forward_to = std::max(min_last_complete_ondisk, rmw_pipeline.committed_to); + /* We update PeeringState::pg_committed_to via the callback + * invoked from ECBackend::handle_sub_write_reply immediately + * before updating rmw_pipeline.commited_to via + * rmw_pipeline.check_ops()->try_finish_rmw(), so these will + * *usually* match. However, the PrimaryLogPG::submit_log_entries + * pathway can perform an out-of-band log update which updates + * PeeringState::pg_committed_to independently. Thus, the value + * passed in is the right one to use. */ + op->pg_committed_to = pg_committed_to; op->log_entries = log_entries; std::swap(op->updated_hit_set_history, hset_history); op->on_all_commit = on_all_commit; diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index 910cdc064e4ff..46317b6083268 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -106,7 +106,7 @@ class ECBackend : public PGBackend, public ECCommon { const eversion_t &at_version, PGTransactionUPtr &&t, const eversion_t &trim_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, std::vector&& log_entries, std::optional &hset_history, Context *on_all_commit, diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc index 1fc8761050298..609ac3141ae30 100644 --- a/src/osd/ECCommon.cc +++ b/src/osd/ECCommon.cc @@ -158,7 +158,7 @@ ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::Op &rhs) rhs.client_op->get_req()->print(lhs); } #endif - lhs << " roll_forward_to=" << rhs.roll_forward_to + lhs << " pg_committed_to=" << rhs.pg_committed_to << " temp_added=" << rhs.temp_added << " temp_cleared=" << rhs.temp_cleared << " pending_read=" << rhs.pending_read @@ -895,7 +895,7 @@ bool ECCommon::RMWPipeline::try_reads_to_commit() should_send ? iter->second : empty, op->version, op->trim_to, - op->roll_forward_to, + op->pg_committed_to, op->log_entries, op->updated_hit_set_history, op->temp_added, @@ -970,8 +970,8 @@ bool ECCommon::RMWPipeline::try_finish_rmw() dout(10) << __func__ << ": " << *op << dendl; dout(20) << __func__ << ": " << cache << dendl; - if (op->roll_forward_to > completed_to) - completed_to = op->roll_forward_to; + if (op->pg_committed_to > completed_to) + completed_to = op->pg_committed_to; if (op->version > committed_to) committed_to = op->version; @@ -984,7 +984,7 @@ bool ECCommon::RMWPipeline::try_finish_rmw() auto nop = std::make_unique(); nop->hoid = op->hoid; nop->trim_to = op->trim_to; - nop->roll_forward_to = op->version; + nop->pg_committed_to = op->version; nop->tid = tid; nop->reqid = op->reqid; waiting_reads.push_back(*nop); diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h index 88f2940111ec9..7ff9cae7646ab 100644 --- a/src/osd/ECCommon.h +++ b/src/osd/ECCommon.h @@ -200,7 +200,7 @@ struct ECListener { const std::optional &hset_history, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, bool transaction_applied, ceph::os::Transaction &t, bool async = false) = 0; @@ -522,7 +522,17 @@ struct ECCommon { osd_reqid_t reqid; ZTracer::Trace trace; - eversion_t roll_forward_to; /// Soon to be generated internally + /** + * pg_commited_to + * + * Represents a version v such that all v' < v handled by RMWPipeline + * have fully committed. This may actually lag + * PeeringState::pg_committed_to if PrimaryLogPG::submit_log_entries + * submits an out-of-band log update. + * + * Soon to be generated internally. + */ + eversion_t pg_committed_to; /// Ancillary also provided from submit_transaction caller std::map obc_map; diff --git a/src/osd/ECMsgTypes.cc b/src/osd/ECMsgTypes.cc index a656766432f57..ae0636f7d492a 100644 --- a/src/osd/ECMsgTypes.cc +++ b/src/osd/ECMsgTypes.cc @@ -37,7 +37,7 @@ void ECSubWrite::encode(bufferlist &bl) const encode(temp_added, bl); encode(temp_removed, bl); encode(updated_hit_set_history, bl); - encode(roll_forward_to, bl); + encode(pg_committed_to, bl); encode(backfill_or_async_recovery, bl); ENCODE_FINISH(bl); } @@ -60,9 +60,9 @@ void ECSubWrite::decode(bufferlist::const_iterator &bl) decode(updated_hit_set_history, bl); } if (struct_v >= 3) { - decode(roll_forward_to, bl); + decode(pg_committed_to, bl); } else { - roll_forward_to = trim_to; + pg_committed_to = trim_to; } if (struct_v >= 4) { decode(backfill_or_async_recovery, bl); @@ -80,7 +80,7 @@ std::ostream &operator<<( << ", reqid=" << rhs.reqid << ", at_version=" << rhs.at_version << ", trim_to=" << rhs.trim_to - << ", roll_forward_to=" << rhs.roll_forward_to; + << ", pg_committed_to=" << rhs.pg_committed_to; if (rhs.updated_hit_set_history) lhs << ", has_updated_hit_set_history"; if (rhs.backfill_or_async_recovery) @@ -94,7 +94,7 @@ void ECSubWrite::dump(Formatter *f) const f->dump_stream("reqid") << reqid; f->dump_stream("at_version") << at_version; f->dump_stream("trim_to") << trim_to; - f->dump_stream("roll_forward_to") << roll_forward_to; + f->dump_stream("pg_committed_to") << pg_committed_to; f->dump_bool("has_updated_hit_set_history", static_cast(updated_hit_set_history)); f->dump_bool("backfill_or_async_recovery", backfill_or_async_recovery); @@ -116,7 +116,7 @@ void ECSubWrite::generate_test_instances(list &o) o.back()->reqid = osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678); o.back()->at_version = eversion_t(10, 300); o.back()->trim_to = eversion_t(5, 42); - o.back()->roll_forward_to = eversion_t(8, 250); + o.back()->pg_committed_to = eversion_t(8, 250); } void ECSubWriteReply::encode(bufferlist &bl) const diff --git a/src/osd/ECMsgTypes.h b/src/osd/ECMsgTypes.h index 2d0bc5c122161..d0df1ad6fa153 100644 --- a/src/osd/ECMsgTypes.h +++ b/src/osd/ECMsgTypes.h @@ -31,7 +31,7 @@ struct ECSubWrite { ObjectStore::Transaction t; eversion_t at_version; eversion_t trim_to; - eversion_t roll_forward_to; + eversion_t pg_committed_to; std::vector log_entries; std::set temp_added; std::set temp_removed; @@ -47,7 +47,7 @@ struct ECSubWrite { const ObjectStore::Transaction &t, eversion_t at_version, eversion_t trim_to, - eversion_t roll_forward_to, + eversion_t pg_committed_to, std::vector log_entries, std::optional updated_hit_set_history, const std::set &temp_added, @@ -56,7 +56,7 @@ struct ECSubWrite { : from(from), tid(tid), reqid(reqid), soid(soid), stats(stats), t(t), at_version(at_version), - trim_to(trim_to), roll_forward_to(roll_forward_to), + trim_to(trim_to), pg_committed_to(pg_committed_to), log_entries(log_entries), temp_added(temp_added), temp_removed(temp_removed), @@ -72,7 +72,7 @@ struct ECSubWrite { t.swap(other.t); at_version = other.at_version; trim_to = other.trim_to; - roll_forward_to = other.roll_forward_to; + pg_committed_to = other.pg_committed_to; log_entries.swap(other.log_entries); temp_added.swap(other.temp_added); temp_removed.swap(other.temp_removed); diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index be69745765baf..bbcc64fa02e69 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -504,6 +504,8 @@ void OSDService::shutdown_reserver() void OSDService::shutdown() { + pg_timer.stop(); + mono_timer.suspend(); { @@ -9501,7 +9503,8 @@ void OSD::handle_pg_query_nopg(const MQuery& q) q.query.epoch_sent, osdmap->get_epoch(), empty, - PastIntervals()}; + PastIntervals(), + PG_FEATURE_CLASSIC_ALL}; m = new MOSDPGNotify2(spg_t{pgid.pgid, q.query.from}, std::move(notify)); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index c825c19b1ff1f..25ca723680867 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -48,6 +48,7 @@ #include "include/unordered_map.h" +#include "common/intrusive_timer.h" #include "common/shared_cache.hpp" #include "common/simple_cache.hpp" #include "messages/MOSDOp.h" @@ -878,6 +879,8 @@ class OSDService : public Scrub::ScrubSchedListener { bool prepare_to_stop(); void got_stop_ack(); + // -- PG timer -- + common::intrusive_timer pg_timer; #ifdef PG_DEBUG_REFS ceph::mutex pgid_lock = ceph::make_mutex("OSDService::pgid_lock"); @@ -1942,6 +1945,7 @@ class OSD : public Dispatcher, case MSG_OSD_REP_SCRUBMAP: case MSG_OSD_PG_UPDATE_LOG_MISSING: case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: + case MSG_OSD_PG_PCT: case MSG_OSD_PG_RECOVERY_DELETE: case MSG_OSD_PG_RECOVERY_DELETE_REPLY: case MSG_OSD_PG_LEASE: diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 71b9b71338506..307651fd62729 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -43,6 +43,7 @@ #include "messages/MOSDECSubOpReadReply.h" #include "messages/MOSDPGUpdateLogMissing.h" #include "messages/MOSDPGUpdateLogMissingReply.h" +#include "messages/MOSDPGPCT.h" #include "messages/MOSDBackoff.h" #include "messages/MOSDScrubReserve.h" #include "messages/MOSDRepOp.h" @@ -212,6 +213,7 @@ PG::PG(OSDService *o, OSDMapRef curmap, p, _pool, curmap, + PG_FEATURE_CLASSIC_ALL, this, this), pool(recovery_state.get_pgpool()), @@ -2091,6 +2093,9 @@ bool PG::can_discard_request(OpRequestRef& op) case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY: return can_discard_replica_op< MOSDPGUpdateLogMissingReply, MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY>(op); + case MSG_OSD_PG_PCT: + return can_discard_replica_op< + MOSDPGPCT, MSG_OSD_PG_PCT>(op); case MSG_OSD_PG_SCAN: return can_discard_scan(op); diff --git a/src/osd/PGBackend.h b/src/osd/PGBackend.h index 9cbb5e8e97ce8..b87aa1da6771b 100644 --- a/src/osd/PGBackend.h +++ b/src/osd/PGBackend.h @@ -20,6 +20,8 @@ #include "ECCommon.h" #include "osd_types.h" +#include "pg_features.h" +#include "common/intrusive_timer.h" #include "common/WorkQueue.h" #include "include/Context.h" #include "os/ObjectStore.h" @@ -136,6 +138,17 @@ typedef std::shared_ptr OSDMapRef; eversion_t v, Context *on_complete) = 0; + /** + * pg_lock, pg_unlock, pg_add_ref, pg_dec_ref + * + * Utilities for locking and manipulating refcounts on + * implementation. + */ + virtual void pg_lock() = 0; + virtual void pg_unlock() = 0; + virtual void pg_add_ref() = 0; + virtual void pg_dec_ref() = 0; + /** * Bless a context * @@ -193,6 +206,7 @@ typedef std::shared_ptr OSDMapRef; virtual epoch_t pgb_get_osdmap_epoch() const = 0; virtual const pg_info_t &get_info() const = 0; virtual const pg_pool_t &get_pool() const = 0; + virtual eversion_t get_pg_committed_to() const = 0; virtual ObjectContextRef get_obc( const hobject_t &hoid, @@ -219,7 +233,7 @@ typedef std::shared_ptr OSDMapRef; const std::optional &hset_history, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, bool transaction_applied, ObjectStore::Transaction &t, bool async = false) = 0; @@ -240,6 +254,9 @@ typedef std::shared_ptr OSDMapRef; virtual void update_last_complete_ondisk( eversion_t lcod) = 0; + virtual void update_pct( + eversion_t pct) = 0; + virtual void update_stats( const pg_stat_t &stat) = 0; @@ -247,6 +264,8 @@ typedef std::shared_ptr OSDMapRef; GenContext *c, uint64_t cost) = 0; + virtual common::intrusive_timer &get_pg_timer() = 0; + virtual pg_shard_t whoami_shard() const = 0; int whoami() const { return whoami_shard().osd; @@ -259,6 +278,7 @@ typedef std::shared_ptr OSDMapRef; virtual pg_shard_t primary_shard() const = 0; virtual uint64_t min_peer_features() const = 0; virtual uint64_t min_upacting_features() const = 0; + virtual pg_feature_vec_t get_pg_acting_features() const = 0; virtual hobject_t get_temp_recovery_object(const hobject_t& target, eversion_t version) = 0; @@ -435,8 +455,8 @@ typedef std::shared_ptr OSDMapRef; const eversion_t &at_version, ///< [in] version PGTransactionUPtr &&t, ///< [in] trans to execute (move) const eversion_t &trim_to, ///< [in] trim log to here - const eversion_t &min_last_complete_ondisk, ///< [in] lower bound on - /// committed version + const eversion_t &pg_committed_to, ///< [in] lower bound on + /// committed version std::vector&& log_entries, ///< [in] log entries for t /// [in] hitset history (if updated with this transaction) std::optional &hset_history, diff --git a/src/osd/PeeringState.cc b/src/osd/PeeringState.cc index 22222b7f7af58..334d202d207a9 100644 --- a/src/osd/PeeringState.cc +++ b/src/osd/PeeringState.cc @@ -109,6 +109,7 @@ PeeringState::PeeringState( spg_t spgid, const PGPool &_pool, OSDMapRef curmap, + pg_feature_vec_t supported_pg_acting_features, DoutPrefixProvider *dpp, PeeringListener *pl) : state_history(*pl), @@ -122,6 +123,8 @@ PeeringState::PeeringState( pg_whoami(pg_whoami), info(spgid), pg_log(cct), + local_pg_acting_features(supported_pg_acting_features), + pg_acting_features(local_pg_acting_features), last_require_osd_release(curmap->require_osd_release), missing_loc(spgid, this, dpp, cct), machine(this, cct, spgid, dpp, pl, &state_history) @@ -314,9 +317,11 @@ void PeeringState::query_unfound(Formatter *f, string state) return; } -bool PeeringState::proc_replica_info( - pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch) +bool PeeringState::proc_replica_notify(const pg_shard_t &from, const pg_notify_t ¬ify) { + const pg_info_t &oinfo = notify.info; + const epoch_t send_epoch = notify.epoch_sent; + auto p = peer_info.find(from); if (p != peer_info.end() && p->second.last_update == oinfo.last_update) { psdout(10) << " got dup osd." << from << " info " @@ -346,6 +351,10 @@ bool PeeringState::proc_replica_info( } } + if (is_acting(from)) { + pg_acting_features &= notify.pg_features; + } + // was this a new info? if so, update peers! if (p == peer_info.end()) update_heartbeat_peers(); @@ -746,6 +755,7 @@ void PeeringState::on_new_interval() // initialize features acting_features = CEPH_FEATURES_SUPPORTED_DEFAULT; upacting_features = CEPH_FEATURES_SUPPORTED_DEFAULT; + pg_acting_features = local_pg_acting_features; for (auto p = acting.begin(); p != acting.end(); ++p) { if (*p == CRUSH_ITEM_NONE) continue; @@ -900,7 +910,7 @@ void PeeringState::clear_primary_state() clear_recovery_state(); - last_update_ondisk = eversion_t(); + pg_committed_to = eversion_t(); missing_loc.clear(); pl->clear_primary_state(); } @@ -1404,9 +1414,8 @@ bool PeeringState::needs_backfill() const bool PeeringState::can_serve_replica_read(const hobject_t &hoid) { ceph_assert(!is_primary()); - eversion_t min_last_complete_ondisk = get_min_last_complete_ondisk(); if (!pg_log.get_log().has_write_since( - hoid, min_last_complete_ondisk)) { + hoid, pg_committed_to)) { psdout(20) << "can be safely read on this replica" << dendl; return true; } else { @@ -2663,6 +2672,10 @@ void PeeringState::activate( info.last_epoch_started <= activation_epoch); info.last_epoch_started = activation_epoch; info.last_interval_started = info.history.same_interval_since; + + // updating last_epoch_started ensures that last_update will not + // become divergent after activation completes. + pg_committed_to = info.last_update; } } else if (is_acting(pg_whoami)) { /* update last_epoch_started on acting replica to whatever the primary sent @@ -2671,15 +2684,16 @@ void PeeringState::activate( if (info.last_epoch_started < activation_epoch) { info.last_epoch_started = activation_epoch; info.last_interval_started = info.history.same_interval_since; + + // updating last_epoch_started ensures that last_update will not + // become divergent after activation completes. + pg_committed_to = info.last_update; } } auto &missing = pg_log.get_missing(); min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)! - if (is_primary()) { - last_update_ondisk = info.last_update; - } last_update_applied = info.last_update; last_rollback_info_trimmed_to_applied = pg_log.get_can_rollback_to(); @@ -3201,7 +3215,8 @@ void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx) query.query_epoch, get_osdmap_epoch(), notify_info.second, - past_intervals)); + past_intervals, + local_pg_acting_features)); } else { update_history(query.query.history); fulfill_log(query.from, query.query, query.query_epoch); @@ -4070,7 +4085,7 @@ void PeeringState::update_stats_wo_resched( bool PeeringState::append_log_entries_update_missing( const mempool::osd_pglog::list &entries, ObjectStore::Transaction &t, std::optional trim_to, - std::optional roll_forward_to) + std::optional pg_committed_to) { ceph_assert(!entries.empty()); ceph_assert(entries.begin()->version > info.last_update); @@ -4082,12 +4097,12 @@ bool PeeringState::append_log_entries_update_missing( entries, rollbacker.get()); - if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) { + if (pg_committed_to && entries.rbegin()->soid > info.last_backfill) { pg_log.roll_forward(rollbacker.get()); } - if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) { - pg_log.roll_forward_to(*roll_forward_to, rollbacker.get()); - last_rollback_info_trimmed_to_applied = *roll_forward_to; + if (pg_committed_to && *pg_committed_to > pg_log.get_can_rollback_to()) { + pg_log.roll_forward_to(*pg_committed_to, rollbacker.get()); + last_rollback_info_trimmed_to_applied = *pg_committed_to; } info.last_update = pg_log.get_head(); @@ -4111,12 +4126,13 @@ void PeeringState::merge_new_log_entries( const mempool::osd_pglog::list &entries, ObjectStore::Transaction &t, std::optional trim_to, - std::optional roll_forward_to) + std::optional pg_committed_to) { psdout(10) << entries << dendl; ceph_assert(is_primary()); - bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to); + bool rebuild_missing = append_log_entries_update_missing( + entries, t, trim_to, pg_committed_to); for (auto i = acting_recovery_backfill.begin(); i != acting_recovery_backfill.end(); ++i) { @@ -4184,7 +4200,7 @@ void PeeringState::append_log( vector&& logv, eversion_t trim_to, eversion_t roll_forward_to, - eversion_t mlcod, + eversion_t pct, ObjectStore::Transaction &t, bool transaction_applied, bool async) @@ -4250,7 +4266,7 @@ void PeeringState::append_log( write_if_dirty(t); if (!is_primary()) - min_last_complete_ondisk = mlcod; + pg_committed_to = pct; } void PeeringState::recover_got( @@ -4437,7 +4453,7 @@ void PeeringState::recovery_committed_to(eversion_t version) void PeeringState::complete_write(eversion_t v, eversion_t lc) { - last_update_ondisk = v; + pg_committed_to = v; last_complete_ondisk = lc; calc_min_last_complete_ondisk(); } @@ -4484,7 +4500,7 @@ void PeeringState::calc_trim_to_aggressive() eversion_t limit = std::min({ pg_log.get_head(), pg_log.get_can_rollback_to(), - last_update_ondisk}); + pg_committed_to}); psdout(10) << "limit = " << limit << dendl; if (limit != eversion_t() && @@ -4644,8 +4660,7 @@ PeeringState::Initial::Initial(my_context ctx) boost::statechart::result PeeringState::Initial::react(const MNotifyRec& notify) { DECLARE_LOCALS; - ps->proc_replica_info( - notify.from, notify.notify.info, notify.notify.epoch_sent); + ps->proc_replica_notify(notify.from, notify.notify); ps->set_last_peering_reset(); return transit< Primary >(); } @@ -4798,7 +4813,8 @@ boost::statechart::result PeeringState::Reset::react(const ActMap&) ps->get_osdmap_epoch(), ps->get_osdmap_epoch(), ps->info, - ps->past_intervals)); + ps->past_intervals, + ps->local_pg_acting_features)); } ps->update_heartbeat_peers(); @@ -4888,8 +4904,7 @@ boost::statechart::result PeeringState::Primary::react(const MNotifyRec& notevt) { DECLARE_LOCALS; psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl; - ps->proc_replica_info( - notevt.from, notevt.notify.info, notevt.notify.epoch_sent); + ps->proc_replica_notify(notevt.from, notevt.notify); return discard_event(); } @@ -6104,10 +6119,9 @@ boost::statechart::result PeeringState::Active::react(const MNotifyRec& notevt) << dendl; } else { psdout(10) << "Active: got notify from " << notevt.from - << ", calling proc_replica_info and discover_all_missing" + << ", calling proc_replica_notify and discover_all_missing" << dendl; - ps->proc_replica_info( - notevt.from, notevt.notify.info, notevt.notify.epoch_sent); + ps->proc_replica_notify(notevt.from, notevt.notify); if (ps->have_unfound() || (ps->is_degraded() && ps->might_have_unfound.count(notevt.from))) { ps->discover_all_missing( context().get_recovery_ctx().msgs); @@ -6530,7 +6544,8 @@ boost::statechart::result PeeringState::ReplicaActive::react(const ActMap&) ps->get_osdmap_epoch(), ps->get_osdmap_epoch(), ps->info, - ps->past_intervals)); + ps->past_intervals, + ps->local_pg_acting_features)); } return discard_event(); } @@ -6667,7 +6682,8 @@ boost::statechart::result PeeringState::Stray::react(const ActMap&) ps->get_osdmap_epoch(), ps->get_osdmap_epoch(), ps->info, - ps->past_intervals)); + ps->past_intervals, + ps->local_pg_acting_features)); } return discard_event(); } @@ -6866,8 +6882,7 @@ boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt } epoch_t old_start = ps->info.history.last_epoch_started; - if (ps->proc_replica_info( - infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)) { + if (ps->proc_replica_notify(infoevt.from, infoevt.notify)) { // we got something new ... PastIntervals::PriorSet &prior_set = context< Peering >().prior_set; if (old_start < ps->info.history.last_epoch_started) { @@ -6898,6 +6913,7 @@ boost::statechart::result PeeringState::GetInfo::react(const MNotifyRec& infoevt psdout(20) << "Common peer features: " << hex << ps->get_min_peer_features() << dec << dendl; psdout(20) << "Common acting features: " << hex << ps->get_min_acting_features() << dec << dendl; psdout(20) << "Common upacting features: " << hex << ps->get_min_upacting_features() << dec << dendl; + psdout(20) << "Common pg_acting_features: " << hex << ps->get_pg_acting_features() << dec << dendl; post_event(GotInfo()); } } @@ -7260,8 +7276,7 @@ boost::statechart::result PeeringState::Incomplete::react(const AdvMap &advmap) boost::statechart::result PeeringState::Incomplete::react(const MNotifyRec& notevt) { DECLARE_LOCALS; psdout(7) << "handle_pg_notify from osd." << notevt.from << dendl; - if (ps->proc_replica_info( - notevt.from, notevt.notify.info, notevt.notify.epoch_sent)) { + if (ps->proc_replica_notify(notevt.from, notevt.notify)) { // We got something new, try again! return transit< GetLog >(); } else { @@ -7551,8 +7566,8 @@ ostream &operator<<(ostream &out, const PeeringState &ps) { } if (ps.is_peered()) { - if (ps.last_update_ondisk != ps.info.last_update) - out << " luod=" << ps.last_update_ondisk; + if (ps.pg_committed_to != ps.info.last_update) + out << " pct=" << ps.pg_committed_to; if (ps.last_update_applied != ps.info.last_update) out << " lua=" << ps.last_update_applied; } @@ -7575,7 +7590,8 @@ ostream &operator<<(ostream &out, const PeeringState &ps) { if (ps.last_complete_ondisk != ps.info.last_complete) out << " lcod " << ps.last_complete_ondisk; - out << " mlcod " << ps.min_last_complete_ondisk; + if (ps.is_primary()) + out << " mlcod " << ps.min_last_complete_ondisk; out << " " << pg_state_string(ps.get_state()); if (ps.should_send_notify()) diff --git a/src/osd/PeeringState.h b/src/osd/PeeringState.h index 11ac084a054b3..4b5285b18786f 100644 --- a/src/osd/PeeringState.h +++ b/src/osd/PeeringState.h @@ -1470,8 +1470,24 @@ class PeeringState : public MissingLoc::MappingInfo { epoch_t last_peering_reset = 0; ///< epoch of last peering reset - /// last_update that has committed; ONLY DEFINED WHEN is_active() - eversion_t last_update_ondisk; + /** + * pg_committed_to + * + * Maintained on the primary while pg is active (and not merely peered). + * + * Forall e <= pg_committed_to, e has been committed on all replicas. + * + * As a consequence: + * - No version e <= pg_committed_to can become divergent + * - It is safe for replicas to read any object whose most recent update is + * <= pg_committed_to + * + * Note that if the PG is only peered, pg_committed_to not be set + * and will remain eversion_t{} as we cannot guarantee that last_update + * at activation will not later become divergent. + */ + eversion_t pg_committed_to; + eversion_t last_complete_ondisk; ///< last_complete that has committed. eversion_t last_update_applied; ///< last_update readable /// last version to which rollback_info trimming has been applied @@ -1491,6 +1507,18 @@ class PeeringState : public MissingLoc::MappingInfo { std::set peer_log_requested; ///< logs i've requested (and start stamps) std::set peer_missing_requested; ///< missing sets requested + /// not constexpr because classic/crimson might differ + const pg_feature_vec_t local_pg_acting_features; + + /** + * acting_pg_features + * + * PG specific features common to entire acting set. Valid only on primary + * after activation. + */ + pg_feature_vec_t pg_acting_features; + + /// features supported by all peers uint64_t peer_features = CEPH_FEATURES_SUPPORTED_DEFAULT; /// features supported by acting set @@ -1541,8 +1569,7 @@ class PeeringState : public MissingLoc::MappingInfo { void update_heartbeat_peers(); void query_unfound(Formatter *f, std::string state); - bool proc_replica_info( - pg_shard_t from, const pg_info_t &oinfo, epoch_t send_epoch); + bool proc_replica_notify(const pg_shard_t &from, const pg_notify_t ¬ify); void remove_down_peer_info(const OSDMapRef &osdmap); void check_recovery_sources(const OSDMapRef& map); void set_last_peering_reset(); @@ -1750,6 +1777,7 @@ class PeeringState : public MissingLoc::MappingInfo { spg_t spgid, const PGPool &pool, OSDMapRef curmap, + pg_feature_vec_t supported_pg_acting_features, DoutPrefixProvider *dpp, PeeringListener *pl); @@ -1899,18 +1927,7 @@ class PeeringState : public MissingLoc::MappingInfo { const mempool::osd_pglog::list &entries, ObjectStore::Transaction &t, std::optional trim_to, - std::optional roll_forward_to); - - void append_log_with_trim_to_updated( - std::vector&& log_entries, - eversion_t roll_forward_to, - ObjectStore::Transaction &t, - bool transaction_applied, - bool async) { - update_trim_to(); - append_log(std::move(log_entries), pg_trim_to, roll_forward_to, - min_last_complete_ondisk, t, transaction_applied, async); - } + std::optional pg_committed_to); /** * Updates local log to reflect new write from primary. @@ -1919,11 +1936,21 @@ class PeeringState : public MissingLoc::MappingInfo { std::vector&& logv, eversion_t trim_to, eversion_t roll_forward_to, - eversion_t min_last_complete_ondisk, + eversion_t pg_committed_to, ObjectStore::Transaction &t, bool transaction_applied, bool async); + /** + * update_pct + * + * Updates pg_committed_to. Generally invoked on replica on + * receipt of MODPGPCT from primary. + */ + void update_pct(eversion_t pct) { + pg_committed_to = pct; + } + /** * retrieve the min last_backfill among backfill targets */ @@ -1937,7 +1964,7 @@ class PeeringState : public MissingLoc::MappingInfo { const mempool::osd_pglog::list &entries, ObjectStore::Transaction &t, std::optional trim_to, - std::optional roll_forward_to); + std::optional pg_committed_to); /// Update missing set to reflect e (TODOSAM: not sure why this is needed) void add_local_next_event(const pg_log_entry_t& e) { @@ -2412,10 +2439,6 @@ class PeeringState : public MissingLoc::MappingInfo { return missing_loc.get_missing_by_count(); } - eversion_t get_min_last_complete_ondisk() const { - return min_last_complete_ondisk; - } - eversion_t get_pg_trim_to() const { return pg_trim_to; } @@ -2424,8 +2447,8 @@ class PeeringState : public MissingLoc::MappingInfo { return last_update_applied; } - eversion_t get_last_update_ondisk() const { - return last_update_ondisk; + eversion_t get_pg_committed_to() const { + return pg_committed_to; } bool debug_has_dirty_state() const { @@ -2467,6 +2490,8 @@ class PeeringState : public MissingLoc::MappingInfo { /// Get feature vector common to up/acting set uint64_t get_min_upacting_features() const { return upacting_features; } + /// Get pg features common to acting set + pg_feature_vec_t get_pg_acting_features() const { return pg_acting_features; } // Flush control interface private: diff --git a/src/osd/PrimaryLogPG.cc b/src/osd/PrimaryLogPG.cc index 2f2ae4a22dbe1..14d2f85f40f03 100644 --- a/src/osd/PrimaryLogPG.cc +++ b/src/osd/PrimaryLogPG.cc @@ -543,6 +543,11 @@ void PrimaryLogPG::schedule_recovery_work( recovery_state.get_recovery_op_priority()); } +common::intrusive_timer &PrimaryLogPG::get_pg_timer() +{ + return osd->pg_timer; +} + void PrimaryLogPG::replica_clear_repop_obc( const vector &logv, ObjectStore::Transaction &t) @@ -2053,6 +2058,10 @@ void PrimaryLogPG::do_op(OpRequestRef& op) } } + if (!is_primary()) { + osd->logger->inc(l_osd_replica_read); + } + if (!check_laggy(op)) { return; } @@ -2183,6 +2192,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op) // missing object? if (is_unreadable_object(head)) { if (!is_primary()) { + osd->logger->inc(l_osd_replica_read_redirect_missing); osd->reply_op_error(op, -EAGAIN); return; } @@ -2314,11 +2324,13 @@ void PrimaryLogPG::do_op(OpRequestRef& op) dout(20) << __func__ << ": unstable write on replica, bouncing to primary " << *m << dendl; + osd->logger->inc(l_osd_replica_read_redirect_conflict); osd->reply_op_error(op, -EAGAIN); return; } dout(20) << __func__ << ": serving replica read on oid " << oid << dendl; + osd->logger->inc(l_osd_replica_read_served); } int r = find_object_context( @@ -11491,7 +11503,7 @@ void PrimaryLogPG::issue_repop(RepGather *repop, OpContext *ctx) ctx->at_version, std::move(ctx->op_t), recovery_state.get_pg_trim_to(), - recovery_state.get_min_last_complete_ondisk(), + recovery_state.get_pg_committed_to(), std::move(ctx->log), ctx->updated_hset_history, on_all_commit, @@ -11623,7 +11635,7 @@ void PrimaryLogPG::submit_log_entries( eversion_t old_last_update = info.last_update; recovery_state.merge_new_log_entries( entries, t, recovery_state.get_pg_trim_to(), - recovery_state.get_min_last_complete_ondisk()); + recovery_state.get_pg_committed_to()); set waiting_on; for (set::const_iterator i = get_acting_recovery_backfill().begin(); @@ -11643,7 +11655,7 @@ void PrimaryLogPG::submit_log_entries( get_last_peering_reset(), repop->rep_tid, recovery_state.get_pg_trim_to(), - recovery_state.get_min_last_complete_ondisk()); + recovery_state.get_pg_committed_to()); osd->send_message_osd_cluster( peer.osd, m, get_osdmap_epoch()); waiting_on.insert(peer); @@ -12644,17 +12656,18 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op) op->get_req()); ceph_assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING); ObjectStore::Transaction t; - std::optional op_trim_to, op_roll_forward_to; + std::optional op_trim_to, op_pg_committed_to; if (m->pg_trim_to != eversion_t()) op_trim_to = m->pg_trim_to; - if (m->pg_roll_forward_to != eversion_t()) - op_roll_forward_to = m->pg_roll_forward_to; + if (m->pg_committed_to != eversion_t()) + op_pg_committed_to = m->pg_committed_to; dout(20) << __func__ - << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl; + << " op_trim_to = " << op_trim_to << " op_pg_committed_to = " + << op_pg_committed_to << dendl; recovery_state.append_log_entries_update_missing( - m->entries, t, op_trim_to, op_roll_forward_to); + m->entries, t, op_trim_to, op_pg_committed_to); eversion_t new_lcod = info.last_complete; Context *complete = new LambdaContext( diff --git a/src/osd/PrimaryLogPG.h b/src/osd/PrimaryLogPG.h index 323b66e02a7be..f66b5c6e16aed 100644 --- a/src/osd/PrimaryLogPG.h +++ b/src/osd/PrimaryLogPG.h @@ -27,6 +27,7 @@ #include "messages/MOSDOpReply.h" #include "common/admin_finisher.h" #include "common/Checksummer.h" +#include "common/intrusive_timer.h" #include "common/sharedptr_registry.hpp" #include "common/shared_cache.hpp" #include "ReplicatedBackend.h" @@ -349,6 +350,19 @@ class PrimaryLogPG : public PG, eversion_t v, Context *on_complete) override; + void pg_lock() override { + lock(); + } + void pg_unlock() override { + unlock(); + } + void pg_add_ref() override { + intrusive_ptr_add_ref(this); + } + void pg_dec_ref() override { + intrusive_ptr_release(this); + } + template class BlessedGenContext; template class UnlockedBlessedGenContext; class BlessedContext; @@ -439,6 +453,9 @@ class PrimaryLogPG : public PG, const pg_pool_t &get_pool() const override { return pool.info; } + eversion_t get_pg_committed_to() const override { + return recovery_state.get_pg_committed_to(); + } ObjectContextRef get_obc( const hobject_t &hoid, @@ -497,12 +514,12 @@ class PrimaryLogPG : public PG, const std::optional &hset_history, const eversion_t &trim_to, const eversion_t &roll_forward_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, bool transaction_applied, ObjectStore::Transaction &t, bool async = false) override { if (is_primary()) { - ceph_assert(trim_to <= recovery_state.get_last_update_ondisk()); + ceph_assert(trim_to <= pg_committed_to); } if (hset_history) { recovery_state.update_hset(*hset_history); @@ -519,7 +536,7 @@ class PrimaryLogPG : public PG, replica_clear_repop_obc(logv, t); } recovery_state.append_log( - std::move(logv), trim_to, roll_forward_to, min_last_complete_ondisk, + std::move(logv), trim_to, roll_forward_to, pg_committed_to, t, transaction_applied, async); } @@ -552,6 +569,10 @@ class PrimaryLogPG : public PG, recovery_state.update_last_complete_ondisk(lcod); } + void update_pct(eversion_t pct) override { + recovery_state.update_pct(pct); + } + void update_stats( const pg_stat_t &stat) override { recovery_state.update_stats( @@ -565,6 +586,8 @@ class PrimaryLogPG : public PG, GenContext *c, uint64_t cost) override; + common::intrusive_timer &get_pg_timer() override; + pg_shard_t whoami_shard() const override { return pg_whoami; } @@ -580,6 +603,9 @@ class PrimaryLogPG : public PG, uint64_t min_upacting_features() const override { return recovery_state.get_min_upacting_features(); } + pg_feature_vec_t get_pg_acting_features() const override { + return recovery_state.get_pg_acting_features(); + } void send_message_osd_cluster( int peer, Message *m, epoch_t from_epoch) override { osd->send_message_osd_cluster(peer, m, from_epoch); diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 3702490fb610f..7ce8fbcd2102b 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -14,6 +14,7 @@ #include "common/errno.h" #include "ReplicatedBackend.h" #include "messages/MOSDOp.h" +#include "messages/MOSDPGPCT.h" #include "messages/MOSDRepOp.h" #include "messages/MOSDRepOpReply.h" #include "messages/MOSDPGPush.h" @@ -124,7 +125,9 @@ ReplicatedBackend::ReplicatedBackend( ObjectStore::CollectionHandle &c, ObjectStore *store, CephContext *cct) : - PGBackend(cct, pg, store, coll, c) {} + PGBackend(cct, pg, store, coll, c), + pct_callback(this) +{} void ReplicatedBackend::run_recovery_op( PGBackend::RecoveryHandle *_h, @@ -229,6 +232,10 @@ bool ReplicatedBackend::_handle_message( return true; } + case MSG_OSD_PG_PCT: + do_pct(op); + return true; + default: break; } @@ -261,6 +268,7 @@ void ReplicatedBackend::on_change() } in_progress_ops.clear(); clear_recovery_state(); + cancel_pct_update(); } int ReplicatedBackend::objects_read_sync( @@ -462,13 +470,86 @@ void generate_transaction( }); } +void ReplicatedBackend::do_pct(OpRequestRef op) +{ + const MOSDPGPCT *m = static_cast(op->get_req()); + dout(10) << __func__ << ": received pct update to " + << m->pg_committed_to << dendl; + parent->update_pct(m->pg_committed_to); +} + +void ReplicatedBackend::send_pct_update() +{ + dout(10) << __func__ << ": sending pct update" << dendl; + ceph_assert( + PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)); + for (const auto &i: parent->get_acting_shards()) { + if (i == parent->whoami_shard()) continue; + + auto *pct_update = new MOSDPGPCT( + spg_t(parent->whoami_spg_t().pgid, i.shard), + get_osdmap_epoch(), parent->get_interval_start_epoch(), + parent->get_pg_committed_to() + ); + + dout(10) << __func__ << ": sending pct update to i " << i + << ", i.osd " << i.osd << dendl; + parent->send_message_osd_cluster( + i.osd, pct_update, get_osdmap_epoch()); + } + dout(10) << __func__ << ": sending pct update complete" << dendl; +} + +void ReplicatedBackend::maybe_kick_pct_update() +{ + if (!in_progress_ops.empty()) { + dout(20) << __func__ << ": not scheduling pct update, " + << in_progress_ops.size() << " ops pending" << dendl; + return; + } + + if (!PG_HAVE_FEATURE(parent->get_pg_acting_features(), PCT)) { + dout(20) << __func__ << ": not scheduling pct update, PCT feature not" + << " supported" << dendl; + return; + } + + if (pct_callback.is_scheduled()) { + derr << __func__ + << ": pct_callback is already scheduled, this should be impossible" + << dendl; + return; + } + + int64_t pct_delay; + if (!parent->get_pool().opts.get( + pool_opts_t::PCT_UPDATE_DELAY, &pct_delay)) { + dout(20) << __func__ << ": not scheduling pct update, PCT_UPDATE_DELAY not" + << " set" << dendl; + return; + } + + dout(10) << __func__ << ": scheduling pct update after " + << pct_delay << " seconds" << dendl; + parent->get_pg_timer().schedule_after( + pct_callback, std::chrono::seconds(pct_delay)); +} + +void ReplicatedBackend::cancel_pct_update() +{ + if (pct_callback.is_scheduled()) { + dout(10) << __func__ << ": canceling pct update" << dendl; + parent->get_pg_timer().cancel(pct_callback); + } +} + void ReplicatedBackend::submit_transaction( const hobject_t &soid, const object_stat_sum_t &delta_stats, const eversion_t &at_version, PGTransactionUPtr &&_t, const eversion_t &trim_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, vector&& _log_entries, std::optional &hset_history, Context *on_all_commit, @@ -476,6 +557,8 @@ void ReplicatedBackend::submit_transaction( osd_reqid_t reqid, OpRequestRef orig_op) { + cancel_pct_update(); + parent->apply_stats( soid, delta_stats); @@ -517,7 +600,7 @@ void ReplicatedBackend::submit_transaction( tid, reqid, trim_to, - min_last_complete_ondisk, + pg_committed_to, added.size() ? *(added.begin()) : hobject_t(), removed.size() ? *(removed.begin()) : hobject_t(), log_entries, @@ -533,7 +616,7 @@ void ReplicatedBackend::submit_transaction( hset_history, trim_to, at_version, - min_last_complete_ondisk, + pg_committed_to, true, op_t); @@ -572,6 +655,7 @@ void ReplicatedBackend::op_commit(const ceph::ref_t& op) op->on_commit = 0; in_progress_ops.erase(op->tid); } + maybe_kick_pct_update(); } void ReplicatedBackend::do_repop_reply(OpRequestRef op) @@ -628,6 +712,7 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op) in_progress_ops.erase(iter); } } + maybe_kick_pct_update(); } int ReplicatedBackend::be_deep_scrub( @@ -953,7 +1038,7 @@ Message * ReplicatedBackend::generate_subop( ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, - eversion_t min_last_complete_ondisk, + eversion_t pg_committed_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const bufferlist &log_entries, @@ -990,13 +1075,9 @@ Message * ReplicatedBackend::generate_subop( wr->pg_trim_to = pg_trim_to; - if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) { - wr->min_last_complete_ondisk = min_last_complete_ondisk; - } else { - /* Some replicas need this field to be at_version. New replicas - * will ignore it */ - wr->set_rollback_to(at_version); - } + // this feature is from 2019 (6f12bf27cb91), assume present + ceph_assert(HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)); + wr->pg_committed_to = pg_committed_to; wr->new_temp_oid = new_temp_oid; wr->discard_temp_oid = discard_temp_oid; @@ -1010,7 +1091,7 @@ void ReplicatedBackend::issue_op( ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, - eversion_t min_last_complete_ondisk, + eversion_t pg_committed_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const vector &log_entries, @@ -1043,7 +1124,7 @@ void ReplicatedBackend::issue_op( tid, reqid, pg_trim_to, - min_last_complete_ondisk, + pg_committed_to, new_temp_oid, discard_temp_oid, logs, @@ -1145,7 +1226,7 @@ void ReplicatedBackend::do_repop(OpRequestRef op) m->updated_hit_set_history, m->pg_trim_to, m->version, /* Replicated PGs don't have rollback info */ - m->min_last_complete_ondisk, + m->pg_committed_to, update_snaps, rm->localt, async); diff --git a/src/osd/ReplicatedBackend.h b/src/osd/ReplicatedBackend.h index aab75d21c7372..3dcae20605941 100644 --- a/src/osd/ReplicatedBackend.h +++ b/src/osd/ReplicatedBackend.h @@ -341,6 +341,40 @@ class ReplicatedBackend : public PGBackend { op(op), v(v) {} }; std::map> in_progress_ops; + + /// Invoked by pct_callback to update PCT after a pause in IO + void send_pct_update(); + + /// Handle MOSDPGPCT message + void do_pct(OpRequestRef op); + + /// Kick pct timer if repop_queue is empty + void maybe_kick_pct_update(); + + /// Kick pct timer if repop_queue is empty + void cancel_pct_update(); + + struct pct_callback_t final : public common::intrusive_timer::callback_t { + ReplicatedBackend *backend; + + pct_callback_t(ReplicatedBackend *backend) : backend(backend) {} + + void lock() override { + return backend->parent->pg_lock(); + } + void unlock() override { + return backend->parent->pg_unlock(); + } + void add_ref() override { + return backend->parent->pg_add_ref(); + } + void dec_ref() override { + return backend->parent->pg_dec_ref(); + } + void invoke() override { + return backend->send_pct_update(); + } + } pct_callback; public: friend class C_OSD_OnOpCommit; @@ -356,7 +390,7 @@ class ReplicatedBackend : public PGBackend { const eversion_t &at_version, PGTransactionUPtr &&t, const eversion_t &trim_to, - const eversion_t &min_last_complete_ondisk, + const eversion_t &pg_committed_to, std::vector&& log_entries, std::optional &hset_history, Context *on_all_commit, @@ -372,7 +406,7 @@ class ReplicatedBackend : public PGBackend { ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, - eversion_t min_last_complete_ondisk, + eversion_t pg_committed_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const ceph::buffer::list &log_entries, @@ -386,7 +420,7 @@ class ReplicatedBackend : public PGBackend { ceph_tid_t tid, osd_reqid_t reqid, eversion_t pg_trim_to, - eversion_t min_last_complete_ondisk, + eversion_t pg_committed_to, hobject_t new_temp_oid, hobject_t discard_temp_oid, const std::vector &log_entries, diff --git a/src/osd/osd_perf_counters.cc b/src/osd/osd_perf_counters.cc index d585159649fdc..def85209c4ec7 100644 --- a/src/osd/osd_perf_counters.cc +++ b/src/osd/osd_perf_counters.cc @@ -133,6 +133,22 @@ PerfCounters *build_osd_logger(CephContext *cct) { osd_plb.add_time_avg(l_osd_op_before_dequeue_op_lat, "op_before_dequeue_op_lat", "Latency of IO before calling dequeue_op(already dequeued and get PG lock)"); // client io before dequeue_op latency + + osd_plb.add_u64_counter( + l_osd_replica_read, "replica_read", "Count of replica reads received"); + osd_plb.add_u64_counter( + l_osd_replica_read_redirect_missing, + "replica_read_redirect_missing", + "Count of replica reads redirected to primary due to missing object"); + osd_plb.add_u64_counter( + l_osd_replica_read_redirect_conflict, + "replica_read_redirect_conflict", + "Count of replica reads redirected to primary due to unstable write"); + osd_plb.add_u64_counter( + l_osd_replica_read_served, + "replica_read_served", + "Count of replica reads served"); + osd_plb.add_u64_counter( l_osd_sop, "subop", "Suboperations"); osd_plb.add_u64_counter( diff --git a/src/osd/osd_perf_counters.h b/src/osd/osd_perf_counters.h index 367da1712fbf2..cccdb87a5381f 100644 --- a/src/osd/osd_perf_counters.h +++ b/src/osd/osd_perf_counters.h @@ -43,6 +43,11 @@ enum { l_osd_op_before_queue_op_lat, l_osd_op_before_dequeue_op_lat, + l_osd_replica_read, + l_osd_replica_read_redirect_missing, + l_osd_replica_read_redirect_conflict, + l_osd_replica_read_served, + l_osd_sop, l_osd_sop_inb, l_osd_sop_lat, diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index c9f3f7d1464b7..5c2cf8b16b059 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -1378,7 +1378,9 @@ static opt_mapping_t opt_mapping = boost::assign::map_list_of ("pg_num_max", pool_opts_t::opt_desc_t( pool_opts_t::PG_NUM_MAX, pool_opts_t::INT)) ("read_ratio", pool_opts_t::opt_desc_t( - pool_opts_t::READ_RATIO, pool_opts_t::INT)); + pool_opts_t::READ_RATIO, pool_opts_t::INT)) + ("pct_update_delay", pool_opts_t::opt_desc_t( + pool_opts_t::PCT_UPDATE_DELAY, pool_opts_t::INT)); bool pool_opts_t::is_opt_name(const std::string& name) { @@ -3677,13 +3679,14 @@ void pg_info_t::generate_test_instances(list& o) // -- pg_notify_t -- void pg_notify_t::encode(ceph::buffer::list &bl) const { - ENCODE_START(3, 2, bl); + ENCODE_START(4, 2, bl); encode(query_epoch, bl); encode(epoch_sent, bl); encode(info, bl); encode(to, bl); encode(from, bl); encode(past_intervals, bl); + encode(pg_features, bl); ENCODE_FINISH(bl); } @@ -3698,6 +3701,9 @@ void pg_notify_t::decode(ceph::buffer::list::const_iterator &bl) if (struct_v >= 3) { decode(past_intervals, bl); } + if (struct_v >= 4) { + decode(pg_features, bl); + } DECODE_FINISH(bl); } @@ -3719,9 +3725,11 @@ void pg_notify_t::generate_test_instances(list& o) { o.push_back(new pg_notify_t); o.push_back(new pg_notify_t(shard_id_t(3), shard_id_t::NO_SHARD, 1, 1, - pg_info_t(spg_t(pg_t(0,10), shard_id_t(-1))), PastIntervals())); + pg_info_t(spg_t(pg_t(0,10), shard_id_t(-1))), PastIntervals(), + PG_FEATURE_CLASSIC_ALL)); o.push_back(new pg_notify_t(shard_id_t(0), shard_id_t(2), 3, 10, - pg_info_t(spg_t(pg_t(10,10), shard_id_t(2))), PastIntervals())); + pg_info_t(spg_t(pg_t(10,10), shard_id_t(2))), PastIntervals(), + PG_FEATURE_CLASSIC_ALL)); } ostream &operator<<(ostream &lhs, const pg_notify_t ¬ify) diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index e2edaa39dfc2d..b6f5335a0f513 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -51,6 +51,7 @@ #include "librados/ListObjectImpl.h" #include "compressor/Compressor.h" #include "osd_perf_counters.h" +#include "pg_features.h" #define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v026" @@ -1106,6 +1107,21 @@ class pool_opts_t { DEDUP_CDC_CHUNK_SIZE, PG_NUM_MAX, // max pg_num READ_RATIO, // read ration for the read balancer work [0-100] + /** + * PCT_UPDATE_DELAY + * + * Time to wait (seconds) after there are no in progress writes before + * updating pg_committed_to on replicas. If the period between writes on + * a PG is usually longer than this value, most writes will trigger an + * extra message. + * + * The primary reason to enable this feature would be to limit the time + * between a write and when that write is available to be read on replicas. + * + * A value <= 0 will cause the update to be sent immediately upon write + * completion if there are no other in progress writes. + */ + PCT_UPDATE_DELAY, }; enum type_t { @@ -3790,6 +3806,7 @@ struct pg_notify_t { shard_id_t to; shard_id_t from; PastIntervals past_intervals; + pg_feature_vec_t pg_features = PG_FEATURE_NONE; pg_notify_t() : query_epoch(0), epoch_sent(0), to(shard_id_t::NO_SHARD), from(shard_id_t::NO_SHARD) {} @@ -3799,11 +3816,12 @@ struct pg_notify_t { epoch_t query_epoch, epoch_t epoch_sent, const pg_info_t &info, - const PastIntervals& pi) + const PastIntervals& pi, + pg_feature_vec_t pg_features) : query_epoch(query_epoch), epoch_sent(epoch_sent), info(info), to(to), from(from), - past_intervals(pi) { + past_intervals(pi), pg_features(pg_features) { ceph_assert(from == info.pgid.shard); } void encode(ceph::buffer::list &bl) const; diff --git a/src/osd/pg_features.h b/src/osd/pg_features.h new file mode 100644 index 0000000000000..e601c84ee6887 --- /dev/null +++ b/src/osd/pg_features.h @@ -0,0 +1,26 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +/* This feature set defines a set of features supported by OSDs once a PG has + * gone active. + * Mechanically, pretty much the same as include/ceph_features.h */ + +using pg_feature_vec_t = uint64_t; +static constexpr pg_feature_vec_t PG_FEATURE_INCARNATION_1 = 0ull; + +#define DEFINE_PG_FEATURE(bit, incarnation, name) \ + static constexpr pg_feature_vec_t PG_FEATURE_##name = (1ull << bit); \ + static constexpr pg_feature_vec_t PG_FEATUREMASK_##name = \ + (1ull << bit) | PG_FEATURE_INCARNATION_##incarnation; + +#define PG_HAVE_FEATURE(x, name) \ + (((x) & (PG_FEATUREMASK_##name)) == (PG_FEATUREMASK_##name)) + +DEFINE_PG_FEATURE(0, 1, PCT) + +static constexpr pg_feature_vec_t PG_FEATURE_NONE = 0ull; +static constexpr pg_feature_vec_t PG_FEATURE_CRIMSON_ALL = 0ull; +static constexpr pg_feature_vec_t PG_FEATURE_CLASSIC_ALL = + PG_FEATURE_PCT; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index d881c6e1dc386..087b623333bb7 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -180,6 +180,10 @@ enum { l_osdc_osdop_omap_rd, l_osdc_osdop_omap_del, + l_osdc_replica_read_sent, + l_osdc_replica_read_bounced, + l_osdc_replica_read_completed, + l_osdc_last, }; @@ -378,6 +382,13 @@ void Objecter::init() pcb.add_u64_counter(l_osdc_osdop_omap_del, "omap_del", "OSD OMAP delete operations"); + pcb.add_u64_counter(l_osdc_replica_read_sent, "replica_read_sent", + "Operations sent to replica"); + pcb.add_u64_counter(l_osdc_replica_read_bounced, "replica_read_bounced", + "Operations bounced by replica to be resent to primary"); + pcb.add_u64_counter(l_osdc_replica_read_completed, "replica_read_completed", + "Operations completed by replica"); + logger = pcb.create_perf_counters(); cct->get_perfcounters_collection()->add(logger); } @@ -2328,6 +2339,10 @@ void Objecter::_send_op_account(Op *op) ldout(cct, 20) << " note: not requesting reply" << dendl; } + if (op->target.used_replica) { + logger->inc(l_osdc_replica_read_sent); + } + logger->inc(l_osdc_op_active); logger->inc(l_osdc_op); logger->inc(l_osdc_oplen_avg, op->ops.size()); @@ -3477,6 +3492,15 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) return; } + if (op->target.flags & (CEPH_OSD_FLAG_BALANCE_READS | + CEPH_OSD_FLAG_LOCALIZE_READS)) { + if (rc == -EAGAIN) { + logger->inc(l_osdc_replica_read_bounced); + } else { + logger->inc(l_osdc_replica_read_completed); + } + } + if (rc == -EAGAIN) { ldout(cct, 7) << " got -EAGAIN, resubmitting" << dendl; if (op->has_completion())