Merge pull request #4707 from systeminit/fnichol/rebaser-quiet-shutdown-v2

feat(rebaser): change quiescent shutdown to reduce missed activity
britmyerss authored Jan 20, 2025
2 parents 5f89360 + 7607e7e commit d604d1e
Showing 7 changed files with 132 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions lib/rebaser-server/BUCK
@@ -19,6 +19,7 @@ rust_library(
     "//lib/si-settings:si-settings",
     "//lib/si-std:si-std",
     "//lib/telemetry-nats-rs:telemetry-nats",
+    "//lib/telemetry-utils-rs:telemetry-utils",
     "//lib/telemetry-rs:telemetry",
     "//lib/veritech-client:veritech-client",
     "//third-party/rust:derive_builder",
1 change: 1 addition & 0 deletions lib/rebaser-server/Cargo.toml
@@ -26,6 +26,7 @@ si-settings = { path = "../../lib/si-settings" }
 si-std = { path = "../../lib/si-std" }
 telemetry = { path = "../../lib/telemetry-rs" }
 telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
+telemetry-utils = { path = "../../lib/telemetry-utils-rs" }
 veritech-client = { path = "../../lib/veritech-client" }
 
 derive_builder = { workspace = true }
80 changes: 31 additions & 49 deletions lib/rebaser-server/src/change_set_processor_task.rs
@@ -24,6 +24,7 @@ use si_data_nats::{
 };
 use si_events::{ChangeSetId, WorkspacePk};
 use telemetry::prelude::*;
+use telemetry_utils::metric;
 use thiserror::Error;
 use tokio::sync::Notify;
 use tokio_stream::StreamExt as _;
@@ -49,7 +50,6 @@ pub(crate) struct ChangeSetProcessorTask {
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
     inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
-    quiescence_token: CancellationToken,
 }
 
 impl ChangeSetProcessorTask {
@@ -64,8 +64,10 @@ impl ChangeSetProcessorTask {
         workspace_id: WorkspacePk,
         change_set_id: ChangeSetId,
         ctx_builder: DalContextBuilder,
-        run_notify: Arc<Notify>,
+        run_dvu_notify: Arc<Notify>,
         quiescent_period: Duration,
+        quiesced_notify: Arc<Notify>,
+        quiesced_token: CancellationToken,
         task_token: CancellationToken,
         server_tracker: TaskTracker,
     ) -> Self {
@@ -78,44 +80,41 @@
             change_set_id,
             nats,
             ctx_builder,
-            run_notify,
+            run_dvu_notify,
             server_tracker,
         );
 
-        let quiescence_token = CancellationToken::new();
-
         let captured = QuiescedCaptured {
             instance_id: metadata.instance_id().to_string(),
             workspace_id,
             change_set_id,
-            quiescence_token: quiescence_token.clone(),
+            quiesced_notify: quiesced_notify.clone(),
         };
 
         let inactive_aware_incoming = incoming
             // Looks for a gap between incoming messages greater than the duration
             .timeout(quiescent_period)
-            // Fire the quiescence token which triggers a distinctive shutdown where we *know* we
-            // want to remove the task from the set of work.
+            // Fire quiesced_notify which triggers a specific shutdown of the serial dvu task where
+            // we *know* we want to remove the task from the set of work.
             .inspect_err(move |_elapsed| {
                 let QuiescedCaptured {
                     instance_id,
                     workspace_id,
                     change_set_id,
-                    quiescence_token,
+                    quiesced_notify,
                 } = &captured;
 
-                debug!(
+                info!(
                     service.instance.id = instance_id,
                     si.workspace.id = %workspace_id,
                     si.change_set.id = %change_set_id,
-                    "rate of requests has become inactive, shutting down processing tasks",
+                    "rate of requests has become inactive, triggering a quiesced shutdown",
                 );
-                quiescence_token.cancel();
+                // Notify the serial dvu task that we want to shutdown due to a quiet period
+                quiesced_notify.notify_one();
             })
-            // Once the first inactive period is detected, this stream is closed (i.e. returns
-            // `None`)
-            .map_while(result::Result::ok)
-            .fuse();
+            // Continue processing messages as normal until the Naxum app's graceful shutdown is
+            // triggered. This means we turn the stream back from a stream of
+            // `Result<Result<Message, _>, Elapsed>` into `Result<Message, _>`
+            .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok());
 
         let app = ServiceBuilder::new()
             .layer(
@@ -135,10 +134,7 @@
 
         let inner =
             naxum::serve_with_incoming_limit(inactive_aware_incoming, app.into_make_service(), 1)
-                .with_graceful_shutdown(graceful_shutdown_signal(
-                    task_token,
-                    quiescence_token.clone(),
-                ));
+                .with_graceful_shutdown(graceful_shutdown_signal(task_token, quiesced_token));
 
         let inner_fut = inner.into_future();

@@ -147,44 +143,28 @@
             workspace_id,
             change_set_id,
             inner: Box::new(inner_fut),
-            quiescence_token,
         }
     }
 
-    pub(crate) async fn try_run(self) -> Result<Shutdown> {
+    pub(crate) async fn try_run(self) -> Result<()> {
         self.inner.await.map_err(Error::Naxum)?;
+        metric!(counter.change_set_processor_task.change_set_task = -1);
 
-        if self.quiescence_token.is_cancelled() {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown due to quiescent period",
-            );
-            Ok(Shutdown::Quiesced)
-        } else {
-            debug!(
-                task = Self::NAME,
-                si.workspace.id = %self.workspace_id,
-                si.change_set.id = %self.change_set_id,
-                "shutdown complete",
-            );
-            Ok(Shutdown::Graceful)
-        }
+        info!(
+            task = Self::NAME,
+            si.workspace.id = %self.workspace_id,
+            si.change_set.id = %self.change_set_id,
+            "shutdown complete",
+        );
+        Ok(())
     }
 }
 
-#[derive(Debug)]
-pub(crate) enum Shutdown {
-    Graceful,
-    Quiesced,
-}
-
 struct QuiescedCaptured {
     instance_id: String,
     workspace_id: WorkspacePk,
     change_set_id: ChangeSetId,
-    quiescence_token: CancellationToken,
+    quiesced_notify: Arc<Notify>,
 }
 
 #[derive(Clone, Debug)]
Expand All @@ -207,7 +187,7 @@ impl post_process::OnSuccess for DeleteMessageOnSuccess {
let stream = self.stream.clone();

Box::pin(async move {
trace!("deleting message on success");
info!("deleting message on success");
if let Err(err) = stream.delete_message(info.stream_sequence).await {
warn!(
si.error.message = ?err,
@@ -305,6 +285,7 @@ mod handlers {
     use si_data_nats::HeaderMap;
     use telemetry::prelude::*;
     use telemetry_nats::propagation;
+    use telemetry_utils::metric;
     use thiserror::Error;
 
     use crate::{
@@ -349,6 +330,7 @@
 
     impl IntoResponse for HandlerError {
         fn into_response(self) -> Response {
+            metric!(counter.change_set_processor_task.failed_rebase = 1);
             // TODO(fnichol): there are different responses, esp. for expected interrupted
             error!(si.error.message = ?self, "failed to process message");
             Response::default_internal_server_error()
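The heart of this diff is the combinator chain above: where the old code closed the stream at the first quiet period (`map_while` + `fuse`), the new code only signals the serial dvu task and keeps consuming, so messages arriving after a quiet period are no longer missed. A minimal sketch of that timeout-then-notify pattern, assuming the `tokio`, `tokio-stream`, and `futures` crates; the mpsc channel, item type, and durations are illustrative, not from the commit:

```rust
use std::{sync::Arc, time::Duration};

use futures::TryStreamExt as _; // provides `inspect_err` on Result streams
use tokio::sync::{mpsc, Notify};
use tokio_stream::{wrappers::ReceiverStream, StreamExt as _};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<u32>(8);
    let quiesced_notify = Arc::new(Notify::new());
    let notify = quiesced_notify.clone();

    // Flag a gap between messages longer than the quiescent period, then
    // keep passing later messages through instead of ending the stream.
    let mut inactive_aware = Box::pin(
        ReceiverStream::new(rx)
            .timeout(Duration::from_millis(100))
            .inspect_err(move |_elapsed| {
                // Quiet period detected: signal the coordinating task...
                notify.notify_one();
            })
            // ...but turn `Result<T, Elapsed>` back into `T` and continue.
            .filter_map(|maybe_elapsed_item| maybe_elapsed_item.ok()),
    );

    tokio::spawn(async move {
        tx.send(1).await.ok();
        tokio::time::sleep(Duration::from_millis(250)).await;
        // Arrives *after* the quiet period; the old `map_while`/`fuse`
        // version would have dropped it, this version still processes it.
        tx.send(2).await.ok();
    });

    while let Some(n) = inactive_aware.next().await {
        println!("processed {n}");
    }
    // `notify_one` stored a permit, so this returns immediately.
    quiesced_notify.notified().await;
    println!("quiet period was flagged, yet no messages were dropped");
}
```

Note that `tokio_stream`'s `timeout` yields at most one `Elapsed` error per gap (the timer re-arms when the next item arrives), so `notify_one` fires once per quiet period rather than repeatedly.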
23 changes: 15 additions & 8 deletions lib/rebaser-server/src/handlers.rs
@@ -21,8 +21,9 @@ use ulid::Ulid;
 
 use crate::{
     app_state::AppState,
-    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError, Shutdown},
+    change_set_processor_task::{ChangeSetProcessorTask, ChangeSetProcessorTaskError},
     serial_dvu_task::{SerialDvuTask, SerialDvuTaskError},
+    Shutdown,
 };
 
 const CONSUMER_NAME_PREFIX: &str = "rebaser-requests";
@@ -105,7 +106,9 @@ pub(crate) async fn default(State(state): State<AppState>, subject: Subject) ->
     // We want to independently control the lifecycle of our tasks
     let tasks_token = CancellationToken::new();
 
-    let run_notify = Arc::new(Notify::new());
+    let run_dvu_notify = Arc::new(Notify::new());
+    let quiesced_token = CancellationToken::new();
+    let quiesced_notify = Arc::new(Notify::new());
 
     let incoming = requests_stream
         .create_consumer(rebaser_requests_per_change_set_consumer_config(
@@ -126,7 +129,9 @@
         workspace.id,
         change_set.id,
         ctx_builder.clone(),
-        run_notify.clone(),
+        run_dvu_notify.clone(),
+        quiesced_notify.clone(),
+        quiesced_token.clone(),
         tasks_token.clone(),
     );

@@ -138,8 +143,10 @@
         workspace.id,
         change_set.id,
         ctx_builder,
-        run_notify,
+        run_dvu_notify,
         quiescent_period,
+        quiesced_notify,
+        quiesced_token,
         tasks_token.clone(),
         server_tracker,
     );
@@ -166,11 +173,9 @@
         // Processor task completed
         processor_task_result_result = processor_task_result => {
             match processor_task_result_result {
-                // A quiet period was found in the stream; reply `Ok` to ack and remove this task
-                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Processor exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(Shutdown::Graceful)) => Err(Error::ChangeSetProcessorCompleted),
+                Ok(Ok(())) => Err(Error::ChangeSetProcessorCompleted),
                 // Processor exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::ChangeSetProcessor(err)),
                 // Tokio join error on processor exit; reply `Err` to nack for task to persist and
@@ -181,9 +186,11 @@
         // Serial dvu task completed
         dvu_task_result_result = dvu_task_result => {
             match dvu_task_result_result {
+                // A quiet period was found in the stream; reply Ok to ack and remove this task
+                Ok(Ok(Shutdown::Quiesced)) => Ok(()),
                 // Serial dvu exited cleanly, but unexpectedly; reply `Err` to nack for task to
                 // persist and retry
-                Ok(Ok(())) => Err(Error::SerialDvuCompleted),
+                Ok(Ok(Shutdown::Graceful)) => Err(Error::SerialDvuCompleted),
                 // Serial dvu exited with error; reply `Err` to nack for task to persist and retry
                 Ok(Err(err)) => Err(Error::SerialDvu(err)),
                 // Tokio join error on serial dvu exit; reply `Err` to nack for task to persist and
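The rewired `select!` arms above show that the serial dvu task, not the processor, now reports `Shutdown::Quiesced`. A hedged sketch of how the two tasks plausibly coordinate after this change, assuming the `tokio` and `tokio-util` crates; only the parameter set and the `Shutdown` semantics come from the diff, and the run-loop body is invented for illustration:

```rust
use std::sync::Arc;

use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum Shutdown {
    Graceful,
    Quiesced,
}

// Sketch: the serial dvu task owns the quiesce decision. When the processor
// task observes a quiet period it fires `quiesced_notify`; this task then
// cancels `quiesced_token` (which drives the Naxum app's graceful shutdown
// in the processor) and reports `Shutdown::Quiesced` so the handler can ack
// the request and retire both tasks.
async fn serial_dvu_run(
    run_dvu_notify: Arc<Notify>,
    quiesced_notify: Arc<Notify>,
    quiesced_token: CancellationToken,
    task_token: CancellationToken,
) -> Shutdown {
    loop {
        tokio::select! {
            biased;
            // Server-wide shutdown wins: report a graceful exit.
            _ = task_token.cancelled() => return Shutdown::Graceful,
            // Quiet period reported by the processor task: stop the
            // processor via the shared token and report quiesced.
            _ = quiesced_notify.notified() => {
                quiesced_token.cancel();
                return Shutdown::Quiesced;
            }
            // Normal operation: run one dependent-values update per wakeup.
            _ = run_dvu_notify.notified() => {
                // ... perform the dvu run here ...
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let run_dvu_notify = Arc::new(Notify::new());
    let quiesced_notify = Arc::new(Notify::new());
    let quiesced_token = CancellationToken::new();
    let task_token = CancellationToken::new();

    let dvu = tokio::spawn(serial_dvu_run(
        run_dvu_notify.clone(),
        quiesced_notify.clone(),
        quiesced_token.clone(),
        task_token.clone(),
    ));

    // Simulate the processor task flagging a quiet period.
    quiesced_notify.notify_one();

    let shutdown = dvu.await.expect("task panicked");
    // The shared token is now cancelled, which would stop the processor.
    assert!(quiesced_token.is_cancelled());
    println!("dvu task exited with {shutdown:?}");
}
```

The `biased;` ordering makes a server-wide cancel win over a pending quiesce or dvu wakeup, so in this sketch a graceful shutdown is never misreported as quiesced.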
6 changes: 6 additions & 0 deletions lib/rebaser-server/src/lib.rs
@@ -102,3 +102,9 @@ impl ServerError {
 type Error = ServerError;
 
 type Result<T> = std::result::Result<T, ServerError>;
+
+#[derive(Debug)]
+pub(crate) enum Shutdown {
+    Graceful,
+    Quiesced,
+}