From 24550ea1d71013e04da8e742e70145e6e90a1702 Mon Sep 17 00:00:00 2001 From: Nipunn Koorapati Date: Fri, 3 Jan 2025 18:07:12 -0800 Subject: [PATCH] Remove the ComponentTree struct (#32758) We don't need it - we can actually export in any order to the zip file. This is a stepping stone towards us being able to sort tables in small -> big order across components. GitOrigin-RevId: 181fb5655edf7d314c9b19d609719f9af9d167c1 --- crates/application/src/export_worker.rs | 156 +++++++----------- .../common/src/components/component_path.rs | 4 + 2 files changed, 65 insertions(+), 95 deletions(-) diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs index 3b8a0223..69cb2d39 100644 --- a/crates/application/src/export_worker.rs +++ b/crates/application/src/export_worker.rs @@ -8,7 +8,6 @@ use std::{ }; use anyhow::Context; -use async_recursion::async_recursion; use async_zip::{ write::{ EntryStreamWriter, @@ -26,7 +25,6 @@ use common::{ bootstrap_model::tables::TABLES_TABLE, components::{ ComponentId, - ComponentName, ComponentPath, }, document::{ @@ -64,6 +62,7 @@ use futures::{ StreamExt, TryStreamExt, }; +use itertools::Itertools; use keybroker::Identity; use mime2ext::mime2ext; use model::{ @@ -151,39 +150,6 @@ pub struct ExportWorker { usage_tracking: UsageCounter, } -/// All components (including unmounted) organized into tree format. -struct ComponentTree { - id: ComponentId, - children: BTreeMap>, -} - -impl ComponentTree { - fn new( - current_component_id: ComponentId, - component_ids_to_paths: &BTreeMap, - ) -> anyhow::Result { - let current_path = component_ids_to_paths - .get(¤t_component_id) - .context("Not found?")?; - let mut children = BTreeMap::new(); - for (component_id, component_path) in component_ids_to_paths { - let Some((parent_path, component_name)) = component_path.parent() else { - continue; - }; - if parent_path == *current_path { - children.insert( - component_name, - Box::new(Self::new(*component_id, component_ids_to_paths)?), - ); - } - } - Ok(Self { - id: current_component_id, - children, - }) - } -} - impl ExportWorker { #[allow(clippy::new_ret_no_self)] pub fn new( @@ -315,7 +281,7 @@ impl ExportWorker { { let storage = &self.storage; update_progress("Beginning backup".to_string()).await?; - let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables, component_tree) = { + let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables) = { let mut tx = self.database.begin(Identity::system()).await?; let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; let snapshot = self.database.snapshot(tx.begin_timestamp())?; @@ -336,7 +302,6 @@ impl ExportWorker { }) .collect(); let component_ids_to_paths = snapshot.component_ids_to_paths(); - let component_tree = ComponentTree::new(ComponentId::Root, &component_ids_to_paths)?; let system_tables = snapshot .table_registry .iter_active_system_tables() @@ -348,7 +313,6 @@ impl ExportWorker { component_ids_to_paths, by_id_indexes, system_tables, - component_tree, ) }; match format { @@ -363,9 +327,8 @@ impl ExportWorker { let zipper = self.construct_zip_snapshot( writer, - component_tree, tables.clone(), - &component_ids_to_paths, + component_ids_to_paths, ts, by_id_indexes, system_tables, @@ -381,19 +344,17 @@ impl ExportWorker { } } - #[async_recursion] async fn write_component<'a, 'b: 'a, F, Fut>( &self, - path_prefix: &'a str, - component_tree: &'a ComponentTree, + namespace: TableNamespace, + component_path: ComponentPath, zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>, tables: &'a mut BTreeMap, - component_ids_to_paths: &BTreeMap, snapshot_ts: RepeatableTimestamp, by_id_indexes: &BTreeMap, system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>, include_storage: bool, - usage: FunctionUsageTracker, + usage: &FunctionUsageTracker, requestor: ExportRequestor, update_progress: F, ) -> anyhow::Result<()> @@ -401,11 +362,8 @@ impl ExportWorker { F: Fn(String) -> Fut + Send + Copy, Fut: Future> + Send, { - let namespace: TableNamespace = component_tree.id.into(); - let component_path = component_ids_to_paths - .get(&component_tree.id) - .cloned() - .unwrap_or_default(); + let path_prefix = get_export_path_prefix(&component_path); + let in_component_str = component_path.in_component_str(); let tablet_ids: BTreeSet<_> = tables .iter() @@ -417,7 +375,7 @@ impl ExportWorker { update_progress(format!("Enumerating tables{in_component_str}")).await?; // _tables let mut table_upload = zip_snapshot_upload - .start_system_table(path_prefix, TABLES_TABLE.clone()) + .start_system_table(&path_prefix, TABLES_TABLE.clone()) .await?; // Write documents from stream to table uploads, in table number order. @@ -452,7 +410,7 @@ impl ExportWorker { // First write metadata to _storage/documents.jsonl let mut table_upload = zip_snapshot_upload - .start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone()) + .start_system_table(&path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone()) .await?; let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); @@ -552,7 +510,7 @@ impl ExportWorker { } let mut table_upload = zip_snapshot_upload - .start_table(path_prefix, table_name.clone(), generated_schema) + .start_table(&path_prefix, table_name.clone(), generated_schema) .await?; let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); @@ -572,39 +530,14 @@ impl ExportWorker { table_upload.complete().await?; } - // Write children components, if there are any. - for (name, child) in &component_tree.children { - let path_prefix = format!( - "{path_prefix}{}/{}/", - &*COMPONENTS_TABLE, - String::from(name.clone()) - ); - self.write_component( - &path_prefix, - child, - zip_snapshot_upload, - tables, - component_ids_to_paths, - snapshot_ts, - by_id_indexes, - system_tables, - include_storage, - usage.clone(), - requestor, - update_progress, - ) - .await?; - } - Ok(()) } async fn construct_zip_snapshot( &self, mut writer: ChannelWriter, - component_tree: ComponentTree, mut tables: BTreeMap, - component_ids_to_paths: &BTreeMap, + component_ids_to_paths: BTreeMap, snapshot_ts: RepeatableTimestamp, by_id_indexes: BTreeMap, system_tables: BTreeMap<(TableNamespace, TableName), TabletId>, @@ -619,21 +552,23 @@ impl ExportWorker { { let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?; - self.write_component( - "", - &component_tree, - &mut zip_snapshot_upload, - &mut tables, - component_ids_to_paths, - snapshot_ts, - &by_id_indexes, - &system_tables, - include_storage, - usage, - requestor, - update_progress, - ) - .await?; + for (component_id, component_path) in component_ids_to_paths { + let namespace: TableNamespace = component_id.into(); + self.write_component( + namespace, + component_path, + &mut zip_snapshot_upload, + &mut tables, + snapshot_ts, + &by_id_indexes, + &system_tables, + include_storage, + &usage, + requestor, + update_progress, + ) + .await?; + } // Complete upload. zip_snapshot_upload.complete().await?; @@ -785,6 +720,19 @@ impl ExportWorker { } } +fn get_export_path_prefix(component_path: &ComponentPath) -> String { + component_path + .iter() + .map(|parent_name| { + format!( + "{}/{}/", + &*COMPONENTS_TABLE, + String::from(parent_name.clone()) + ) + }) + .join("") +} + #[derive(thiserror::Error, Debug)] #[error("Export canceled")] struct ExportCanceled; @@ -1005,7 +953,10 @@ mod tests { use super::ExportWorker; use crate::{ - export_worker::README_MD_CONTENTS, + export_worker::{ + get_export_path_prefix, + README_MD_CONTENTS, + }, test_helpers::ApplicationTestExt, tests::components::unmount_component, Application, @@ -1442,4 +1393,19 @@ mod tests { .await?; Ok(()) } + + #[test] + fn test_get_export_path_prefix() -> anyhow::Result<()> { + assert_eq!(get_export_path_prefix(&ComponentPath::root()), ""); + assert_eq!(get_export_path_prefix(&"a".parse()?), "_components/a/"); + assert_eq!( + get_export_path_prefix(&"a/b".parse()?), + "_components/a/_components/b/" + ); + assert_eq!( + get_export_path_prefix(&"a/b/c".parse()?), + "_components/a/_components/b/_components/c/" + ); + Ok(()) + } } diff --git a/crates/common/src/components/component_path.rs b/crates/common/src/components/component_path.rs index 38fcad36..fe397ff5 100644 --- a/crates/common/src/components/component_path.rs +++ b/crates/common/src/components/component_path.rs @@ -108,6 +108,10 @@ impl ComponentPath { } } + pub fn iter(&self) -> impl Iterator { + self.path.iter() + } + pub fn join(&self, name: ComponentName) -> Self { let mut path = self.path.clone(); path.push(name);