chore: fix queue metric jaggedness (#4689)
### Description

See hyperlane-xyz/hyperlane-monorepo#4068 for
the problem description.

In this fix, whenever an operation is moved from one queue to another,
its metric count is decremented from the old queue and incremented for
the new one.
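
To illustrate why the earlier accounting looked jagged, here is a minimal std-only sketch. `Gauge` is a hypothetical stand-in for a Prometheus integer gauge, and `pop_then_repush` and `move_between` are illustrative names, not functions from the relayer.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

/// Hypothetical stand-in for a Prometheus integer gauge (std-only sketch).
#[derive(Default)]
struct Gauge(AtomicI64);

impl Gauge {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Old accounting: decrement on pop, increment again when the operation is
/// re-pushed. Returns the value a scrape would see between the two steps.
fn pop_then_repush(queue_len: &Gauge, batch: usize) -> i64 {
    for _ in 0..batch {
        queue_len.dec();
    }
    let seen_mid_batch = queue_len.get();
    for _ in 0..batch {
        queue_len.inc();
    }
    seen_mid_batch
}

/// New accounting: counts change only when an operation actually changes queue.
fn move_between(old_queue: &Gauge, new_queue: &Gauge) {
    old_queue.dec();
    new_queue.inc();
}
```

With five queued operations and a batch of three, the old accounting briefly reports two even though nothing left the queue; the new accounting only changes a count when an operation moves.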

My initial implementation approach was to update these metrics inside
`queue.push(op)`, but the metrics for the operation's previous queue
aren't accessible there. #4068 suggests updating them in
`op.set_status`, which fails for a similar reason: even if `op` holds a
pointer to the current queue's metric internally, `set_status` has no
access to the new queue's metric.

So the fix I went for does store a pointer to the current queue metric
internally in `op`, but also adds a new
`op.set_status_and_update_metrics(status, new_queue_metric)` method,
which **must** be used if the queue metrics are to be correctly
calculated.
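
A self-contained sketch of this pattern, assuming a std-only `Gauge` in place of `prometheus::IntGauge`. The `Operation` trait, the `Message` type and the `Status` variants are simplified, hypothetical stand-ins for `PendingOperation`, `PendingMessage` and `PendingOperationStatus`.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `prometheus::IntGauge`.
#[derive(Default)]
struct Gauge(AtomicI64);

impl Gauge {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Simplified, hypothetical status values.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Preparing,
    Submitted,
}

trait Operation {
    fn set_status(&mut self, status: Status);
    fn metric(&self) -> Option<Arc<Gauge>>;
    fn set_metric(&mut self, metric: Arc<Gauge>);

    /// Default method: implementors only provide the accessors above.
    fn set_status_and_update_metrics(&mut self, status: Status, new_metric: Arc<Gauge>) {
        self.set_status(status);
        // Leave the old queue's count, if the operation was in one.
        if let Some(old_metric) = self.metric() {
            old_metric.dec();
        }
        // Join the new queue's count and remember it for the next move.
        new_metric.inc();
        self.set_metric(new_metric);
    }
}

struct Message {
    status: Status,
    metric: Option<Arc<Gauge>>,
}

impl Operation for Message {
    fn set_status(&mut self, status: Status) {
        self.status = status;
    }
    fn metric(&self) -> Option<Arc<Gauge>> {
        self.metric.clone()
    }
    fn set_metric(&mut self, metric: Arc<Gauge>) {
        self.metric = Some(metric);
    }
}
```

Because the update lives in a default trait method, each operation type only has to store and return the gauge pointer; the decrement and increment logic is written once.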

This works well except for when ops are removed from the confirm queue,
because in the absence of a call to `set_status_and_update_metrics`, no
metric decrementing is done. I considered using the `Drop` trait to
decrement, but it'd have to be implemented individually for each
`PendingOperation` type, which isn't very maintainable. I ended up
decrementing the metric in `confirm_operation`, which is called for both
batches and single submissions and, of course, all implementations of
`PendingOperation`.
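
For contrast, a rough sketch of the two options, again with a hypothetical std-only `Gauge`. `DroppingOp` and `on_confirmed` are illustrative names only: the first shows what a per-type `Drop` impl might look like, the second shows a decrement at the single point where operations leave the confirm queue.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `prometheus::IntGauge`.
#[derive(Default)]
struct Gauge(AtomicI64);

impl Gauge {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn dec(&self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Considered option: every concrete operation type would need an impl like this.
struct DroppingOp {
    metric: Option<Arc<Gauge>>,
}

impl Drop for DroppingOp {
    fn drop(&mut self) {
        if let Some(metric) = &self.metric {
            metric.dec();
        }
    }
}

/// Chosen option (sketched): one explicit decrement where the operation is
/// confirmed, independent of the concrete operation type.
fn on_confirmed(metric: Option<Arc<Gauge>>) {
    if let Some(metric) = metric {
        metric.dec();
    }
}
```

Both variants leave the gauge at zero once the operation is gone; the explicit version keeps the bookkeeping in one place instead of repeating it per type.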

Here's a screenshot of my local Grafana server showing no jaggedness in
the e2e run, with Prometheus configured to scrape every 2s:
![Screenshot 2024-10-15 at 17 26 56](https://github.com/user-attachments/assets/26004e0e-2ccf-4cec-aa23-ee2d032df25a)


### Drive-by changes

Adds the `prepare_queue` arg of `submit_single_operation` to the
`instrument(skip(...))` list so it no longer pollutes logs.

### Related issues

- Fixes hyperlane-xyz/hyperlane-monorepo#4068

### Backward compatibility

Yes

### Testing

Manually, by checking the queue length metric of an e2e run in Grafana.
daniel-savu authored Oct 16, 2024
1 parent 9382658 commit efd438f
Showing 7 changed files with 71 additions and 17 deletions.
1 change: 1 addition & 0 deletions rust/main/Cargo.lock

Some generated files are not rendered by default.

16 changes: 10 additions & 6 deletions rust/main/agents/relayer/src/msg/op_queue.rs
@@ -30,10 +30,11 @@ impl OpQueue {
     #[instrument(skip(self), ret, fields(queue_label=%self.queue_metrics_label), level = "trace")]
     pub async fn push(&self, mut op: QueueOperation, new_status: Option<PendingOperationStatus>) {
         if let Some(new_status) = new_status {
-            op.set_status(new_status);
+            op.set_status_and_update_metrics(
+                new_status,
+                Arc::new(self.get_operation_metric(op.as_ref())),
+            );
         }
-        // increment the metric before pushing onto the queue, because we lose ownership afterwards
-        self.get_operation_metric(op.as_ref()).inc();

         self.queue.lock().await.push(Reverse(op));
     }
@@ -52,9 +53,6 @@ impl OpQueue {
         let mut queue = self.queue.lock().await;
         let mut popped = vec![];
         while let Some(Reverse(op)) = queue.pop() {
-            // even if the metric is decremented here, the operation may fail to process and be re-added to the queue.
-            // in those cases, the queue length will look like it has spikes whose sizes are at most `limit`
-            self.get_operation_metric(op.as_ref()).dec();
             popped.push(op);
             if popped.len() >= limit {
                 break;
@@ -242,6 +240,12 @@ pub mod test {
         fn set_retries(&mut self, _retries: u32) {
             todo!()
         }
+
+        fn get_metric(&self) -> Option<Arc<IntGauge>> {
+            None
+        }
+
+        fn set_metric(&mut self, _metric: Arc<IntGauge>) {}
     }

     pub fn dummy_metrics_and_label() -> (IntGaugeVec, String) {
5 changes: 4 additions & 1 deletion rust/main/agents/relayer/src/msg/op_submitter.rs
@@ -341,7 +341,7 @@ async fn submit_task(
     }
 }

-#[instrument(skip(confirm_queue, metrics), ret, level = "debug")]
+#[instrument(skip(prepare_queue, confirm_queue, metrics), ret, level = "debug")]
 async fn submit_single_operation(
     mut op: QueueOperation,
     prepare_queue: &mut OpQueue,
@@ -457,6 +457,9 @@ async fn confirm_operation(
         PendingOperationResult::Success => {
             debug!(?op, "Operation confirmed");
             metrics.ops_confirmed.inc();
+            if let Some(metric) = op.get_metric() {
+                metric.dec()
+            }
         }
         PendingOperationResult::NotReady => {
             confirm_queue.push(op, None).await;
11 changes: 11 additions & 0 deletions rust/main/agents/relayer/src/msg/pending_message.rs
@@ -82,6 +82,9 @@ pub struct PendingMessage {
     #[new(default)]
     #[serde(skip_serializing)]
     metadata: Option<Vec<u8>>,
+    #[new(default)]
+    #[serde(skip_serializing)]
+    metric: Option<Arc<IntGauge>>,
 }

 impl Debug for PendingMessage {
@@ -481,6 +484,14 @@ impl PendingOperation for PendingMessage {
     fn try_get_mailbox(&self) -> Option<Arc<dyn Mailbox>> {
         Some(self.ctx.destination_mailbox.clone())
     }
+
+    fn get_metric(&self) -> Option<Arc<IntGauge>> {
+        self.metric.clone()
+    }
+
+    fn set_metric(&mut self, metric: Arc<IntGauge>) {
+        self.metric = Some(metric);
+    }
 }

 impl PendingMessage {
1 change: 1 addition & 0 deletions rust/main/hyperlane-core/Cargo.toml
@@ -33,6 +33,7 @@ itertools.workspace = true
 num = { workspace = true, features = ["serde"] }
 num-derive.workspace = true
 num-traits.workspace = true
+prometheus.workspace = true
 serde = { workspace = true }
 serde_json = { workspace = true }
 sha3 = { workspace = true }
21 changes: 21 additions & 0 deletions rust/main/hyperlane-core/src/traits/pending_operation.rs
@@ -13,6 +13,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use num::CheckedDiv;
+use prometheus::IntGauge;
 use strum::Display;
 use tracing::warn;

@@ -63,13 +64,33 @@ pub trait PendingOperation: Send + Sync + Debug + TryBatchAs<HyperlaneMessage> {
     /// Label to use for metrics granularity.
     fn app_context(&self) -> Option<String>;

+    /// Get the metric associated with this operation.
+    fn get_metric(&self) -> Option<Arc<IntGauge>>;
+
+    /// Set the metric associated with this operation.
+    fn set_metric(&mut self, metric: Arc<IntGauge>);
+
     /// The status of the operation, which should explain why it is in the
     /// queue.
     fn status(&self) -> PendingOperationStatus;

     /// Set the status of the operation.
     fn set_status(&mut self, status: PendingOperationStatus);

+    /// Set the status of the operation and update the metrics.
+    fn set_status_and_update_metrics(
+        &mut self,
+        status: PendingOperationStatus,
+        new_metric: Arc<IntGauge>,
+    ) {
+        self.set_status(status);
+        if let Some(old_metric) = self.get_metric() {
+            old_metric.dec();
+        }
+        new_metric.inc();
+        self.set_metric(new_metric);
+    }
+
     /// Get tuple of labels for metrics.
     fn get_operation_labels(&self) -> (String, String) {
         let app_context = self.app_context().unwrap_or("Unknown".to_string());
33 changes: 23 additions & 10 deletions rust/sealevel/Cargo.lock

Some generated files are not rendered by default.