Skip to content

Commit

Permalink
Merge pull request ceph#61086 from athanatos/sjust/wip-rep-pipeline
Browse files Browse the repository at this point in the history
crimson: allow replica side write commits to pipeline

Reviewed-by: Xuehan Xu <[email protected]>
  • Loading branch information
athanatos authored Jan 3, 2025
2 parents 3ec1e93 + 68612d1 commit 7d29ca7
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 30 deletions.
6 changes: 6 additions & 0 deletions src/crimson/osd/osd_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ struct PGRepopPipeline {
struct Process : OrderedExclusivePhaseT<Process> {
static constexpr auto type_name = "PGRepopPipeline::process";
} process;
struct WaitCommit : OrderedConcurrentPhaseT<WaitCommit> {
static constexpr auto type_name = "PGRepopPipeline::wait_repop";
} wait_commit;
struct SendReply : OrderedExclusivePhaseT<SendReply> {
static constexpr auto type_name = "PGRepopPipeline::send_reply";
} send_reply;
};

struct CommonOBCPipeline {
Expand Down
39 changes: 37 additions & 2 deletions src/crimson/osd/osd_operation_external_tracking.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ struct LttngBackend
CommonOBCPipeline::WaitRepop::BlockingEvent::Backend,
CommonOBCPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent::Backend,
CommonOBCPipeline::SendReply::BlockingEvent::Backend,
PGRepopPipeline::Process::BlockingEvent::Backend
PGRepopPipeline::Process::BlockingEvent::Backend,
PGRepopPipeline::WaitCommit::BlockingEvent::Backend,
PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent::Backend,
PGRepopPipeline::SendReply::BlockingEvent::Backend
{
void handle(ClientRequest::StartEvent&,
const Operation&) override {}
Expand Down Expand Up @@ -126,6 +129,20 @@ struct LttngBackend
const PGRepopPipeline::Process& blocker) override {
}

void handle(PGRepopPipeline::WaitCommit::BlockingEvent& ev,
const Operation& op,
const PGRepopPipeline::WaitCommit& blocker) override {
}

void handle(PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent& ev,
const Operation& op) override {
}

void handle(PGRepopPipeline::SendReply::BlockingEvent& ev,
const Operation& op,
const PGRepopPipeline::SendReply& blocker) override {
}

void handle(ClientRequest::CompletionEvent&,
const Operation&) override {}

Expand All @@ -150,7 +167,10 @@ struct HistoricBackend
CommonOBCPipeline::WaitRepop::BlockingEvent::Backend,
CommonOBCPipeline::WaitRepop::BlockingEvent::ExitBarrierEvent::Backend,
CommonOBCPipeline::SendReply::BlockingEvent::Backend,
PGRepopPipeline::Process::BlockingEvent::Backend
PGRepopPipeline::Process::BlockingEvent::Backend,
PGRepopPipeline::WaitCommit::BlockingEvent::Backend,
PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent::Backend,
PGRepopPipeline::SendReply::BlockingEvent::Backend
{
void handle(ClientRequest::StartEvent&,
const Operation&) override {}
Expand Down Expand Up @@ -246,6 +266,21 @@ struct HistoricBackend
const PGRepopPipeline::Process& blocker) override {
}

void handle(PGRepopPipeline::WaitCommit::BlockingEvent& ev,
const Operation& op,
const PGRepopPipeline::WaitCommit& blocker) override {
}

void handle(PGRepopPipeline::WaitCommit::BlockingEvent::ExitBarrierEvent& ev,
const Operation& op) override {
}

void handle(PGRepopPipeline::SendReply::BlockingEvent& ev,
const Operation& op,
const PGRepopPipeline::SendReply& blocker) override {
}


void handle(ClientRequest::CompletionEvent&, const Operation& op) override {
if (crimson::common::local_conf()->osd_op_history_size) {
to_client_request(op).put_historic();
Expand Down
55 changes: 37 additions & 18 deletions src/crimson/osd/osd_operations/replicated_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "common/Formatter.h"

#include "crimson/common/coroutine.h"
#include "crimson/osd/osd.h"
#include "crimson/osd/osd_connection_priv.h"
#include "crimson/osd/osd_operation_external_tracking.h"
Expand Down Expand Up @@ -63,34 +64,52 @@ PGRepopPipeline &RepRequest::repop_pipeline(PG &pg)
return pg.repop_pipeline;
}

RepRequest::interruptible_future<> RepRequest::with_pg_interruptible(
Ref<PG> pg)
{
LOG_PREFIX(RepRequest::with_pg_interruptible);
DEBUGI("{}", *this);
co_await this->template enter_stage<interruptor>(repop_pipeline(*pg).process);
co_await interruptor::make_interruptible(this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
>([this, pg](auto &&trigger) {
return pg->osdmap_gate.wait_for_map(
std::move(trigger), req->min_epoch);
}));

if (pg->can_discard_replica_op(*req)) {
co_return;
}

auto [commit_fut, reply] = co_await pg->handle_rep_op(req);

// Transitions from OrderedExclusive->OrderedConcurrent cannot block
this->template enter_stage_sync(repop_pipeline(*pg).wait_commit);

co_await std::move(commit_fut);

co_await this->template enter_stage<interruptor>(
repop_pipeline(*pg).send_reply);

co_await interruptor::make_interruptible(
pg->shard_services.send_to_osd(
req->from.osd, std::move(reply), pg->get_osdmap_epoch())
);
}

seastar::future<> RepRequest::with_pg(
ShardServices &shard_services, Ref<PG> pg)
{
LOG_PREFIX(RepRequest::with_pg);
DEBUGI("{}: RepRequest::with_pg", *this);
DEBUGI("{}", *this);
IRef ref = this;
return interruptor::with_interruption([this, pg] {
LOG_PREFIX(RepRequest::with_pg);
DEBUGI("{}: pg present", *this);
return this->template enter_stage<interruptor>(repop_pipeline(*pg).process
).then_interruptible([this, pg] {
return this->template with_blocking_event<
PG_OSDMapGate::OSDMapBlocker::BlockingEvent
>([this, pg](auto &&trigger) {
return pg->osdmap_gate.wait_for_map(
std::move(trigger), req->min_epoch);
});
}).then_interruptible([this, pg] (auto) {
return pg->handle_rep_op(req);
}).then_interruptible([this] {
logger().debug("{}: complete", *this);
return handle.complete();
});
return with_pg_interruptible(pg);
}, [](std::exception_ptr) {
return seastar::now();
}, pg, pg->get_osdmap_epoch()).finally([this, ref=std::move(ref)] {
logger().debug("{}: exit", *this);
handle.exit();
return handle.complete();
});
}

Expand Down
5 changes: 5 additions & 0 deletions src/crimson/osd/osd_operations/replicated_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ class RepRequest final : public PhasedOperationT<RepRequest> {
r_conn = make_local_shared_foreign(std::move(conn));
}

interruptible_future<> with_pg_interruptible(
Ref<PG> pg);

seastar::future<> with_pg(
ShardServices &shard_services, Ref<PG> pg);

Expand All @@ -78,6 +81,8 @@ class RepRequest final : public PhasedOperationT<RepRequest> {
ConnectionPipeline::GetPGMapping::BlockingEvent,
PerShardPipeline::CreateOrWaitPG::BlockingEvent,
PGRepopPipeline::Process::BlockingEvent,
PGRepopPipeline::WaitCommit::BlockingEvent,
PGRepopPipeline::SendReply::BlockingEvent,
PG_OSDMapGate::OSDMapBlocker::BlockingEvent,
PGMap::PGCreationBlockingEvent,
OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
Expand Down
13 changes: 4 additions & 9 deletions src/crimson/osd/pg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,13 +1215,10 @@ void PG::update_stats(const pg_stat_t &stat) {
);
}

PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
PG::handle_rep_op_fut PG::handle_rep_op(Ref<MOSDRepOp> req)
{
LOG_PREFIX(PG::handle_rep_op);
DEBUGDPP("{}", *this, *req);
if (can_discard_replica_op(*req)) {
co_return;
}

ceph::os::Transaction txn;
auto encoded_txn = req->get_data().cbegin();
Expand All @@ -1243,7 +1240,8 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
txn,
false);
DEBUGDPP("{} do_transaction", *this, *req);
co_await interruptor::make_interruptible(

auto commit_fut = interruptor::make_interruptible(
shard_services.get_store().do_transaction(coll_ref, std::move(txn))
);

Expand All @@ -1254,10 +1252,7 @@ PG::interruptible_future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
req.get(), pg_whoami, 0,
map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(lcod);
co_await interruptor::make_interruptible(
shard_services.send_to_osd(req->from.osd, std::move(reply), map_epoch)
);
co_return;
co_return handle_rep_op_ret(std::move(commit_fut), std::move(reply));
}

PG::interruptible_future<> PG::update_snap_map(
Expand Down
8 changes: 7 additions & 1 deletion src/crimson/osd/pg.h
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,13 @@ class PG : public boost::intrusive_ref_counter<
using with_obc_func_t =
std::function<load_obc_iertr::future<> (ObjectContextRef, ObjectContextRef)>;

interruptible_future<> handle_rep_op(Ref<MOSDRepOp> m);
using handle_rep_op_ret = std::tuple<
interruptible_future<>, // resolves upon commit
MURef<MOSDRepOpReply> // reply message
>;
// outer future resolves upon submission
using handle_rep_op_fut = interruptible_future<handle_rep_op_ret>;
handle_rep_op_fut handle_rep_op(Ref<MOSDRepOp> m);
void update_stats(const pg_stat_t &stat);
interruptible_future<> update_snap_map(
const std::vector<pg_log_entry_t> &log_entries,
Expand Down

0 comments on commit 7d29ca7

Please sign in to comment.