diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs index 3bfb02d6..254fd254 100644 --- a/crates/application/src/export_worker.rs +++ b/crates/application/src/export_worker.rs @@ -119,7 +119,10 @@ use value::{ VirtualTableMapping, }; -use crate::metrics::export_timer; +use crate::metrics::{ + export_timer, + log_worker_starting, +}; const INITIAL_BACKOFF: Duration = Duration::from_secs(1); const MAX_BACKOFF: Duration = Duration::from_secs(900); // 15 minutes @@ -214,6 +217,7 @@ impl ExportWorker { }, (Some(export), None) => { tracing::info!("Export requested."); + let _status = log_worker_starting("ExportWorker"); let timer = export_timer(); let ts = self.database.now_ts_for_reads(); let in_progress_export = (*export).clone().in_progress(*ts)?; @@ -233,6 +237,7 @@ impl ExportWorker { }, (None, Some(export)) => { tracing::info!("In progress export restarting..."); + let _status = log_worker_starting("ExportWorker"); let timer = export_timer(); self.export(export).await?; timer.finish(); diff --git a/crates/application/src/metrics.rs b/crates/application/src/metrics.rs index 332cea91..ab3697fc 100644 --- a/crates/application/src/metrics.rs +++ b/crates/application/src/metrics.rs @@ -1,9 +1,11 @@ use metrics::{ log_counter_with_tags, log_distribution_with_tags, + log_gauge_with_tags, metric_tag_const, metric_tag_const_value, register_convex_counter, + register_convex_gauge, register_convex_histogram, StatusTimer, STATUS_LABEL, @@ -67,3 +69,31 @@ register_convex_histogram!( pub fn export_timer() -> StatusTimer { StatusTimer::new(&SNAPSHOT_EXPORT_TIMER_SECONDS) } + +pub struct AppWorkerStatus { + name: &'static str, +} + +impl Drop for AppWorkerStatus { + fn drop(&mut self) { + log_worker_status(false, self.name); + } +} + +register_convex_gauge!( + APP_WORKER_IN_PROGRESS_TOTAL, + "1 if a worker is working, 0 otherwise", + &["worker"], +); +pub fn log_worker_starting(name: &'static str) -> AppWorkerStatus { + log_worker_status(true, name); + AppWorkerStatus { name } +} + +fn log_worker_status(is_working: bool, name: &'static str) { + log_gauge_with_tags( + &APP_WORKER_IN_PROGRESS_TOTAL, + if is_working { 1f64 } else { 0f64 }, + vec![metric_tag_const_value("worker", name)], + ) +} diff --git a/crates/application/src/schema_worker/mod.rs b/crates/application/src/schema_worker/mod.rs index 72ce8523..801bfe63 100644 --- a/crates/application/src/schema_worker/mod.rs +++ b/crates/application/src/schema_worker/mod.rs @@ -35,6 +35,8 @@ use metrics::{ schema_validation_timer, }; +use crate::metrics::log_worker_starting; + mod metrics; const INITIAL_BACKOFF: Duration = Duration::from_millis(10); @@ -75,6 +77,7 @@ impl SchemaWorker { } pub async fn run(&self) -> anyhow::Result<()> { + let status = log_worker_starting("SchemaWorker"); let mut tx: Transaction = self.database.begin(Identity::system()).await?; let snapshot = self.database.snapshot(tx.begin_timestamp())?; if let Some((id, db_schema)) = SchemaModel::new(&mut tx) @@ -170,6 +173,7 @@ impl SchemaWorker { timer.finish(); } + drop(status); let token = tx.into_token()?; tracing::debug!("SchemaWorker waiting..."); let subscription = self.database.subscribe(token).await?; diff --git a/crates/application/src/snapshot_import.rs b/crates/application/src/snapshot_import.rs index ea347602..44d9f342 100644 --- a/crates/application/src/snapshot_import.rs +++ b/crates/application/src/snapshot_import.rs @@ -163,7 +163,10 @@ use value::{ use crate::{ export_worker::FileStorageZipMetadata, - metrics::snapshot_import_timer, + metrics::{ + log_worker_starting, + snapshot_import_timer, + }, Application, }; @@ -219,6 +222,7 @@ impl SnapshotImportWorker { /// If an import has Uploaded, parse it and set to WaitingForConfirmation. /// If an import is InProgress, execute it. pub async fn run(&mut self) -> anyhow::Result<()> { + let status = log_worker_starting("SnapshotImport"); let mut tx = self.database.begin(Identity::system()).await?; let mut import_model = SnapshotImportModel::new(&mut tx); if let Some(import_uploaded) = import_model.import_in_state(ImportState::Uploaded).await? { @@ -238,6 +242,7 @@ impl SnapshotImportWorker { .await?; timer.finish(); } + drop(status); let token = tx.into_token()?; let subscription = self.database.subscribe(token).await?; subscription.wait_for_invalidation().await; diff --git a/crates/application/src/table_summary_worker.rs b/crates/application/src/table_summary_worker.rs index c308733e..42795d09 100644 --- a/crates/application/src/table_summary_worker.rs +++ b/crates/application/src/table_summary_worker.rs @@ -29,6 +29,8 @@ use futures::{ }; use parking_lot::Mutex; +use crate::metrics::log_worker_starting; + pub struct TableSummaryWorker { runtime: RT, database: Database, @@ -80,6 +82,7 @@ impl TableSummaryWorker { last_write_info: &mut Option, writer: &TableSummaryWriter, ) -> anyhow::Result<()> { + let _status = log_worker_starting("TableSummaryWorker"); let commits_since_load = self.database.write_commits_since_load(); let now = self.runtime.unix_timestamp(); if let Some(last_write_info) = last_write_info diff --git a/crates/database/src/index_worker.rs b/crates/database/src/index_worker.rs index b291e3e1..e63457e4 100644 --- a/crates/database/src/index_worker.rs +++ b/crates/database/src/index_worker.rs @@ -93,6 +93,7 @@ use crate::{ metrics::{ log_index_backfilled, log_num_indexes_to_backfill, + log_worker_starting, }, Database, ResolvedQuery, @@ -262,6 +263,7 @@ impl IndexWorker { async fn run(&mut self) -> anyhow::Result<()> { log::info!("Starting IndexWorker"); loop { + let status = log_worker_starting("IndexWorker"); // Get all the documents from the `_index` table. let mut tx = self.database.begin(Identity::system()).await?; // Index doesn't have `by_creation_time` index, and thus can't be queried via @@ -311,6 +313,8 @@ impl IndexWorker { if self.should_terminate { return Ok(()); } + drop(status); + let token = tx.into_token()?; let subscription = self.database.subscribe(token).await?; subscription.wait_for_invalidation().await; diff --git a/crates/database/src/index_workers/fast_forward.rs b/crates/database/src/index_workers/fast_forward.rs index d162d659..5639e5c3 100644 --- a/crates/database/src/index_workers/fast_forward.rs +++ b/crates/database/src/index_workers/fast_forward.rs @@ -59,6 +59,7 @@ use crate::{ }, timeout_with_jitter, }, + metrics::log_worker_starting, search_index_worker::fast_forward::SearchFastForward, vector_index_worker::fast_forward::VectorFastForward, Database, @@ -114,6 +115,7 @@ impl FastForwardIndexWorker { let mut vector_search_last_fast_forward_info: Option = None; loop { + let status = log_worker_starting("TextSearchFastForward"); tracing::debug!("FastForwardWorker checking if we can fast forward"); Self::fast_forward::( "TextSearch", @@ -122,6 +124,8 @@ impl FastForwardIndexWorker { &mut text_search_last_fast_forward_info, ) .await?; + drop(status); + let status = log_worker_starting("VectorSearchFastForward"); Self::fast_forward::( "VectorSearch", rt, @@ -129,6 +133,7 @@ impl FastForwardIndexWorker { &mut vector_search_last_fast_forward_info, ) .await?; + drop(status); backoff.reset(); timeout_with_jitter(rt, *DATABASE_WORKERS_POLL_INTERVAL).await diff --git a/crates/database/src/index_workers/search_worker.rs b/crates/database/src/index_workers/search_worker.rs index e5ddf0fe..14535965 100644 --- a/crates/database/src/index_workers/search_worker.rs +++ b/crates/database/src/index_workers/search_worker.rs @@ -24,6 +24,7 @@ use crate::{ }, timeout_with_jitter, }, + metrics::log_worker_starting, vector_index_worker::{ compactor::CompactionConfig, writer::VectorMetadataWriter, @@ -110,11 +111,13 @@ impl SearchIndexWorker { tracing::info!("Starting {name}"); loop { + let status = log_worker_starting(name); let (metrics, token) = match self { SearchIndexWorker::VectorFlusher(flusher) => flusher.step().await?, SearchIndexWorker::VectorCompactor(compactor) => compactor.step().await?, SearchIndexWorker::SearchFlusher(flusher) => flusher.step().await?, }; + drop(status); if !metrics.is_empty() { // We did some useful work this loop iteration that we expect is committed. diff --git a/crates/database/src/metrics.rs b/crates/database/src/metrics.rs index 35dbcedf..06a6c768 100644 --- a/crates/database/src/metrics.rs +++ b/crates/database/src/metrics.rs @@ -9,6 +9,7 @@ use metrics::{ log_distribution, log_distribution_with_tags, log_gauge, + log_gauge_with_tags, metric_tag_const, metric_tag_const_value, register_convex_counter, @@ -548,6 +549,34 @@ pub fn log_virtual_table_query() { log_counter(&VIRTUAL_TABLE_QUERY_REQUESTS_TOTAL, 1); } +pub struct DatabaseWorkerStatus { + name: &'static str, +} + +impl Drop for DatabaseWorkerStatus { + fn drop(&mut self) { + log_worker_status(false, self.name); + } +} + +register_convex_gauge!( + DATABASE_WORKER_IN_PROGRESS_TOTAL, + "1 if a worker is working, 0 otherwise", + &["worker"], +); +pub fn log_worker_starting(name: &'static str) -> DatabaseWorkerStatus { + log_worker_status(true, name); + DatabaseWorkerStatus { name } +} + +fn log_worker_status(is_working: bool, name: &'static str) { + log_gauge_with_tags( + &DATABASE_WORKER_IN_PROGRESS_TOTAL, + if is_working { 1f64 } else { 0f64 }, + vec![metric_tag_const_value("worker", name)], + ) +} + pub mod search { use metrics::{ diff --git a/crates/database/src/vector_bootstrap.rs b/crates/database/src/vector_bootstrap.rs index ade422b7..cee48f49 100644 --- a/crates/database/src/vector_bootstrap.rs +++ b/crates/database/src/vector_bootstrap.rs @@ -42,6 +42,7 @@ use vector::{ use crate::{ committer::CommitterClient, index_workers::fast_forward::load_metadata_fast_forward_ts, + metrics::log_worker_starting, }; pub struct VectorBootstrapWorker { @@ -139,6 +140,7 @@ impl VectorBootstrapWorker { Option, )>, ) -> anyhow::Result { + let _status = log_worker_starting("VectorBootstrap"); let timer = vector::metrics::bootstrap_timer(); let upper_bound = persistence.upper_bound();