Skip to content

Commit

Permalink
[3/?] Add minitrace spans for export worker. (#32770)
Browse files Browse the repository at this point in the history
GitOrigin-RevId: 1283759910f5a4e23742f6acfc5c59c3d90a03d8
  • Loading branch information
nipunn1313 authored and Convex, Inc. committed Jan 6, 2025
1 parent 0c1cf74 commit 3c56a9c
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 1 deletion.
38 changes: 37 additions & 1 deletion crates/application/src/exports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use common::{
ComponentId,
ComponentPath,
},
minitrace_helpers::get_sampled_span,
runtime::Runtime,
types::{
IndexId,
Expand All @@ -34,6 +35,8 @@ use futures::{
};
use itertools::Itertools;
use keybroker::Identity;
use maplit::btreemap;
use minitrace::future::FutureExt;
use model::exports::types::{
ExportFormat,
ExportRequestor,
Expand Down Expand Up @@ -254,7 +257,18 @@ where
let in_component_str = component_path.in_component_str();

update_progress(format!("Backing up _tables{in_component_str}")).await?;
write_tables_table(&path_prefix, &mut zip_snapshot_upload, namespace, &tables).await?;
let root = get_sampled_span(
&worker.instance_name,
"export_worker/write_table",
&mut worker.runtime.rng(),
btreemap! {
"dev.convex.component_path".to_string() => component_path.to_string(),
"dev.convex.table_name".to_string() => "_tables".to_string(),
},
);
write_tables_table(&path_prefix, &mut zip_snapshot_upload, namespace, &tables)
.in_span(root)
.await?;
}

// sort tables small to large, and write them to the zip.
Expand All @@ -269,6 +283,16 @@ where
let by_id = by_id_indexes
.get(tablet_id)
.ok_or_else(|| anyhow::anyhow!("no by_id index for {} found", tablet_id))?;

let root = get_sampled_span(
&worker.instance_name,
"export_worker/write_table",
&mut worker.runtime.rng(),
btreemap! {
"dev.convex.component_path".to_string() => component_path.to_string(),
"dev.convex.table_name".to_string() => table_name.to_string(),
},
);
write_table(
worker,
&path_prefix,
Expand All @@ -281,6 +305,7 @@ where
by_id,
&usage,
)
.in_span(root)
.await?;
}

Expand All @@ -291,6 +316,16 @@ where
let path_prefix = get_export_path_prefix(&component_path);
let in_component_str = component_path.in_component_str();
update_progress(format!("Backing up _storage{in_component_str}")).await?;

let root = get_sampled_span(
&worker.instance_name,
"export_worker/write_table",
&mut worker.runtime.rng(),
btreemap! {
"dev.convex.component_path".to_string() => component_path.to_string(),
"dev.convex.table_name".to_string() => "_storage".to_string(),
},
);
write_storage_table(
worker,
&path_prefix,
Expand All @@ -303,6 +338,7 @@ where
&usage,
requestor,
)
.in_span(root)
.await?;
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/application/src/exports/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct ExportWorker<RT: Runtime> {
pub(super) file_storage: Arc<dyn Storage>,
pub(super) backoff: Backoff,
pub(super) usage_tracking: UsageCounter,
pub(super) instance_name: String,
}

impl<RT: Runtime> ExportWorker<RT> {
Expand All @@ -71,6 +72,7 @@ impl<RT: Runtime> ExportWorker<RT> {
storage: Arc<dyn Storage>,
file_storage: Arc<dyn Storage>,
usage_tracking: UsageCounter,
instance_name: String,
) -> impl Future<Output = ()> + Send {
let mut worker = Self {
runtime,
Expand All @@ -79,6 +81,7 @@ impl<RT: Runtime> ExportWorker<RT> {
file_storage,
backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF),
usage_tracking,
instance_name,
};
async move {
loop {
Expand Down Expand Up @@ -109,6 +112,7 @@ impl<RT: Runtime> ExportWorker<RT> {
file_storage,
backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF),
usage_tracking: UsageCounter::new(Arc::new(NoOpUsageEventLogger)),
instance_name: "carnitas".to_string(),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/application/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ impl<RT: Runtime> Application<RT> {
exports_storage.clone(),
files_storage.clone(),
database.usage_counter().clone(),
instance_name.clone(),
);
let export_worker = Arc::new(Mutex::new(runtime.spawn("export_worker", export_worker)));

Expand Down

0 comments on commit 3c56a9c

Please sign in to comment.