Skip to content

Commit

Permalink
Remove the ComponentTree struct (#32758)
Browse files Browse the repository at this point in the history
We don't need it: components can be exported to the zip file
in any order. This is a stepping stone towards being able to
sort tables from smallest to largest across components.

GitOrigin-RevId: 181fb5655edf7d314c9b19d609719f9af9d167c1
  • Loading branch information
nipunn1313 authored and Convex, Inc. committed Jan 4, 2025
1 parent 8686ce1 commit 24550ea
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 95 deletions.
156 changes: 61 additions & 95 deletions crates/application/src/export_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{
};

use anyhow::Context;
use async_recursion::async_recursion;
use async_zip::{
write::{
EntryStreamWriter,
Expand All @@ -26,7 +25,6 @@ use common::{
bootstrap_model::tables::TABLES_TABLE,
components::{
ComponentId,
ComponentName,
ComponentPath,
},
document::{
Expand Down Expand Up @@ -64,6 +62,7 @@ use futures::{
StreamExt,
TryStreamExt,
};
use itertools::Itertools;
use keybroker::Identity;
use mime2ext::mime2ext;
use model::{
Expand Down Expand Up @@ -151,39 +150,6 @@ pub struct ExportWorker<RT: Runtime> {
usage_tracking: UsageCounter,
}

/// All components (including unmounted) organized into tree format.
///
/// Each node holds the id of one component and owns the subtrees of the
/// components whose parent path is this node's path.
struct ComponentTree {
    /// Id of the component represented by this node.
    id: ComponentId,
    /// Direct children of this component, keyed by the child's name (the
    /// name returned alongside the parent path by `ComponentPath::parent`).
    children: BTreeMap<ComponentName, Box<ComponentTree>>,
}

impl ComponentTree {
    /// Recursively builds the subtree rooted at `current_component_id`.
    ///
    /// `component_ids_to_paths` is scanned once per node: every entry whose
    /// parent path equals the current component's path becomes a child,
    /// keyed by its own name, and is expanded recursively.
    ///
    /// Returns an error if `current_component_id` (or any descendant id) has
    /// no entry in `component_ids_to_paths`.
    fn new(
        current_component_id: ComponentId,
        component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
    ) -> anyhow::Result<Self> {
        let current_path = component_ids_to_paths
            .get(&current_component_id)
            .context("Not found?")?;

        let children = component_ids_to_paths
            .iter()
            .filter_map(|(candidate_id, candidate_path)| {
                // Entries without a parent (the root) are never children.
                let (parent_path, child_name) = candidate_path.parent()?;
                (parent_path == *current_path).then(|| {
                    Self::new(*candidate_id, component_ids_to_paths)
                        .map(|subtree| (child_name, Box::new(subtree)))
                })
            })
            // Stops at the first failing subtree, like `?` inside a loop.
            .collect::<anyhow::Result<BTreeMap<_, _>>>()?;

        Ok(Self {
            id: current_component_id,
            children,
        })
    }
}

impl<RT: Runtime> ExportWorker<RT> {
#[allow(clippy::new_ret_no_self)]
pub fn new(
Expand Down Expand Up @@ -315,7 +281,7 @@ impl<RT: Runtime> ExportWorker<RT> {
{
let storage = &self.storage;
update_progress("Beginning backup".to_string()).await?;
let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables, component_tree) = {
let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables) = {
let mut tx = self.database.begin(Identity::system()).await?;
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
let snapshot = self.database.snapshot(tx.begin_timestamp())?;
Expand All @@ -336,7 +302,6 @@ impl<RT: Runtime> ExportWorker<RT> {
})
.collect();
let component_ids_to_paths = snapshot.component_ids_to_paths();
let component_tree = ComponentTree::new(ComponentId::Root, &component_ids_to_paths)?;
let system_tables = snapshot
.table_registry
.iter_active_system_tables()
Expand All @@ -348,7 +313,6 @@ impl<RT: Runtime> ExportWorker<RT> {
component_ids_to_paths,
by_id_indexes,
system_tables,
component_tree,
)
};
match format {
Expand All @@ -363,9 +327,8 @@ impl<RT: Runtime> ExportWorker<RT> {

let zipper = self.construct_zip_snapshot(
writer,
component_tree,
tables.clone(),
&component_ids_to_paths,
component_ids_to_paths,
ts,
by_id_indexes,
system_tables,
Expand All @@ -381,31 +344,26 @@ impl<RT: Runtime> ExportWorker<RT> {
}
}

#[async_recursion]
async fn write_component<'a, 'b: 'a, F, Fut>(
&self,
path_prefix: &'a str,
component_tree: &'a ComponentTree,
namespace: TableNamespace,
component_path: ComponentPath,
zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>,
tables: &'a mut BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
snapshot_ts: RepeatableTimestamp,
by_id_indexes: &BTreeMap<TabletId, IndexId>,
system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
include_storage: bool,
usage: FunctionUsageTracker,
usage: &FunctionUsageTracker,
requestor: ExportRequestor,
update_progress: F,
) -> anyhow::Result<()>
where
F: Fn(String) -> Fut + Send + Copy,
Fut: Future<Output = anyhow::Result<()>> + Send,
{
let namespace: TableNamespace = component_tree.id.into();
let component_path = component_ids_to_paths
.get(&component_tree.id)
.cloned()
.unwrap_or_default();
let path_prefix = get_export_path_prefix(&component_path);

let in_component_str = component_path.in_component_str();
let tablet_ids: BTreeSet<_> = tables
.iter()
Expand All @@ -417,7 +375,7 @@ impl<RT: Runtime> ExportWorker<RT> {
update_progress(format!("Enumerating tables{in_component_str}")).await?;
// _tables
let mut table_upload = zip_snapshot_upload
.start_system_table(path_prefix, TABLES_TABLE.clone())
.start_system_table(&path_prefix, TABLES_TABLE.clone())
.await?;

// Write documents from stream to table uploads, in table number order.
Expand Down Expand Up @@ -452,7 +410,7 @@ impl<RT: Runtime> ExportWorker<RT> {

// First write metadata to _storage/documents.jsonl
let mut table_upload = zip_snapshot_upload
.start_system_table(path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone())
.start_system_table(&path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone())
.await?;
let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None);
let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None);
Expand Down Expand Up @@ -552,7 +510,7 @@ impl<RT: Runtime> ExportWorker<RT> {
}

let mut table_upload = zip_snapshot_upload
.start_table(path_prefix, table_name.clone(), generated_schema)
.start_table(&path_prefix, table_name.clone(), generated_schema)
.await?;

let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None);
Expand All @@ -572,39 +530,14 @@ impl<RT: Runtime> ExportWorker<RT> {
table_upload.complete().await?;
}

// Write children components, if there are any.
for (name, child) in &component_tree.children {
let path_prefix = format!(
"{path_prefix}{}/{}/",
&*COMPONENTS_TABLE,
String::from(name.clone())
);
self.write_component(
&path_prefix,
child,
zip_snapshot_upload,
tables,
component_ids_to_paths,
snapshot_ts,
by_id_indexes,
system_tables,
include_storage,
usage.clone(),
requestor,
update_progress,
)
.await?;
}

Ok(())
}

async fn construct_zip_snapshot<F, Fut>(
&self,
mut writer: ChannelWriter,
component_tree: ComponentTree,
mut tables: BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
component_ids_to_paths: BTreeMap<ComponentId, ComponentPath>,
snapshot_ts: RepeatableTimestamp,
by_id_indexes: BTreeMap<TabletId, IndexId>,
system_tables: BTreeMap<(TableNamespace, TableName), TabletId>,
Expand All @@ -619,21 +552,23 @@ impl<RT: Runtime> ExportWorker<RT> {
{
let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?;

self.write_component(
"",
&component_tree,
&mut zip_snapshot_upload,
&mut tables,
component_ids_to_paths,
snapshot_ts,
&by_id_indexes,
&system_tables,
include_storage,
usage,
requestor,
update_progress,
)
.await?;
for (component_id, component_path) in component_ids_to_paths {
let namespace: TableNamespace = component_id.into();
self.write_component(
namespace,
component_path,
&mut zip_snapshot_upload,
&mut tables,
snapshot_ts,
&by_id_indexes,
&system_tables,
include_storage,
&usage,
requestor,
update_progress,
)
.await?;
}

// Complete upload.
zip_snapshot_upload.complete().await?;
Expand Down Expand Up @@ -785,6 +720,19 @@ impl<RT: Runtime> ExportWorker<RT> {
}
}

/// Returns the directory prefix under which a component's tables are written
/// in the export.
///
/// Every name along `component_path` contributes one
/// `<COMPONENTS_TABLE>/<name>/` segment, so the root component (an empty
/// path) maps to the empty string.
fn get_export_path_prefix(component_path: &ComponentPath) -> String {
    let mut prefix = String::new();
    for ancestor_name in component_path.iter() {
        let name = String::from(ancestor_name.clone());
        prefix.push_str(&format!("{}/{}/", &*COMPONENTS_TABLE, name));
    }
    prefix
}

#[derive(thiserror::Error, Debug)]
#[error("Export canceled")]
struct ExportCanceled;
Expand Down Expand Up @@ -1005,7 +953,10 @@ mod tests {

use super::ExportWorker;
use crate::{
export_worker::README_MD_CONTENTS,
export_worker::{
get_export_path_prefix,
README_MD_CONTENTS,
},
test_helpers::ApplicationTestExt,
tests::components::unmount_component,
Application,
Expand Down Expand Up @@ -1442,4 +1393,19 @@ mod tests {
.await?;
Ok(())
}

#[test]
fn test_get_export_path_prefix() -> anyhow::Result<()> {
    // The root component has no prefix at all.
    assert_eq!(get_export_path_prefix(&ComponentPath::root()), "");

    // Each nesting level adds one `_components/<name>/` segment.
    let cases = [
        ("a", "_components/a/"),
        ("a/b", "_components/a/_components/b/"),
        ("a/b/c", "_components/a/_components/b/_components/c/"),
    ];
    for (raw_path, expected_prefix) in cases {
        let component_path: ComponentPath = raw_path.parse()?;
        assert_eq!(get_export_path_prefix(&component_path), expected_prefix);
    }
    Ok(())
}
}
4 changes: 4 additions & 0 deletions crates/common/src/components/component_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ impl ComponentPath {
}
}

/// Iterates over the component names stored in this path, in the order
/// they are held in `self.path` (the same order in which `join` appends
/// them).
pub fn iter(&self) -> impl Iterator<Item = &ComponentName> {
self.path.iter()
}

pub fn join(&self, name: ComponentName) -> Self {
let mut path = self.path.clone();
path.push(name);
Expand Down

0 comments on commit 24550ea

Please sign in to comment.