Skip to content

Commit

Permalink
Merge pull request ceph#56832 from Matan-B/wip-matanb-crimson-recover…
Browse files Browse the repository at this point in the history
…y-version

crimson/osd/pg: introduce projected_log
Reviewed-by: Xuehan Xu <[email protected]>
  • Loading branch information
Matan-B authored Nov 5, 2024
2 parents 4e355ce + 0a76ec3 commit a91bcae
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 17 deletions.
8 changes: 8 additions & 0 deletions src/crimson/osd/backfill_facades.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ struct PeeringFacade final : BackfillState::PeeringFacade {
return peering_state.get_info().log_tail;
}

const PGLog& get_pg_log() const override {
return peering_state.get_pg_log();
}

void scan_log_after(eversion_t v, scan_log_func_t f) const override {
peering_state.get_pg_log().get_log().scan_log_after(v, std::move(f));
}
Expand Down Expand Up @@ -73,6 +77,10 @@ struct PGFacade final : BackfillState::PGFacade {
return pg.projected_last_update;
}

const PGLog::IndexedLog& get_projected_log() const override {
return pg.projected_log;
}

PGFacade(PG& pg) : pg(pg) {}
};

Expand Down
11 changes: 6 additions & 5 deletions src/crimson/osd/backfill_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ void BackfillState::Enqueuing::maybe_update_range()
logger().info("{}: bi is current", __func__);
ceph_assert(primary_bi.version == pg().get_projected_last_update());
} else if (primary_bi.version >= peering_state().get_log_tail()) {
#if 0
if (peering_state().get_pg_log().get_log().empty() &&
pg().get_projected_log().empty()) {
/* Because we don't move log_tail on split, the log might be
Expand All @@ -137,13 +136,11 @@ void BackfillState::Enqueuing::maybe_update_range()
ceph_assert(primary_bi.version == eversion_t());
return;
}
#endif
logger().debug("{}: bi is old, ({}) can be updated with log to {}",
__func__,
primary_bi.version,
pg().get_projected_last_update());
logger().debug("{}: scanning pg log first", __func__);
peering_state().scan_log_after(primary_bi.version,
auto func =
[&](const pg_log_entry_t& e) {
logger().debug("maybe_update_range(lambda): updating from version {}",
e.version);
Expand All @@ -160,7 +157,11 @@ void BackfillState::Enqueuing::maybe_update_range()
primary_bi.objects.erase(e.soid);
}
}
});
};
logger().debug("{}: scanning pg log first", __func__);
peering_state().scan_log_after(primary_bi.version, func);
logger().debug("{}: scanning projected log", __func__);
pg().get_projected_log().scan_log_after(primary_bi.version, func);
primary_bi.version = pg().get_projected_last_update();
} else {
ceph_abort_msg(
Expand Down
4 changes: 4 additions & 0 deletions src/crimson/osd/backfill_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/statechart/transition.hpp>

#include "osd/recovery_types.h"
#include "osd/PGLog.h"

namespace crimson::osd {

Expand Down Expand Up @@ -367,6 +368,7 @@ struct BackfillState::PeeringFacade {
virtual hobject_t earliest_backfill() const = 0;
virtual const std::set<pg_shard_t>& get_backfill_targets() const = 0;
virtual const hobject_t& get_peer_last_backfill(pg_shard_t peer) const = 0;
virtual const PGLog& get_pg_log() const = 0;
virtual const eversion_t& get_last_update() const = 0;
virtual const eversion_t& get_log_tail() const = 0;

Expand All @@ -392,6 +394,8 @@ struct BackfillState::PeeringFacade {
// of behaviour that must be provided by a unit test's mock.
struct BackfillState::PGFacade {
virtual const eversion_t& get_projected_last_update() const = 0;
virtual const PGLog::IndexedLog& get_projected_log() const = 0;

virtual ~PGFacade() {}
};

Expand Down
37 changes: 26 additions & 11 deletions src/crimson/osd/pg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -932,6 +932,10 @@ PG::submit_transaction(
ceph_assert(log_entries.rbegin()->version >= projected_last_update);
projected_last_update = log_entries.rbegin()->version;

for (const auto& entry: log_entries) {
projected_log.add(entry);
}

auto [submitted, all_completed] = co_await backend->submit_transaction(
peering_state.get_acting_recovery_backfill(),
obc->obs.oi.soid,
Expand Down Expand Up @@ -1333,19 +1337,15 @@ void PG::log_operation(
if (is_primary()) {
ceph_assert(trim_to <= peering_state.get_pg_committed_to());
}
/* TODO: when we add snap mapper and projected log support,
* we'll likely want to update them here.
*
* See src/osd/PrimaryLogPG.h:log_operation for how classic
* handles these cases.
*/
#if 0
auto last = logv.rbegin();
if (is_primary() && last != logv.rend()) {
logger().debug("{} on primary, trimming projected log",
__func__);
projected_log.skip_can_rollback_to_to_head();
projected_log.trim(cct, last->version, nullptr, nullptr, nullptr);
projected_log.trim(shard_services.get_cct(), last->version,
nullptr, nullptr, nullptr);
}
#endif

if (!is_primary()) { // && !is_ec_pg()
replica_clear_repop_obc(logv);
}
Expand Down Expand Up @@ -1651,8 +1651,8 @@ PG::already_complete(const osd_reqid_t& reqid)
int ret;
std::vector<pg_log_op_return_item_t> op_returns;

if (peering_state.get_pg_log().get_log().get_request(
reqid, &version, &user_version, &ret, &op_returns)) {
if (check_in_progress_op(
reqid, &version, &user_version, &ret, &op_returns)) {
complete_op_t dupinfo{
user_version,
version,
Expand Down Expand Up @@ -1717,4 +1717,19 @@ void PG::C_PG_FinishRecovery::finish(int r) {
DEBUGDPP("stale recovery finsher", pg);
}
}
bool PG::check_in_progress_op(
const osd_reqid_t& reqid,
eversion_t *version,
version_t *user_version,
int *return_code,
std::vector<pg_log_op_return_item_t> *op_returns
) const
{
return (
projected_log.get_request(reqid, version, user_version, return_code,
op_returns) ||
peering_state.get_pg_log().get_log().get_request(
reqid, version, user_version, return_code, op_returns));
}

}
8 changes: 8 additions & 0 deletions src/crimson/osd/pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ class PG : public boost::intrusive_ref_counter<
void check_blocklisted_watchers() final;
void clear_primary_state() final {
recovery_finisher = nullptr;
projected_log = PGLog::IndexedLog();
}

void queue_check_readable(epoch_t last_peering_reset,
Expand Down Expand Up @@ -826,8 +827,15 @@ class PG : public boost::intrusive_ref_counter<
const eversion_t version;
const int err;
};
PGLog::IndexedLog projected_log;
interruptible_future<std::optional<complete_op_t>>
already_complete(const osd_reqid_t& reqid);
bool check_in_progress_op(
const osd_reqid_t& reqid,
eversion_t *version,
version_t *user_version,
int *return_code,
std::vector<pg_log_op_return_item_t> *op_returns) const;
int get_recovery_op_priority() const {
int64_t pri = 0;
get_pgpool().info.opts.get(pool_opts_t::RECOVERY_OP_PRIORITY, &pri);
Expand Down
13 changes: 12 additions & 1 deletion src/test/crimson/test_backfill.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ struct FakePrimary {
eversion_t last_update;
eversion_t projected_last_update;
eversion_t log_tail;
PGLog pg_log;
PGLog::IndexedLog projected_log;

FakePrimary(FakeStore&& store)
: store(std::move(store)) {
: store(std::move(store)), pg_log(nullptr) {
}
};

Expand Down Expand Up @@ -234,6 +236,10 @@ struct BackfillFixture::PeeringFacade
return backfill_source.log_tail;
}

const PGLog& get_pg_log() const override {
return backfill_source.pg_log;
}

void scan_log_after(eversion_t, scan_log_func_t) const override {
/* NOP */
}
Expand Down Expand Up @@ -263,6 +269,11 @@ struct BackfillFixture::PGFacade : public crimson::osd::BackfillState::PGFacade
const eversion_t& get_projected_last_update() const override {
return backfill_source.projected_last_update;
}

const PGLog::IndexedLog& get_projected_log() const override {
return backfill_source.projected_log;
}

};

BackfillFixture::BackfillFixture(
Expand Down

0 comments on commit a91bcae

Please sign in to comment.