Skip to content

Commit

Permalink
Add gauges to track when index/database workers are doing useful wor…
Browse files Browse the repository at this point in the history
…k (#23247)

While we have timers for most of these workers, they don't show you
what's currently running. So if a worker runs for a long time you can't
be sure of its status until it finishes, which isn't super helpful.

The gauges will also show us when workers are or aren't running
concurrently, which might also make it easier to understand how they
interact with each other or other user actions.

I wanted this specifically for search index workers, but I figure the
concept is useful for most other types of background workers, so I've
added a similar gauge to the ones I could find.

GitOrigin-RevId: f7cd20d2da115923ec9454b465199baba544139c
  • Loading branch information
sjudd authored and Convex, Inc. committed Mar 11, 2024
1 parent 7df0ab9 commit 878094a
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 2 deletions.
7 changes: 6 additions & 1 deletion crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ use value::{
VirtualTableMapping,
};

use crate::metrics::export_timer;
use crate::metrics::{
export_timer,
log_worker_starting,
};

const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(900); // 15 minutes
Expand Down Expand Up @@ -214,6 +217,7 @@ impl<RT: Runtime> ExportWorker<RT> {
},
(Some(export), None) => {
tracing::info!("Export requested.");
let _status = log_worker_starting("ExportWorker");
let timer = export_timer();
let ts = self.database.now_ts_for_reads();
let in_progress_export = (*export).clone().in_progress(*ts)?;
Expand All @@ -233,6 +237,7 @@ impl<RT: Runtime> ExportWorker<RT> {
},
(None, Some(export)) => {
tracing::info!("In progress export restarting...");
let _status = log_worker_starting("ExportWorker");
let timer = export_timer();
self.export(export).await?;
timer.finish();
Expand Down
30 changes: 30 additions & 0 deletions crates/application/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use metrics::{
log_counter_with_tags,
log_distribution_with_tags,
log_gauge_with_tags,
metric_tag_const,
metric_tag_const_value,
register_convex_counter,
register_convex_gauge,
register_convex_histogram,
StatusTimer,
STATUS_LABEL,
Expand Down Expand Up @@ -67,3 +69,31 @@ register_convex_histogram!(
/// Builds a [`StatusTimer`] over the `SNAPSHOT_EXPORT_TIMER_SECONDS`
/// histogram. The export worker creates one before starting an export and
/// calls `finish()` on it once the export completes.
pub fn export_timer() -> StatusTimer {
StatusTimer::new(&SNAPSHOT_EXPORT_TIMER_SECONDS)
}

/// Scope guard representing one application worker that is currently doing
/// work.
///
/// Values are handed out by `log_worker_starting`, which reports the worker
/// as busy. When the guard is dropped, its `Drop` impl reports the same
/// worker as idle again, so the gauge stays at `1` for exactly as long as the
/// guard is alive.
///
/// The type is `#[must_use]` because discarding the guard right away (for
/// example by calling `log_worker_starting("X");` as a bare statement) would
/// flip the gauge to `1` and straight back to `0`, silently recording nothing.
#[derive(Debug)]
#[must_use = "the worker is reported as idle as soon as this guard is dropped; bind it to a \
              variable that lives for the duration of the work"]
pub struct AppWorkerStatus {
    /// Value of the `worker` tag attached to every gauge sample for this
    /// worker.
    name: &'static str,
}

impl Drop for AppWorkerStatus {
    /// Reports the worker as idle (gauge value `0`) once the guard goes out
    /// of scope, whether through an explicit `drop(..)`, a normal return, or
    /// an early exit via `?`.
    fn drop(&mut self) {
        let worker: &'static str = self.name;
        log_worker_status(false, worker);
    }
}

// Gauge tagged by `worker`. It is written only through `log_worker_status`,
// which stores 1.0 while a worker is running and 0.0 once its status guard
// has been dropped.
register_convex_gauge!(
APP_WORKER_IN_PROGRESS_TOTAL,
"1 if a worker is working, 0 otherwise",
&["worker"],
);
/// Reports the application worker `name` as busy and returns a guard for it.
///
/// The returned [`AppWorkerStatus`] must be kept alive for as long as the
/// worker is doing useful work; dropping it reports the worker as idle.
/// `name` becomes the value of the gauge's `worker` tag.
pub fn log_worker_starting(name: &'static str) -> AppWorkerStatus {
    const WORKING: bool = true;
    // Publish the "busy" sample first, then hand back the guard that will
    // publish the matching "idle" sample on drop.
    log_worker_status(WORKING, name);
    AppWorkerStatus { name }
}

/// Writes one sample to `APP_WORKER_IN_PROGRESS_TOTAL` for the worker `name`:
/// `1.0` when `is_working` is true, `0.0` otherwise.
fn log_worker_status(is_working: bool, name: &'static str) {
    // `true` maps to 1 and `false` to 0, widened losslessly to `f64`.
    let value = f64::from(u8::from(is_working));
    let tags = vec![metric_tag_const_value("worker", name)];
    log_gauge_with_tags(&APP_WORKER_IN_PROGRESS_TOTAL, value, tags)
}
4 changes: 4 additions & 0 deletions crates/application/src/schema_worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use metrics::{
schema_validation_timer,
};

use crate::metrics::log_worker_starting;

mod metrics;

const INITIAL_BACKOFF: Duration = Duration::from_millis(10);
Expand Down Expand Up @@ -75,6 +77,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
}

pub async fn run(&self) -> anyhow::Result<()> {
let status = log_worker_starting("SchemaWorker");
let mut tx: Transaction<RT> = self.database.begin(Identity::system()).await?;
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
if let Some((id, db_schema)) = SchemaModel::new(&mut tx)
Expand Down Expand Up @@ -170,6 +173,7 @@ impl<RT: Runtime> SchemaWorker<RT> {
timer.finish();
}

drop(status);
let token = tx.into_token()?;
tracing::debug!("SchemaWorker waiting...");
let subscription = self.database.subscribe(token).await?;
Expand Down
7 changes: 6 additions & 1 deletion crates/application/src/snapshot_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ use value::{

use crate::{
export_worker::FileStorageZipMetadata,
metrics::snapshot_import_timer,
metrics::{
log_worker_starting,
snapshot_import_timer,
},
Application,
};

Expand Down Expand Up @@ -219,6 +222,7 @@ impl<RT: Runtime> SnapshotImportWorker<RT> {
/// If an import has Uploaded, parse it and set to WaitingForConfirmation.
/// If an import is InProgress, execute it.
pub async fn run(&mut self) -> anyhow::Result<()> {
let status = log_worker_starting("SnapshotImport");
let mut tx = self.database.begin(Identity::system()).await?;
let mut import_model = SnapshotImportModel::new(&mut tx);
if let Some(import_uploaded) = import_model.import_in_state(ImportState::Uploaded).await? {
Expand All @@ -238,6 +242,7 @@ impl<RT: Runtime> SnapshotImportWorker<RT> {
.await?;
timer.finish();
}
drop(status);
let token = tx.into_token()?;
let subscription = self.database.subscribe(token).await?;
subscription.wait_for_invalidation().await;
Expand Down
3 changes: 3 additions & 0 deletions crates/application/src/table_summary_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use futures::{
};
use parking_lot::Mutex;

use crate::metrics::log_worker_starting;

pub struct TableSummaryWorker<RT: Runtime> {
runtime: RT,
database: Database<RT>,
Expand Down Expand Up @@ -80,6 +82,7 @@ impl<RT: Runtime> TableSummaryWorker<RT> {
last_write_info: &mut Option<LastWriteInfo>,
writer: &TableSummaryWriter<RT>,
) -> anyhow::Result<()> {
let _status = log_worker_starting("TableSummaryWorker");
let commits_since_load = self.database.write_commits_since_load();
let now = self.runtime.unix_timestamp();
if let Some(last_write_info) = last_write_info
Expand Down
4 changes: 4 additions & 0 deletions crates/database/src/index_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ use crate::{
metrics::{
log_index_backfilled,
log_num_indexes_to_backfill,
log_worker_starting,
},
Database,
ResolvedQuery,
Expand Down Expand Up @@ -262,6 +263,7 @@ impl<RT: Runtime> IndexWorker<RT> {
async fn run(&mut self) -> anyhow::Result<()> {
log::info!("Starting IndexWorker");
loop {
let status = log_worker_starting("IndexWorker");
// Get all the documents from the `_index` table.
let mut tx = self.database.begin(Identity::system()).await?;
// Index doesn't have `by_creation_time` index, and thus can't be queried via
Expand Down Expand Up @@ -311,6 +313,8 @@ impl<RT: Runtime> IndexWorker<RT> {
if self.should_terminate {
return Ok(());
}
drop(status);

let token = tx.into_token()?;
let subscription = self.database.subscribe(token).await?;
subscription.wait_for_invalidation().await;
Expand Down
5 changes: 5 additions & 0 deletions crates/database/src/index_workers/fast_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ use crate::{
},
timeout_with_jitter,
},
metrics::log_worker_starting,
search_index_worker::fast_forward::SearchFastForward,
vector_index_worker::fast_forward::VectorFastForward,
Database,
Expand Down Expand Up @@ -114,6 +115,7 @@ impl FastForwardIndexWorker {
let mut vector_search_last_fast_forward_info: Option<LastFastForwardInfo> = None;

loop {
let status = log_worker_starting("TextSearchFastForward");
tracing::debug!("FastForwardWorker checking if we can fast forward");
Self::fast_forward::<RT, SearchSnapshotVersion, SearchFastForward>(
"TextSearch",
Expand All @@ -122,13 +124,16 @@ impl FastForwardIndexWorker {
&mut text_search_last_fast_forward_info,
)
.await?;
drop(status);
let status = log_worker_starting("VectorSearchFastForward");
Self::fast_forward::<RT, (), VectorFastForward>(
"VectorSearch",
rt,
db,
&mut vector_search_last_fast_forward_info,
)
.await?;
drop(status);

backoff.reset();
timeout_with_jitter(rt, *DATABASE_WORKERS_POLL_INTERVAL).await
Expand Down
3 changes: 3 additions & 0 deletions crates/database/src/index_workers/search_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use crate::{
},
timeout_with_jitter,
},
metrics::log_worker_starting,
vector_index_worker::{
compactor::CompactionConfig,
writer::VectorMetadataWriter,
Expand Down Expand Up @@ -110,11 +111,13 @@ impl<RT: Runtime> SearchIndexWorker<RT> {
tracing::info!("Starting {name}");

loop {
let status = log_worker_starting(name);
let (metrics, token) = match self {
SearchIndexWorker::VectorFlusher(flusher) => flusher.step().await?,
SearchIndexWorker::VectorCompactor(compactor) => compactor.step().await?,
SearchIndexWorker::SearchFlusher(flusher) => flusher.step().await?,
};
drop(status);

if !metrics.is_empty() {
// We did some useful work this loop iteration that we expect is committed.
Expand Down
29 changes: 29 additions & 0 deletions crates/database/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use metrics::{
log_distribution,
log_distribution_with_tags,
log_gauge,
log_gauge_with_tags,
metric_tag_const,
metric_tag_const_value,
register_convex_counter,
Expand Down Expand Up @@ -548,6 +549,34 @@ pub fn log_virtual_table_query() {
log_counter(&VIRTUAL_TABLE_QUERY_REQUESTS_TOTAL, 1);
}

/// Scope guard representing one database worker that is currently doing
/// work.
///
/// Values are handed out by `log_worker_starting`, which reports the worker
/// as busy. When the guard is dropped, its `Drop` impl reports the same
/// worker as idle again, so the gauge stays at `1` for exactly as long as the
/// guard is alive.
///
/// The type is `#[must_use]` because discarding the guard right away (for
/// example by calling `log_worker_starting("X");` as a bare statement) would
/// flip the gauge to `1` and straight back to `0`, silently recording nothing.
#[derive(Debug)]
#[must_use = "the worker is reported as idle as soon as this guard is dropped; bind it to a \
              variable that lives for the duration of the work"]
pub struct DatabaseWorkerStatus {
    /// Value of the `worker` tag attached to every gauge sample for this
    /// worker.
    name: &'static str,
}

impl Drop for DatabaseWorkerStatus {
    /// Reports the worker as idle (gauge value `0`) once the guard goes out
    /// of scope, whether through an explicit `drop(..)`, a normal return, or
    /// an early exit via `?`.
    fn drop(&mut self) {
        let worker: &'static str = self.name;
        log_worker_status(false, worker);
    }
}

// Gauge tagged by `worker`. It is written only through `log_worker_status`,
// which stores 1.0 while a worker is running and 0.0 once its status guard
// has been dropped.
register_convex_gauge!(
DATABASE_WORKER_IN_PROGRESS_TOTAL,
"1 if a worker is working, 0 otherwise",
&["worker"],
);
/// Reports the database worker `name` as busy and returns a guard for it.
///
/// The returned [`DatabaseWorkerStatus`] must be kept alive for as long as
/// the worker is doing useful work; dropping it reports the worker as idle.
/// `name` becomes the value of the gauge's `worker` tag.
pub fn log_worker_starting(name: &'static str) -> DatabaseWorkerStatus {
    const WORKING: bool = true;
    // Publish the "busy" sample first, then hand back the guard that will
    // publish the matching "idle" sample on drop.
    log_worker_status(WORKING, name);
    DatabaseWorkerStatus { name }
}

/// Writes one sample to `DATABASE_WORKER_IN_PROGRESS_TOTAL` for the worker
/// `name`: `1.0` when `is_working` is true, `0.0` otherwise.
fn log_worker_status(is_working: bool, name: &'static str) {
    // `true` maps to 1 and `false` to 0, widened losslessly to `f64`.
    let value = f64::from(u8::from(is_working));
    let tags = vec![metric_tag_const_value("worker", name)];
    log_gauge_with_tags(&DATABASE_WORKER_IN_PROGRESS_TOTAL, value, tags)
}

pub mod search {

use metrics::{
Expand Down
2 changes: 2 additions & 0 deletions crates/database/src/vector_bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use vector::{
use crate::{
committer::CommitterClient,
index_workers::fast_forward::load_metadata_fast_forward_ts,
metrics::log_worker_starting,
};

pub struct VectorBootstrapWorker<RT: Runtime> {
Expand Down Expand Up @@ -139,6 +140,7 @@ impl<RT: Runtime> VectorBootstrapWorker<RT> {
Option<Timestamp>,
)>,
) -> anyhow::Result<VectorIndexManager> {
let _status = log_worker_starting("VectorBootstrap");
let timer = vector::metrics::bootstrap_timer();
let upper_bound = persistence.upper_bound();

Expand Down

0 comments on commit 878094a

Please sign in to comment.