diff --git a/crates/application/src/export_worker.rs b/crates/application/src/export_worker.rs deleted file mode 100644 index 69cb2d39..00000000 --- a/crates/application/src/export_worker.rs +++ /dev/null @@ -1,1411 +0,0 @@ -use std::{ - collections::{ - BTreeMap, - BTreeSet, - }, - sync::Arc, - time::Duration, -}; - -use anyhow::Context; -use async_zip::{ - write::{ - EntryStreamWriter, - ZipFileWriter, - }, - Compression, - ZipEntryBuilder, - ZipEntryBuilderExt, -}; -use bytes::Bytes; -use common::{ - self, - async_compat::TokioAsyncWriteCompatExt, - backoff::Backoff, - bootstrap_model::tables::TABLES_TABLE, - components::{ - ComponentId, - ComponentPath, - }, - document::{ - ParsedDocument, - ResolvedDocument, - }, - errors::report_error, - execution_context::ExecutionId, - pause::PauseClient, - runtime::Runtime, - types::{ - IndexId, - ObjectKey, - RepeatableTimestamp, - TableName, - Timestamp, - UdfIdentifier, - }, - RequestId, -}; -use database::{ - Database, - IndexModel, - SystemMetadataModel, - TableSummary, - COMPONENTS_TABLE, -}; -use futures::{ - pin_mut, - stream::BoxStream, - try_join, - AsyncWriteExt, - Future, - FutureExt, - StreamExt, - TryStreamExt, -}; -use itertools::Itertools; -use keybroker::Identity; -use mime2ext::mime2ext; -use model::{ - exports::{ - types::{ - Export, - ExportFormat, - ExportRequestor, - }, - ExportsModel, - }, - file_storage::{ - types::FileStorageEntry, - FILE_STORAGE_TABLE, - FILE_STORAGE_VIRTUAL_TABLE, - }, -}; -use serde::{ - Deserialize, - Serialize, -}; -use serde_json::{ - json, - Value as JsonValue, -}; -use shape_inference::{ - export_context::{ - ExportContext, - GeneratedSchema, - }, - ShapeConfig, -}; -use storage::{ - ChannelWriter, - Storage, - StorageExt, - Upload, - UploadExt, -}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use usage_tracking::{ - CallType, - FunctionUsageTracker, - StorageUsageTracker, - UsageCounter, -}; -use value::{ - export::ValueFormat, - TableNamespace, - TableNumber, - TabletId, -}; - -use crate::metrics::{ - export_timer, - log_worker_starting, -}; - -const INITIAL_BACKOFF: Duration = Duration::from_secs(1); -const MAX_BACKOFF: Duration = Duration::from_secs(900); // 15 minutes -static AFTER_DOCUMENTS_CLEAN: Bytes = Bytes::from_static("\n".as_bytes()); - -// 0o644 => read-write for owner, read for everyone else. -const ZIP_ENTRY_PERMISSIONS: u16 = 0o644; - -static README_MD_CONTENTS: &str = r#"# Welcome to your Convex snapshot export! - -This ZIP file contains a snapshot of the tables in your Convex deployment. - -Documents for each table are listed as lines of JSON in -/documents.jsonl files. - -For details on the format and how to use this snapshot with npx convex import, -check out [the docs](https://docs.convex.dev/database/import-export/export) or -ask us in [Discord](http://convex.dev/community). 
-"#; - -pub struct ExportWorker { - runtime: RT, - database: Database, - storage: Arc, - file_storage: Arc, - backoff: Backoff, - usage_tracking: UsageCounter, -} - -impl ExportWorker { - #[allow(clippy::new_ret_no_self)] - pub fn new( - runtime: RT, - database: Database, - storage: Arc, - file_storage: Arc, - usage_tracking: UsageCounter, - ) -> impl Future + Send { - let mut worker = Self { - runtime, - database, - storage, - file_storage, - backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), - usage_tracking, - }; - async move { - loop { - if let Err(e) = worker.run().await { - report_error(&mut e.context("ExportWorker died")).await; - let delay = worker.backoff.fail(&mut worker.runtime.rng()); - worker.runtime.wait(delay).await; - } else { - worker.backoff.reset(); - } - } - } - } - - #[cfg(test)] - pub fn new_test( - runtime: RT, - database: Database, - storage: Arc, - file_storage: Arc, - ) -> Self { - use events::usage::NoOpUsageEventLogger; - - Self { - runtime, - database, - storage, - file_storage, - backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), - usage_tracking: UsageCounter::new(Arc::new(NoOpUsageEventLogger)), - } - } - - // Subscribe to the export table. If there is a requested export, start - // an export and mark as in_progress. If there's an export job that didn't - // finish (it's in_progress), restart that export. - pub async fn run(&mut self) -> anyhow::Result<()> { - let mut tx = self.database.begin(Identity::system()).await?; - let mut exports_model = ExportsModel::new(&mut tx); - let export_requested = exports_model.latest_requested().await?; - let export_in_progress = exports_model.latest_in_progress().await?; - match (export_requested, export_in_progress) { - (Some(_), Some(_)) => { - anyhow::bail!("Can only have one export requested or in progress at once.") - }, - (Some(export), None) => { - tracing::info!("Export requested."); - let _status = log_worker_starting("ExportWorker"); - let timer = export_timer(); - let ts = self.database.now_ts_for_reads(); - let in_progress_export = (*export).clone().in_progress(*ts)?; - let in_progress_export_doc = SystemMetadataModel::new_global(&mut tx) - .replace( - export.id().to_owned(), - in_progress_export.clone().try_into()?, - ) - .await? 
- .try_into()?; - self.database - .commit_with_write_source(tx, "export_worker_export_requested") - .await?; - self.export(in_progress_export_doc).await?; - timer.finish(); - return Ok(()); - }, - (None, Some(export)) => { - tracing::info!("In progress export restarting..."); - let _status = log_worker_starting("ExportWorker"); - let timer = export_timer(); - self.export(export).await?; - timer.finish(); - return Ok(()); - }, - (None, None) => { - tracing::info!("No exports requested or in progress."); - }, - } - let token = tx.into_token()?; - let subscription = self.database.subscribe(token).await?; - subscription.wait_for_invalidation().await; - Ok(()) - } - - async fn export(&mut self, export: ParsedDocument) -> anyhow::Result<()> { - loop { - match self.export_and_mark_complete(export.clone()).await { - Ok(()) => { - return Ok(()); - }, - Err(mut e) => { - if e.is::() { - tracing::info!("Export {} canceled", export.id()); - return Ok(()); - } - report_error(&mut e).await; - let delay = self.backoff.fail(&mut self.runtime.rng()); - tracing::error!("Export failed, retrying in {delay:?}"); - self.runtime.wait(delay).await; - }, - } - } - } - - async fn export_inner( - &mut self, - format: ExportFormat, - requestor: ExportRequestor, - update_progress: F, - ) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> - where - F: Fn(String) -> Fut + Send + Copy, - Fut: Future> + Send, - { - let storage = &self.storage; - update_progress("Beginning backup".to_string()).await?; - let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables) = { - let mut tx = self.database.begin(Identity::system()).await?; - let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; - let snapshot = self.database.snapshot(tx.begin_timestamp())?; - let table_summaries = snapshot.must_table_summaries()?; - let tables: BTreeMap<_, _> = snapshot - .table_registry - .iter_active_user_tables() - .map(|(tablet_id, table_namespace, table_number, table_name)| { - ( - tablet_id, - ( - table_namespace, - table_number, - table_name.clone(), - table_summaries.tablet_summary(&tablet_id), - ), - ) - }) - .collect(); - let component_ids_to_paths = snapshot.component_ids_to_paths(); - let system_tables = snapshot - .table_registry - .iter_active_system_tables() - .map(|(id, namespace, _, name)| ((namespace, name.clone()), id)) - .collect(); - ( - tx.begin_timestamp(), - tables, - component_ids_to_paths, - by_id_indexes, - system_tables, - ) - }; - match format { - ExportFormat::Zip { include_storage } => { - // Start upload. 
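// A minimal sketch of the producer/consumer shape set up below, assuming plain
// tokio mpsc in place of `ChannelWriter` and a byte counter in place of
// `try_write_parallel_and_hash`: the zipper produces chunks while the uploader
// drains them, and `try_join!` drives both to completion on one task.
async fn upload_pipeline_sketch() -> anyhow::Result<()> {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(1);

    // "Uploader": consumes chunks as they arrive (the real one hashes and
    // uploads each part to object storage).
    let consumer = async move {
        let mut total = 0usize;
        while let Some(chunk) = rx.recv().await {
            total += chunk.len();
        }
        anyhow::Ok(total)
    };

    // "Zipper": produces chunks; dropping `tx` at the end closes the stream.
    let producer = async move {
        for _ in 0..3 {
            tx.send(bytes::Bytes::from_static(b"zip bytes"))
                .await
                .map_err(|_| anyhow::anyhow!("receiver dropped"))?;
        }
        anyhow::Ok(())
    };

    let (total_bytes, ()) = futures::try_join!(consumer, producer)?;
    assert_eq!(total_bytes, 27);
    Ok(())
}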
- let mut upload = storage.start_upload().await?; - let (sender, receiver) = mpsc::channel::(1); - let uploader = - upload.try_write_parallel_and_hash(ReceiverStream::new(receiver).map(Ok)); - let writer = ChannelWriter::new(sender, 5 * (1 << 20)); - let usage = FunctionUsageTracker::new(); - - let zipper = self.construct_zip_snapshot( - writer, - tables.clone(), - component_ids_to_paths, - ts, - by_id_indexes, - system_tables, - include_storage, - usage.clone(), - requestor, - update_progress, - ); - let (_, ()) = try_join!(uploader, zipper)?; - let zip_object_key = upload.complete().await?; - Ok((*ts, zip_object_key, usage)) - }, - } - } - - async fn write_component<'a, 'b: 'a, F, Fut>( - &self, - namespace: TableNamespace, - component_path: ComponentPath, - zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>, - tables: &'a mut BTreeMap, - snapshot_ts: RepeatableTimestamp, - by_id_indexes: &BTreeMap, - system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>, - include_storage: bool, - usage: &FunctionUsageTracker, - requestor: ExportRequestor, - update_progress: F, - ) -> anyhow::Result<()> - where - F: Fn(String) -> Fut + Send + Copy, - Fut: Future> + Send, - { - let path_prefix = get_export_path_prefix(&component_path); - - let in_component_str = component_path.in_component_str(); - let tablet_ids: BTreeSet<_> = tables - .iter() - .filter(|(_, (ns, ..))| *ns == namespace) - .map(|(tablet_id, _)| *tablet_id) - .collect(); - - { - update_progress(format!("Enumerating tables{in_component_str}")).await?; - // _tables - let mut table_upload = zip_snapshot_upload - .start_system_table(&path_prefix, TABLES_TABLE.clone()) - .await?; - - // Write documents from stream to table uploads, in table number order. - // This includes all user tables present in the export. - let mut user_table_numbers_and_names: Vec<_> = tables - .iter() - .filter(|(_, (ns, ..))| *ns == namespace) - .map(|(_, (_, table_number, table_name, _))| (table_number, table_name)) - .collect(); - user_table_numbers_and_names.sort(); - for (table_number, table_name) in user_table_numbers_and_names { - table_upload - .write_json_line(json!({ - "name": table_name.clone(), - "id": *table_number, - })) - .await?; - } - table_upload.complete().await?; - } - - if include_storage { - update_progress(format!("Backing up _storage{in_component_str}")).await?; - - // _storage - let tablet_id = system_tables - .get(&(namespace, FILE_STORAGE_TABLE.clone())) - .context("_file_storage does not exist")?; - let by_id = by_id_indexes - .get(tablet_id) - .context("_file_storage.by_id does not exist")?; - - // First write metadata to _storage/documents.jsonl - let mut table_upload = zip_snapshot_upload - .start_system_table(&path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone()) - .await?; - let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); - let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); - pin_mut!(stream); - while let Some((doc, _ts)) = stream.try_next().await? 
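// For reference, the archive layout this code assembles, reconstructed from the
// writes below and the expectations in the tests (component subtrees repeat the
// same layout under a `_components/<name>/` prefix):
//
//   README.md
//   _tables/documents.jsonl              one {"name": ..., "id": ...} line per table
//   <table>/documents.jsonl              one exported document per line
//   <table>/generated_schema.jsonl       inferred shape plus per-document overrides
//   _storage/documents.jsonl             one FileStorageZipMetadata line per file
//   _storage/<id><extension_guess>       raw file contents
//   _components/<name>/...               child component, same layout recursively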
{ - let file_storage_entry = ParsedDocument::::try_from(doc)?; - let virtual_storage_id = file_storage_entry.id().developer_id; - let creation_time = f64::from( - file_storage_entry - .creation_time() - .context("file should have creation time")?, - ); - table_upload - .write_json_line(json!(FileStorageZipMetadata { - id: virtual_storage_id.encode(), - creation_time: Some(creation_time), - sha256: Some(file_storage_entry.sha256.as_base64()), - size: Some(file_storage_entry.size), - content_type: file_storage_entry.content_type.clone(), - internal_id: Some(file_storage_entry.storage_id.to_string()), - })) - .await?; - } - table_upload.complete().await?; - - let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); - let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); - pin_mut!(stream); - while let Some((doc, _ts)) = stream.try_next().await? { - let file_storage_entry = ParsedDocument::::try_from(doc)?; - let virtual_storage_id = file_storage_entry.id().developer_id; - // Add an extension, which isn't necessary for anything and might be incorrect, - // but allows the file to be viewed at a glance in most cases. - let extension_guess = file_storage_entry - .content_type - .as_ref() - .and_then(mime2ext) - .map(|extension| format!(".{extension}")) - .unwrap_or_default(); - let path = format!( - "{path_prefix}{}/{}{extension_guess}", - *FILE_STORAGE_VIRTUAL_TABLE, - virtual_storage_id.encode() - ); - let file_stream = self - .file_storage - .get(&file_storage_entry.storage_key) - .await? - .with_context(|| { - format!( - "file missing from storage: {} with key {:?}", - file_storage_entry.developer_id().encode(), - file_storage_entry.storage_key, - ) - })?; - - let content_type = file_storage_entry - .content_type - .as_ref() - .map(|ct| ct.parse()) - .transpose()?; - usage.track_storage_call( - component_path.clone(), - requestor.usage_tag(), - file_storage_entry.storage_id.clone(), - content_type, - file_storage_entry.sha256.clone(), - ); - self.usage_tracking.track_independent_storage_egress_size( - component_path.clone(), - requestor.usage_tag().to_string(), - file_stream.content_length as u64, - ); - zip_snapshot_upload - .stream_full_file(path, file_stream.stream) - .await?; - } - } - - for tablet_id in tablet_ids.iter() { - let (_, _, table_name, table_summary) = - tables.remove(tablet_id).expect("table should have details"); - let by_id = by_id_indexes - .get(tablet_id) - .ok_or_else(|| anyhow::anyhow!("no by_id index for {} found", tablet_id))?; - - update_progress(format!("Backing up {table_name}{in_component_str}")).await?; - - let mut generated_schema = GeneratedSchema::new(table_summary.inferred_type().into()); - if ExportContext::is_ambiguous(table_summary.inferred_type()) { - let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); - let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); - pin_mut!(stream); - while let Some((doc, _ts)) = stream.try_next().await? { - generated_schema.insert(doc.value(), doc.developer_id()); - } - } - - let mut table_upload = zip_snapshot_upload - .start_table(&path_prefix, table_name.clone(), generated_schema) - .await?; - - let table_iterator = self.database.table_iterator(snapshot_ts, 1000, None); - let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); - pin_mut!(stream); - - // Write documents from stream to table uploads - while let Some((doc, _ts)) = stream.try_next().await? 
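// A small sketch of how the `extension_guess` above behaves; the helper name is
// hypothetical, but `mime2ext::mime2ext` is the crate function imported at the
// top of this file. The extension is purely cosmetic (it lets a file preview
// nicely when the zip is browsed); the authoritative content type stays in
// _storage/documents.jsonl.
fn extension_guess_sketch(content_type: Option<&str>) -> String {
    content_type
        .and_then(mime2ext::mime2ext)             // e.g. "image/jpeg" maps to a jpeg extension
        .map(|extension| format!(".{extension}"))
        .unwrap_or_default()                      // unknown or absent type: no extension
}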
{ - usage.track_database_egress_size( - component_path.clone(), - table_name.to_string(), - doc.size() as u64, - false, - ); - table_upload.write(doc).await?; - } - table_upload.complete().await?; - } - - Ok(()) - } - - async fn construct_zip_snapshot( - &self, - mut writer: ChannelWriter, - mut tables: BTreeMap, - component_ids_to_paths: BTreeMap, - snapshot_ts: RepeatableTimestamp, - by_id_indexes: BTreeMap, - system_tables: BTreeMap<(TableNamespace, TableName), TabletId>, - include_storage: bool, - usage: FunctionUsageTracker, - requestor: ExportRequestor, - update_progress: F, - ) -> anyhow::Result<()> - where - F: Fn(String) -> Fut + Send + Copy, - Fut: Future> + Send, - { - let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?; - - for (component_id, component_path) in component_ids_to_paths { - let namespace: TableNamespace = component_id.into(); - self.write_component( - namespace, - component_path, - &mut zip_snapshot_upload, - &mut tables, - snapshot_ts, - &by_id_indexes, - &system_tables, - include_storage, - &usage, - requestor, - update_progress, - ) - .await?; - } - - // Complete upload. - zip_snapshot_upload.complete().await?; - writer.compat_write().close().await?; - Ok(()) - } - - async fn export_and_mark_complete( - &mut self, - export: ParsedDocument, - ) -> anyhow::Result<()> { - let id = export.id(); - let format = export.format(); - let requestor = export.requestor(); - drop(export); // Drop this to prevent accidentally using stale state - - tracing::info!("Export {id} beginning..."); - let (snapshot_ts, object_key, usage) = { - let database_ = self.database.clone(); - let export_future = async { - let database_ = self.database.clone(); - - self.export_inner(format, requestor, |msg| async { - tracing::info!("Export {id} progress: {msg}"); - database_ - .execute_with_occ_retries( - Identity::system(), - FunctionUsageTracker::new(), - PauseClient::new(), - "export_worker_update_progress", - move |tx| { - let msg = msg.clone(); - async move { - let export: ParsedDocument = - tx.get(id).await?.context(ExportCanceled)?.try_into()?; - let export = export.into_value(); - if let Export::Canceled { .. } = export { - anyhow::bail!(ExportCanceled); - } - SystemMetadataModel::new_global(tx) - .replace(id, export.update_progress(msg)?.try_into()?) - .await?; - Ok(()) - } - .boxed() - .into() - }, - ) - .await?; - Ok(()) - }) - .await - }; - tokio::pin!(export_future); - - // In parallel, monitor the export document to check for cancellation - let monitor_export = async move { - loop { - let mut tx = database_.begin_system().await?; - let Some(export) = tx.get(id).await? else { - tracing::warn!("Export {id} disappeared"); - return Err(ExportCanceled.into()); - }; - let export: ParsedDocument = export.try_into()?; - match *export { - Export::InProgress { .. } => (), - Export::Canceled { .. } => return Err(ExportCanceled.into()), - Export::Requested { .. } - | Export::Failed { .. } - | Export::Completed { .. } => { - anyhow::bail!("Export {id} is in unexpected state: {export:?}"); - }, - } - let token = tx.into_token()?; - let subscription = database_.subscribe(token).await?; - subscription.wait_for_invalidation().await; - } - }; - tokio::pin!(monitor_export); - - futures::future::select(export_future, monitor_export) - .await - .factor_first() - .0? - }; - - // Export is done; mark it as such. 
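// A minimal sketch of the cancellation pattern used just above, with a oneshot
// channel standing in for the subscription on the export document: the export
// future races a monitor future under `futures::future::select`, and
// `factor_first()` keeps whichever result finished first.
async fn cancellation_race_sketch() -> anyhow::Result<&'static str> {
    let (cancel_tx, cancel_rx) = futures::channel::oneshot::channel::<()>();

    let work = async {
        // Stands in for `export_inner(...)`.
        anyhow::Ok("completed")
    };
    let monitor = async move {
        // Stands in for watching the Export document until it becomes Canceled.
        cancel_rx.await.ok();
        Err::<&'static str, anyhow::Error>(anyhow::anyhow!("export canceled"))
    };
    tokio::pin!(work);
    tokio::pin!(monitor);

    drop(cancel_tx); // nobody cancels in this sketch, so the work future wins
    futures::future::select(work, monitor)
        .await
        .factor_first()
        .0
}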
- tracing::info!("Export {id} completed"); - self.database - .execute_with_occ_retries( - Identity::system(), - FunctionUsageTracker::new(), - PauseClient::new(), - "export_worker_mark_complete", - |tx| { - let object_key = object_key.clone(); - async move { - let Some(export) = tx.get(id).await? else { - tracing::warn!("Export {id} disappeared"); - return Err(ExportCanceled.into()); - }; - let export: ParsedDocument = export.try_into()?; - if let Export::Canceled { .. } = *export { - return Err(ExportCanceled.into()); - } - let completed_export = export.into_value().completed( - snapshot_ts, - *tx.begin_timestamp(), - object_key, - )?; - SystemMetadataModel::new_global(tx) - .replace(id, completed_export.try_into()?) - .await?; - Ok(()) - } - .boxed() - .into() - }, - ) - .await?; - - let object_attributes = self - .storage - .get_object_attributes(&object_key) - .await? - .context("error getting export object attributes from S3")?; - - let tag = requestor.usage_tag().to_string(); - let call_type = match requestor { - ExportRequestor::SnapshotExport => CallType::Export, - ExportRequestor::CloudBackup => CallType::CloudBackup, - }; - // Charge file bandwidth for the upload of the snapshot to exports storage - self.usage_tracking.track_independent_storage_ingress_size( - ComponentPath::root(), - tag.clone(), - object_attributes.size, - ); - // Charge database bandwidth accumulated during the export - self.usage_tracking.track_call( - UdfIdentifier::Cli(tag), - ExecutionId::new(), - RequestId::new(), - call_type, - true, - usage.gather_user_stats(), - ); - Ok(()) - } -} - -fn get_export_path_prefix(component_path: &ComponentPath) -> String { - component_path - .iter() - .map(|parent_name| { - format!( - "{}/{}/", - &*COMPONENTS_TABLE, - String::from(parent_name.clone()) - ) - }) - .join("") -} - -#[derive(thiserror::Error, Debug)] -#[error("Export canceled")] -struct ExportCanceled; - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct FileStorageZipMetadata { - #[serde(rename = "_id")] - pub id: String, - #[serde(rename = "_creationTime")] - pub creation_time: Option, - pub sha256: Option, - pub size: Option, - pub content_type: Option, - pub internal_id: Option, -} - -// 'a is lifetime of entire zip file writer. -// 'b is lifetime of entry writer for a single table. 
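// A minimal sketch of the JSONL convention the uploads below follow: one
// serde-serialized JSON object, then a single newline (`AFTER_DOCUMENTS_CLEAN`).
// The generic writer is an assumption so the sketch works against any
// `futures::io::AsyncWrite`; the real target is the zip entry stream.
async fn write_json_line_sketch<W>(mut out: W, json: &serde_json::Value) -> anyhow::Result<()>
where
    W: futures::io::AsyncWrite + Unpin,
{
    use futures::AsyncWriteExt as _;
    out.write_all(&serde_json::to_vec(json)?).await?; // the document as compact JSON
    out.write_all(b"\n").await?;                      // line terminator
    Ok(())
}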
-struct ZipSnapshotTableUpload<'a, 'b> { - entry_writer: EntryStreamWriter<'b, &'a mut ChannelWriter>, -} - -impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> { - async fn new( - zip_writer: &'b mut ZipFileWriter<&'a mut ChannelWriter>, - path_prefix: &str, - table_name: TableName, - ) -> anyhow::Result { - let source_path = format!("{path_prefix}{table_name}/documents.jsonl"); - let builder = ZipEntryBuilder::new(source_path.clone(), Compression::Deflate) - .unix_permissions(ZIP_ENTRY_PERMISSIONS); - let entry_writer = zip_writer.write_entry_stream(builder.build()).await?; - Ok(Self { entry_writer }) - } - - async fn write(&mut self, doc: ResolvedDocument) -> anyhow::Result<()> { - let json = doc.export(ValueFormat::ConvexCleanJSON); - self.write_json_line(json).await - } - - async fn write_json_line(&mut self, json: JsonValue) -> anyhow::Result<()> { - let buf = serde_json::to_vec(&json)?; - self.entry_writer.compat_mut_write().write_all(&buf).await?; - self.entry_writer - .compat_mut_write() - .write_all(&AFTER_DOCUMENTS_CLEAN) - .await?; - Ok(()) - } - - async fn complete(self) -> anyhow::Result<()> { - self.entry_writer.close().await?; - Ok(()) - } -} - -struct ZipSnapshotUpload<'a> { - writer: ZipFileWriter<&'a mut ChannelWriter>, -} - -impl<'a> ZipSnapshotUpload<'a> { - async fn new(out: &'a mut ChannelWriter) -> anyhow::Result { - let writer = ZipFileWriter::new(out); - let mut zip_snapshot_upload = Self { writer }; - zip_snapshot_upload - .write_full_file(format!("README.md"), README_MD_CONTENTS) - .await?; - Ok(zip_snapshot_upload) - } - - async fn write_full_file(&mut self, path: String, contents: &str) -> anyhow::Result<()> { - let builder = ZipEntryBuilder::new(path, Compression::Deflate) - .unix_permissions(ZIP_ENTRY_PERMISSIONS); - let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; - entry_writer - .compat_mut_write() - .write_all(contents.as_bytes()) - .await?; - entry_writer.close().await?; - Ok(()) - } - - async fn stream_full_file( - &mut self, - path: String, - mut contents: BoxStream<'_, std::io::Result>, - ) -> anyhow::Result<()> { - let builder = ZipEntryBuilder::new(path, Compression::Deflate) - .unix_permissions(ZIP_ENTRY_PERMISSIONS); - let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; - while let Some(chunk) = contents.try_next().await? { - entry_writer.compat_mut_write().write_all(&chunk).await?; - } - entry_writer.close().await?; - Ok(()) - } - - async fn start_table( - &mut self, - path_prefix: &str, - table_name: TableName, - generated_schema: GeneratedSchema, - ) -> anyhow::Result> { - self.write_generated_schema(path_prefix, &table_name, generated_schema) - .await?; - - ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await - } - - /// System tables have known shape, so we don't need to serialize it. 
- async fn start_system_table( - &mut self, - path_prefix: &str, - table_name: TableName, - ) -> anyhow::Result> { - anyhow::ensure!(table_name.is_system()); - ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await - } - - async fn write_generated_schema( - &mut self, - path_prefix: &str, - table_name: &TableName, - generated_schema: GeneratedSchema, - ) -> anyhow::Result<()> { - let generated_schema_path = format!("{path_prefix}{table_name}/generated_schema.jsonl"); - let builder = ZipEntryBuilder::new(generated_schema_path.clone(), Compression::Deflate) - .unix_permissions(ZIP_ENTRY_PERMISSIONS); - let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; - let generated_schema_str = generated_schema.inferred_shape.to_string(); - entry_writer - .compat_mut_write() - .write_all(serde_json::to_string(&generated_schema_str)?.as_bytes()) - .await?; - entry_writer.compat_mut_write().write_all(b"\n").await?; - for (override_id, override_export_context) in generated_schema.overrides.into_iter() { - let override_json = - json!({override_id.encode(): JsonValue::from(override_export_context)}); - entry_writer - .compat_mut_write() - .write_all(serde_json::to_string(&override_json)?.as_bytes()) - .await?; - entry_writer.compat_mut_write().write_all(b"\n").await?; - } - entry_writer.close().await?; - Ok(()) - } - - async fn complete(self) -> anyhow::Result<()> { - self.writer.close().await?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use std::{ - collections::{ - BTreeMap, - BTreeSet, - }, - str, - sync::Arc, - }; - - use anyhow::Context; - use bytes::Bytes; - use common::{ - components::{ - ComponentId, - ComponentPath, - }, - document::ParsedDocument, - types::{ - ConvexOrigin, - TableName, - }, - value::ConvexObject, - }; - use database::{ - test_helpers::DbFixtures, - BootstrapComponentsModel, - Database, - TableModel, - UserFacingModel, - }; - use file_storage::{ - FileStorage, - TransactionalFileStorage, - }; - use headers::ContentType; - use keybroker::Identity; - use maplit::btreeset; - use model::{ - exports::types::{ - ExportFormat, - ExportRequestor, - }, - file_storage::types::FileStorageEntry, - test_helpers::DbFixturesWithModel, - }; - use runtime::testing::TestRuntime; - use serde_json::json; - use storage::{ - LocalDirStorage, - Storage, - StorageExt, - }; - use usage_tracking::FunctionUsageTracker; - use value::{ - assert_obj, - export::ValueFormat, - DeveloperDocumentId, - ResolvedDocumentId, - TableNamespace, - }; - - use super::ExportWorker; - use crate::{ - export_worker::{ - get_export_path_prefix, - README_MD_CONTENTS, - }, - test_helpers::ApplicationTestExt, - tests::components::unmount_component, - Application, - }; - - #[convex_macro::test_runtime] - async fn test_export_zip(rt: TestRuntime) -> anyhow::Result<()> { - let DbFixtures { db, .. 
} = DbFixtures::new_with_model(&rt).await?; - let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let mut export_worker = - ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); - - let mut expected_export_entries = BTreeMap::new(); - - expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); - - expected_export_entries.insert( - "_tables/documents.jsonl".to_string(), - format!( - "{}\n{}\n{}\n", - json!({"name": "table_0", "id": 10001}), - json!({"name": "table_1", "id": 10002}), - json!({"name": "table_2", "id": 10003}), - ), - ); - expected_export_entries.insert("_storage/documents.jsonl".to_string(), format!("")); - - // Write to a bunch of tables - for i in 0..3 { - let table: TableName = str::parse(format!("table_{i}").as_str())?; - let mut tx = db.begin(Identity::system()).await?; - let id = match i { - 0 => { - UserFacingModel::new_root_for_test(&mut tx) - .insert(table, assert_obj!("foo" => 1)) - .await? - }, - 1 => { - UserFacingModel::new_root_for_test(&mut tx) - .insert(table, assert_obj!("foo" => [1, "1"])) - .await? - }, - _ => { - UserFacingModel::new_root_for_test(&mut tx) - .insert(table, assert_obj!("foo" => "1")) - .await? - }, - }; - let doc = UserFacingModel::new_root_for_test(&mut tx) - .get(id, None) - .await? - .unwrap(); - let tablet_id = tx - .table_mapping() - .namespace(TableNamespace::test_user()) - .number_to_tablet()(doc.table())?; - let doc = doc.to_resolved(tablet_id); - let id_v6 = doc.developer_id().encode(); - expected_export_entries.insert( - format!("table_{i}/documents.jsonl"), - format!( - "{}\n", - serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? - ), - ); - expected_export_entries.insert( - format!("table_{i}/generated_schema.jsonl"), - match i { - 0 => format!( - "{}\n", - json!(format!( - "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ - int64}}" - )) - ), - 1 => format!( - "{}\n{}\n", - json!(format!( - "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ - array}}" - )), - json!({id_v6: {"foo": ["int64", "infer"]}}) - ), - _ => format!( - "{}\n", - json!(format!( - "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ - field_name}}" - )) - ), - }, - ); - db.commit(tx).await?; - } - let (_, zip_object_key, usage) = export_worker - .export_inner( - ExportFormat::Zip { - include_storage: true, - }, - ExportRequestor::SnapshotExport, - |_| async { Ok(()) }, - ) - .await?; - - // Check we can get the stored zip. - let storage_stream = storage - .get(&zip_object_key) - .await? 
- .context("object missing from storage")?; - let stored_bytes = storage_stream.collect_as_bytes().await?; - let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; - let mut zip_entries = BTreeMap::new(); - let filenames: Vec<_> = zip_reader - .entries() - .into_iter() - .map(|entry| entry.filename().to_string()) - .collect(); - for (i, filename) in filenames.into_iter().enumerate() { - let entry_reader = zip_reader.entry_reader(i).await?; - let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; - zip_entries.insert(filename, entry_contents); - } - assert_eq!(zip_entries, expected_export_entries); - - let usage = usage.gather_user_stats(); - let component_path = ComponentPath::test_user(); - assert!(usage.database_egress_size[&(component_path.clone(), "table_0".to_string())] > 0); - assert!(usage.database_egress_size[&(component_path.clone(), "table_1".to_string())] > 0); - assert!(usage.database_egress_size[&(component_path, "table_2".to_string())] > 0); - - Ok(()) - } - - async fn write_test_data_in_component( - db: &Database, - component: ComponentId, - path_prefix: &str, - expected_export_entries: &mut BTreeMap, - ) -> anyhow::Result<()> { - expected_export_entries.insert( - format!("{path_prefix}_tables/documents.jsonl"), - format!("{}\n", json!({"name": "messages", "id": 10001}),), - ); - // Write to tables in each component - let table: TableName = str::parse("messages")?; - let mut tx = db.begin(Identity::system()).await?; - let id = UserFacingModel::new(&mut tx, component.into()) - .insert(table, assert_obj!("channel" => "c", "text" => path_prefix)) - .await?; - let doc = UserFacingModel::new(&mut tx, component.into()) - .get(id, None) - .await? - .unwrap(); - let tablet_id = tx - .table_mapping() - .namespace(component.into()) - .number_to_tablet()(doc.table())?; - let doc = doc.to_resolved(tablet_id); - let expected_documents = format!( - "{}\n", - serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? 
- ); - let expected_generated_schema = format!( - "{}\n", - json!(format!( - r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#, - )) - ); - expected_export_entries.insert( - format!("{path_prefix}messages/documents.jsonl"), - expected_documents.clone(), - ); - expected_export_entries.insert( - format!("{path_prefix}messages/generated_schema.jsonl"), - expected_generated_schema.clone(), - ); - db.commit(tx).await?; - Ok(()) - } - - #[convex_macro::test_runtime] - async fn test_export_components(rt: TestRuntime) -> anyhow::Result<()> { - let application = Application::new_for_tests(&rt).await?; - application - .load_component_tests_modules("with-schema") - .await?; - let db = application.database().clone(); - let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let mut export_worker = - ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); - - let mut expected_export_entries = BTreeMap::new(); - - expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); - - let mut tx = db.begin(Identity::system()).await?; - let component_path = "component".parse()?; - let (_, child_component) = - BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?; - - for (path_prefix, component) in [ - ("", ComponentId::Root), - ("_components/component/", child_component), - ] { - write_test_data_in_component(&db, component, path_prefix, &mut expected_export_entries) - .await?; - } - - let (_, zip_object_key, usage) = export_worker - .export_inner( - ExportFormat::Zip { - include_storage: false, - }, - ExportRequestor::SnapshotExport, - |_| async { Ok(()) }, - ) - .await?; - - // Check we can get the stored zip. - let storage_stream = storage - .get(&zip_object_key) - .await? - .context("object missing from storage")?; - let stored_bytes = storage_stream.collect_as_bytes().await?; - let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; - let mut zip_entries = BTreeMap::new(); - let filenames: Vec<_> = zip_reader - .entries() - .into_iter() - .map(|entry| entry.filename().to_string()) - .collect(); - for (i, filename) in filenames.into_iter().enumerate() { - let entry_reader = zip_reader.entry_reader(i).await?; - let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; - zip_entries.insert(filename, entry_contents); - } - assert_eq!(zip_entries, expected_export_entries); - - let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0); - Ok(()) - } - - #[convex_macro::test_runtime] - async fn test_export_unmounted_components(rt: TestRuntime) -> anyhow::Result<()> { - let application = Application::new_for_tests(&rt).await?; - unmount_component(&application).await?; - - let db = application.database().clone(); - let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let mut export_worker = - ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); - - let expected_export_entries = btreeset! 
{ - "README.md".to_string(), - "_components/component/_tables/documents.jsonl".to_string(), - "_components/component/messages/documents.jsonl".to_string(), - "_components/component/messages/generated_schema.jsonl".to_string(), - "_components/envVars/_components/component/_tables/documents.jsonl".to_string(), - "_components/envVars/_components/component/messages/documents.jsonl".to_string(), - "_components/envVars/_components/component/messages/generated_schema.jsonl".to_string(), - "_components/envVars/_tables/documents.jsonl".to_string(), - "_tables/documents.jsonl".to_string(), - }; - - let (_, zip_object_key, usage) = export_worker - .export_inner( - ExportFormat::Zip { - include_storage: false, - }, - ExportRequestor::SnapshotExport, - |_| async { Ok(()) }, - ) - .await?; - - // Check we can get the stored zip. - let storage_stream = storage - .get(&zip_object_key) - .await? - .context("object missing from storage")?; - let stored_bytes = storage_stream.collect_as_bytes().await?; - let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; - let mut zip_entries = BTreeSet::new(); - let filenames: Vec<_> = zip_reader - .entries() - .into_iter() - .map(|entry| entry.filename().to_string()) - .collect(); - for (i, filename) in filenames.into_iter().enumerate() { - let entry_reader = zip_reader.entry_reader(i).await?; - let _entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; - zip_entries.insert(filename); - } - assert_eq!(zip_entries, expected_export_entries); - - let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size[&("component".parse()?, "messages".to_string())] > 0); - Ok(()) - } - - #[convex_macro::test_runtime] - async fn test_export_storage(rt: TestRuntime) -> anyhow::Result<()> { - let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?; - let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let mut export_worker = ExportWorker::new_test( - rt.clone(), - db.clone(), - storage.clone(), - file_storage.clone(), - ); - let file_storage_wrapper = FileStorage { - database: db.clone(), - transactional_file_storage: TransactionalFileStorage::new( - rt, - file_storage, - ConvexOrigin::from("origin".to_string()), - ), - }; - let mut expected_export_entries = BTreeMap::new(); - - expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); - expected_export_entries.insert("_tables/documents.jsonl".to_string(), format!("")); - - // Write a few storage files. - let usage_tracker = FunctionUsageTracker::new(); - let file1_id = file_storage_wrapper - .store_file( - TableNamespace::test_user(), - None, - Some(ContentType::jpeg()), - futures::stream::iter(vec![Ok(Bytes::from_static(b"abc"))]), - None, - &usage_tracker, - ) - .await?; - let mut tx = db.begin(Identity::system()).await?; - let storage_table_id = tx - .table_mapping() - .namespace(TableNamespace::test_user()) - .id(&"_file_storage".parse()?)?; - let file1: ParsedDocument = tx - .get(ResolvedDocumentId::new( - storage_table_id.tablet_id, - DeveloperDocumentId::new(storage_table_id.table_number, file1_id.internal_id()), - )) - .await? 
- .unwrap() - .try_into()?; - - expected_export_entries.insert(format!("_storage/{file1_id}.jpeg"), format!("abc")); - expected_export_entries.insert( - "_storage/documents.jsonl".to_string(), - format!( - "{}\n", - json!({"_id": file1_id.encode(), "_creationTime": f64::from(file1.creation_time().unwrap()), "sha256": "ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0=", "size": 3, "contentType": "image/jpeg", "internalId": file1.storage_id.to_string()}), - ), - ); - - let (_, zip_object_key, usage) = export_worker - .export_inner( - ExportFormat::Zip { - include_storage: true, - }, - ExportRequestor::SnapshotExport, - |_| async { Ok(()) }, - ) - .await?; - - // Check we can get the stored zip. - let storage_stream = storage - .get(&zip_object_key) - .await? - .context("object missing from storage")?; - let stored_bytes = storage_stream.collect_as_bytes().await?; - let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; - let mut zip_entries = BTreeMap::new(); - let filenames: Vec<_> = zip_reader - .entries() - .into_iter() - .map(|entry| entry.filename().to_string()) - .collect(); - for (i, filename) in filenames.into_iter().enumerate() { - let entry_reader = zip_reader.entry_reader(i).await?; - let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; - zip_entries.insert(filename, entry_contents); - } - assert_eq!(zip_entries, expected_export_entries); - - let usage = usage.gather_user_stats(); - assert!(usage.database_egress_size.is_empty()); - - Ok(()) - } - - // Regression test: previously we were trying to export documents from deleted - // tables and table_mapping was failing. - #[convex_macro::test_runtime] - async fn test_export_with_table_delete(rt: TestRuntime) -> anyhow::Result<()> { - let DbFixtures { db, .. } = DbFixtures::new(&rt).await?; - let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); - let mut export_worker = - ExportWorker::new_test(rt.clone(), db.clone(), storage.clone(), file_storage); - - // Write to two tables and delete one. - let mut tx = db.begin(Identity::system()).await?; - UserFacingModel::new_root_for_test(&mut tx) - .insert("table_0".parse()?, ConvexObject::empty()) - .await?; - db.commit(tx).await?; - let mut tx = db.begin(Identity::system()).await?; - UserFacingModel::new_root_for_test(&mut tx) - .insert("table_1".parse()?, ConvexObject::empty()) - .await?; - db.commit(tx).await?; - let mut tx = db.begin(Identity::system()).await?; - TableModel::new(&mut tx) - .delete_table(TableNamespace::test_user(), "table_0".parse()?) 
- .await?; - db.commit(tx).await?; - - let (_, _zip_object_key, _) = export_worker - .export_inner( - ExportFormat::Zip { - include_storage: false, - }, - ExportRequestor::SnapshotExport, - |_| async { Ok(()) }, - ) - .await?; - Ok(()) - } - - #[test] - fn test_get_export_path_prefix() -> anyhow::Result<()> { - assert_eq!(get_export_path_prefix(&ComponentPath::root()), ""); - assert_eq!(get_export_path_prefix(&"a".parse()?), "_components/a/"); - assert_eq!( - get_export_path_prefix(&"a/b".parse()?), - "_components/a/_components/b/" - ); - assert_eq!( - get_export_path_prefix(&"a/b/c".parse()?), - "_components/a/_components/b/_components/c/" - ); - Ok(()) - } -} diff --git a/crates/application/src/exports/mod.rs b/crates/application/src/exports/mod.rs new file mode 100644 index 00000000..d57f2059 --- /dev/null +++ b/crates/application/src/exports/mod.rs @@ -0,0 +1,423 @@ +use std::collections::{ + BTreeMap, + BTreeSet, +}; + +use anyhow::Context; +use bytes::Bytes; +use common::{ + self, + async_compat::TokioAsyncWriteCompatExt, + bootstrap_model::tables::TABLES_TABLE, + components::{ + ComponentId, + ComponentPath, + }, + document::ParsedDocument, + runtime::Runtime, + types::{ + IndexId, + ObjectKey, + RepeatableTimestamp, + TableName, + Timestamp, + }, +}; +use database::{ + IndexModel, + TableSummary, + COMPONENTS_TABLE, +}; +use futures::{ + pin_mut, + try_join, + AsyncWriteExt, + Future, + StreamExt, + TryStreamExt, +}; +use itertools::Itertools; +use keybroker::Identity; +use mime2ext::mime2ext; +use model::{ + exports::types::{ + ExportFormat, + ExportRequestor, + }, + file_storage::{ + types::FileStorageEntry, + FILE_STORAGE_TABLE, + FILE_STORAGE_VIRTUAL_TABLE, + }, +}; +use serde::{ + Deserialize, + Serialize, +}; +use serde_json::json; +use shape_inference::export_context::{ + ExportContext, + GeneratedSchema, +}; +use storage::{ + ChannelWriter, + StorageExt, + Upload, + UploadExt, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use usage_tracking::{ + FunctionUsageTracker, + StorageUsageTracker, +}; +use value::{ + TableNamespace, + TableNumber, + TabletId, +}; + +use crate::exports::{ + worker::ExportWorker, + zip_uploader::ZipSnapshotUpload, +}; + +#[cfg(test)] +mod tests; +pub mod worker; +mod zip_uploader; + +async fn export_inner( + worker: &mut ExportWorker, + format: ExportFormat, + requestor: ExportRequestor, + update_progress: F, +) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> +where + F: Fn(String) -> Fut + Send + Copy, + Fut: Future> + Send, +{ + let storage = &worker.storage; + update_progress("Beginning backup".to_string()).await?; + let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables) = { + let mut tx = worker.database.begin(Identity::system()).await?; + let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?; + let snapshot = worker.database.snapshot(tx.begin_timestamp())?; + let table_summaries = snapshot.must_table_summaries()?; + let tables: BTreeMap<_, _> = snapshot + .table_registry + .iter_active_user_tables() + .map(|(tablet_id, table_namespace, table_number, table_name)| { + ( + tablet_id, + ( + table_namespace, + table_number, + table_name.clone(), + table_summaries.tablet_summary(&tablet_id), + ), + ) + }) + .collect(); + let component_ids_to_paths = snapshot.component_ids_to_paths(); + let system_tables = snapshot + .table_registry + .iter_active_system_tables() + .map(|(id, namespace, _, name)| ((namespace, name.clone()), id)) + .collect(); + ( + tx.begin_timestamp(), + tables, 
+ component_ids_to_paths, + by_id_indexes, + system_tables, + ) + }; + match format { + ExportFormat::Zip { include_storage } => { + // Start upload. + let mut upload = storage.start_upload().await?; + let (sender, receiver) = mpsc::channel::(1); + let uploader = + upload.try_write_parallel_and_hash(ReceiverStream::new(receiver).map(Ok)); + let writer = ChannelWriter::new(sender, 5 * (1 << 20)); + let usage = FunctionUsageTracker::new(); + + let zipper = construct_zip_snapshot( + worker, + writer, + tables.clone(), + component_ids_to_paths, + ts, + by_id_indexes, + system_tables, + include_storage, + usage.clone(), + requestor, + update_progress, + ); + let (_, ()) = try_join!(uploader, zipper)?; + let zip_object_key = upload.complete().await?; + Ok((*ts, zip_object_key, usage)) + }, + } +} + +async fn write_component<'a, 'b: 'a, F, Fut, RT: Runtime>( + worker: &ExportWorker, + namespace: TableNamespace, + component_path: ComponentPath, + zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>, + tables: &'a mut BTreeMap, + snapshot_ts: RepeatableTimestamp, + by_id_indexes: &BTreeMap, + system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>, + include_storage: bool, + usage: &FunctionUsageTracker, + requestor: ExportRequestor, + update_progress: F, +) -> anyhow::Result<()> +where + F: Fn(String) -> Fut + Send + Copy, + Fut: Future> + Send, +{ + let path_prefix = get_export_path_prefix(&component_path); + + let in_component_str = component_path.in_component_str(); + let tablet_ids: BTreeSet<_> = tables + .iter() + .filter(|(_, (ns, ..))| *ns == namespace) + .map(|(tablet_id, _)| *tablet_id) + .collect(); + + { + update_progress(format!("Enumerating tables{in_component_str}")).await?; + // _tables + let mut table_upload = zip_snapshot_upload + .start_system_table(&path_prefix, TABLES_TABLE.clone()) + .await?; + + // Write documents from stream to table uploads, in table number order. + // This includes all user tables present in the export. + let mut user_table_numbers_and_names: Vec<_> = tables + .iter() + .filter(|(_, (ns, ..))| *ns == namespace) + .map(|(_, (_, table_number, table_name, _))| (table_number, table_name)) + .collect(); + user_table_numbers_and_names.sort(); + for (table_number, table_name) in user_table_numbers_and_names { + table_upload + .write_json_line(json!({ + "name": table_name.clone(), + "id": *table_number, + })) + .await?; + } + table_upload.complete().await?; + } + + if include_storage { + update_progress(format!("Backing up _storage{in_component_str}")).await?; + + // _storage + let tablet_id = system_tables + .get(&(namespace, FILE_STORAGE_TABLE.clone())) + .context("_file_storage does not exist")?; + let by_id = by_id_indexes + .get(tablet_id) + .context("_file_storage.by_id does not exist")?; + + // First write metadata to _storage/documents.jsonl + let mut table_upload = zip_snapshot_upload + .start_system_table(&path_prefix, FILE_STORAGE_VIRTUAL_TABLE.clone()) + .await?; + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); + pin_mut!(stream); + while let Some((doc, _ts)) = stream.try_next().await? 
{ + let file_storage_entry = ParsedDocument::::try_from(doc)?; + let virtual_storage_id = file_storage_entry.id().developer_id; + let creation_time = f64::from( + file_storage_entry + .creation_time() + .context("file should have creation time")?, + ); + table_upload + .write_json_line(json!(FileStorageZipMetadata { + id: virtual_storage_id.encode(), + creation_time: Some(creation_time), + sha256: Some(file_storage_entry.sha256.as_base64()), + size: Some(file_storage_entry.size), + content_type: file_storage_entry.content_type.clone(), + internal_id: Some(file_storage_entry.storage_id.to_string()), + })) + .await?; + } + table_upload.complete().await?; + + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); + pin_mut!(stream); + while let Some((doc, _ts)) = stream.try_next().await? { + let file_storage_entry = ParsedDocument::::try_from(doc)?; + let virtual_storage_id = file_storage_entry.id().developer_id; + // Add an extension, which isn't necessary for anything and might be incorrect, + // but allows the file to be viewed at a glance in most cases. + let extension_guess = file_storage_entry + .content_type + .as_ref() + .and_then(mime2ext) + .map(|extension| format!(".{extension}")) + .unwrap_or_default(); + let path = format!( + "{path_prefix}{}/{}{extension_guess}", + *FILE_STORAGE_VIRTUAL_TABLE, + virtual_storage_id.encode() + ); + let file_stream = worker + .file_storage + .get(&file_storage_entry.storage_key) + .await? + .with_context(|| { + format!( + "file missing from storage: {} with key {:?}", + file_storage_entry.developer_id().encode(), + file_storage_entry.storage_key, + ) + })?; + + let content_type = file_storage_entry + .content_type + .as_ref() + .map(|ct| ct.parse()) + .transpose()?; + usage.track_storage_call( + component_path.clone(), + requestor.usage_tag(), + file_storage_entry.storage_id.clone(), + content_type, + file_storage_entry.sha256.clone(), + ); + worker.usage_tracking.track_independent_storage_egress_size( + component_path.clone(), + requestor.usage_tag().to_string(), + file_stream.content_length as u64, + ); + zip_snapshot_upload + .stream_full_file(path, file_stream.stream) + .await?; + } + } + + for tablet_id in tablet_ids.iter() { + let (_, _, table_name, table_summary) = + tables.remove(tablet_id).expect("table should have details"); + let by_id = by_id_indexes + .get(tablet_id) + .ok_or_else(|| anyhow::anyhow!("no by_id index for {} found", tablet_id))?; + + update_progress(format!("Backing up {table_name}{in_component_str}")).await?; + + let mut generated_schema = GeneratedSchema::new(table_summary.inferred_type().into()); + if ExportContext::is_ambiguous(table_summary.inferred_type()) { + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); + pin_mut!(stream); + while let Some((doc, _ts)) = stream.try_next().await? { + generated_schema.insert(doc.value(), doc.developer_id()); + } + } + + let mut table_upload = zip_snapshot_upload + .start_table(&path_prefix, table_name.clone(), generated_schema) + .await?; + + let table_iterator = worker.database.table_iterator(snapshot_ts, 1000, None); + let stream = table_iterator.stream_documents_in_table(*tablet_id, *by_id, None); + pin_mut!(stream); + + // Write documents from stream to table uploads + while let Some((doc, _ts)) = stream.try_next().await? 
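// Accounting note: the loop below charges database egress per document into the
// per-export `FunctionUsageTracker` (`usage`), while storage file bandwidth was
// charged immediately above through the independent `UsageCounter`
// (`track_independent_storage_egress_size`). The completion path, which after
// this refactor presumably lives in exports/worker.rs and is not shown in this
// hunk, rolls the accumulated tracker into a single `track_call` and charges
// the finished zip upload as independent storage ingress.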
{ + usage.track_database_egress_size( + component_path.clone(), + table_name.to_string(), + doc.size() as u64, + false, + ); + table_upload.write(doc).await?; + } + table_upload.complete().await?; + } + + Ok(()) +} + +async fn construct_zip_snapshot( + worker: &ExportWorker, + mut writer: ChannelWriter, + mut tables: BTreeMap, + component_ids_to_paths: BTreeMap, + snapshot_ts: RepeatableTimestamp, + by_id_indexes: BTreeMap, + system_tables: BTreeMap<(TableNamespace, TableName), TabletId>, + include_storage: bool, + usage: FunctionUsageTracker, + requestor: ExportRequestor, + update_progress: F, +) -> anyhow::Result<()> +where + F: Fn(String) -> Fut + Send + Copy, + Fut: Future> + Send, +{ + let mut zip_snapshot_upload = ZipSnapshotUpload::new(&mut writer).await?; + + for (component_id, component_path) in component_ids_to_paths { + let namespace: TableNamespace = component_id.into(); + write_component( + worker, + namespace, + component_path, + &mut zip_snapshot_upload, + &mut tables, + snapshot_ts, + &by_id_indexes, + &system_tables, + include_storage, + &usage, + requestor, + update_progress, + ) + .await?; + } + + // Complete upload. + zip_snapshot_upload.complete().await?; + writer.compat_write().close().await?; + Ok(()) +} + +fn get_export_path_prefix(component_path: &ComponentPath) -> String { + component_path + .iter() + .map(|parent_name| { + format!( + "{}/{}/", + &*COMPONENTS_TABLE, + String::from(parent_name.clone()) + ) + }) + .join("") +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FileStorageZipMetadata { + #[serde(rename = "_id")] + pub id: String, + #[serde(rename = "_creationTime")] + pub creation_time: Option, + pub sha256: Option, + pub size: Option, + pub content_type: Option, + pub internal_id: Option, +} diff --git a/crates/application/src/exports/tests.rs b/crates/application/src/exports/tests.rs new file mode 100644 index 00000000..43d6f59a --- /dev/null +++ b/crates/application/src/exports/tests.rs @@ -0,0 +1,516 @@ +use std::{ + collections::{ + BTreeMap, + BTreeSet, + }, + str, + sync::Arc, +}; + +use anyhow::Context; +use bytes::Bytes; +use common::{ + components::{ + ComponentId, + ComponentPath, + }, + document::ParsedDocument, + types::{ + ConvexOrigin, + TableName, + }, + value::ConvexObject, +}; +use database::{ + test_helpers::DbFixtures, + BootstrapComponentsModel, + Database, + TableModel, + UserFacingModel, +}; +use file_storage::{ + FileStorage, + TransactionalFileStorage, +}; +use headers::ContentType; +use keybroker::Identity; +use maplit::btreeset; +use model::{ + exports::types::{ + ExportFormat, + ExportRequestor, + }, + file_storage::types::FileStorageEntry, + test_helpers::DbFixturesWithModel, +}; +use runtime::testing::TestRuntime; +use serde_json::json; +use storage::{ + LocalDirStorage, + Storage, + StorageExt, +}; +use usage_tracking::FunctionUsageTracker; +use value::{ + assert_obj, + export::ValueFormat, + DeveloperDocumentId, + ResolvedDocumentId, + TableNamespace, +}; + +use super::ExportWorker; +use crate::{ + exports::{ + export_inner, + get_export_path_prefix, + zip_uploader::README_MD_CONTENTS, + }, + test_helpers::ApplicationTestExt, + tests::components::unmount_component, + Application, +}; + +#[convex_macro::test_runtime] +async fn test_export_zip(rt: TestRuntime) -> anyhow::Result<()> { + let DbFixtures { db, .. 
} = DbFixtures::new_with_model(&rt).await?; + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); + + let mut expected_export_entries = BTreeMap::new(); + + expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); + + expected_export_entries.insert( + "_tables/documents.jsonl".to_string(), + format!( + "{}\n{}\n{}\n", + json!({"name": "table_0", "id": 10001}), + json!({"name": "table_1", "id": 10002}), + json!({"name": "table_2", "id": 10003}), + ), + ); + expected_export_entries.insert("_storage/documents.jsonl".to_string(), format!("")); + + // Write to a bunch of tables + for i in 0..3 { + let table: TableName = str::parse(format!("table_{i}").as_str())?; + let mut tx = db.begin(Identity::system()).await?; + let id = match i { + 0 => { + UserFacingModel::new_root_for_test(&mut tx) + .insert(table, assert_obj!("foo" => 1)) + .await? + }, + 1 => { + UserFacingModel::new_root_for_test(&mut tx) + .insert(table, assert_obj!("foo" => [1, "1"])) + .await? + }, + _ => { + UserFacingModel::new_root_for_test(&mut tx) + .insert(table, assert_obj!("foo" => "1")) + .await? + }, + }; + let doc = UserFacingModel::new_root_for_test(&mut tx) + .get(id, None) + .await? + .unwrap(); + let tablet_id = tx + .table_mapping() + .namespace(TableNamespace::test_user()) + .number_to_tablet()(doc.table())?; + let doc = doc.to_resolved(tablet_id); + let id_v6 = doc.developer_id().encode(); + expected_export_entries.insert( + format!("table_{i}/documents.jsonl"), + format!( + "{}\n", + serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? + ), + ); + expected_export_entries.insert( + format!("table_{i}/generated_schema.jsonl"), + match i { + 0 => format!( + "{}\n", + json!(format!( + "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ + int64}}" + )) + ), + 1 => format!( + "{}\n{}\n", + json!(format!( + "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ + array}}" + )), + json!({id_v6: {"foo": ["int64", "infer"]}}) + ), + _ => format!( + "{}\n", + json!(format!( + "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": \ + field_name}}" + )) + ), + }, + ); + db.commit(tx).await?; + } + let (_, zip_object_key, usage) = export_inner( + &mut export_worker, + ExportFormat::Zip { + include_storage: true, + }, + ExportRequestor::SnapshotExport, + |_| async { Ok(()) }, + ) + .await?; + + // Check we can get the stored zip. + let storage_stream = storage + .get(&zip_object_key) + .await? 
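// Why `table_1` above expects two generated_schema.jsonl lines while the other
// tables expect one: the first line of that file is the table's inferred shape,
// serialized as a JSON string, and any document whose value is ambiguous under
// that shape (here the mixed array `[1, "1"]`) gets an extra override line
// keyed by its document id. A sketch with a hypothetical id, mirroring the
// expectations built above:
fn generated_schema_lines_sketch(id_v6: &str) -> (String, String) {
    let shape_line = serde_json::json!(format!(
        "{{\"_creationTime\": normalfloat64, \"_id\": \"{id_v6}\", \"foo\": array}}"
    ));
    let override_line = serde_json::json!({ id_v6: { "foo": ["int64", "infer"] } });
    (shape_line.to_string(), override_line.to_string())
}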
+ .context("object missing from storage")?; + let stored_bytes = storage_stream.collect_as_bytes().await?; + let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; + let mut zip_entries = BTreeMap::new(); + let filenames: Vec<_> = zip_reader + .entries() + .into_iter() + .map(|entry| entry.filename().to_string()) + .collect(); + for (i, filename) in filenames.into_iter().enumerate() { + let entry_reader = zip_reader.entry_reader(i).await?; + let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; + zip_entries.insert(filename, entry_contents); + } + assert_eq!(zip_entries, expected_export_entries); + + let usage = usage.gather_user_stats(); + let component_path = ComponentPath::test_user(); + assert!(usage.database_egress_size[&(component_path.clone(), "table_0".to_string())] > 0); + assert!(usage.database_egress_size[&(component_path.clone(), "table_1".to_string())] > 0); + assert!(usage.database_egress_size[&(component_path, "table_2".to_string())] > 0); + + Ok(()) +} + +async fn write_test_data_in_component( + db: &Database, + component: ComponentId, + path_prefix: &str, + expected_export_entries: &mut BTreeMap, +) -> anyhow::Result<()> { + expected_export_entries.insert( + format!("{path_prefix}_tables/documents.jsonl"), + format!("{}\n", json!({"name": "messages", "id": 10001}),), + ); + // Write to tables in each component + let table: TableName = str::parse("messages")?; + let mut tx = db.begin(Identity::system()).await?; + let id = UserFacingModel::new(&mut tx, component.into()) + .insert(table, assert_obj!("channel" => "c", "text" => path_prefix)) + .await?; + let doc = UserFacingModel::new(&mut tx, component.into()) + .get(id, None) + .await? + .unwrap(); + let tablet_id = tx + .table_mapping() + .namespace(component.into()) + .number_to_tablet()(doc.table())?; + let doc = doc.to_resolved(tablet_id); + let expected_documents = format!( + "{}\n", + serde_json::to_string(&doc.export(ValueFormat::ConvexCleanJSON))? 
+ ); + let expected_generated_schema = format!( + "{}\n", + json!(format!( + r#"{{"_creationTime": normalfloat64, "_id": "{id}", "channel": "c", "text": field_name}}"#, + )) + ); + expected_export_entries.insert( + format!("{path_prefix}messages/documents.jsonl"), + expected_documents.clone(), + ); + expected_export_entries.insert( + format!("{path_prefix}messages/generated_schema.jsonl"), + expected_generated_schema.clone(), + ); + db.commit(tx).await?; + Ok(()) +} + +#[convex_macro::test_runtime] +async fn test_export_components(rt: TestRuntime) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; + application + .load_component_tests_modules("with-schema") + .await?; + let db = application.database().clone(); + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); + + let mut expected_export_entries = BTreeMap::new(); + + expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); + + let mut tx = db.begin(Identity::system()).await?; + let component_path = "component".parse()?; + let (_, child_component) = + BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?; + + for (path_prefix, component) in [ + ("", ComponentId::Root), + ("_components/component/", child_component), + ] { + write_test_data_in_component(&db, component, path_prefix, &mut expected_export_entries) + .await?; + } + + let (_, zip_object_key, usage) = export_inner( + &mut export_worker, + ExportFormat::Zip { + include_storage: false, + }, + ExportRequestor::SnapshotExport, + |_| async { Ok(()) }, + ) + .await?; + + // Check we can get the stored zip. + let storage_stream = storage + .get(&zip_object_key) + .await? + .context("object missing from storage")?; + let stored_bytes = storage_stream.collect_as_bytes().await?; + let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; + let mut zip_entries = BTreeMap::new(); + let filenames: Vec<_> = zip_reader + .entries() + .into_iter() + .map(|entry| entry.filename().to_string()) + .collect(); + for (i, filename) in filenames.into_iter().enumerate() { + let entry_reader = zip_reader.entry_reader(i).await?; + let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; + zip_entries.insert(filename, entry_contents); + } + assert_eq!(zip_entries, expected_export_entries); + + let usage = usage.gather_user_stats(); + assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0); + Ok(()) +} + +#[convex_macro::test_runtime] +async fn test_export_unmounted_components(rt: TestRuntime) -> anyhow::Result<()> { + let application = Application::new_for_tests(&rt).await?; + unmount_component(&application).await?; + + let db = application.database().clone(); + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = ExportWorker::new_test(rt, db.clone(), storage.clone(), file_storage); + + let expected_export_entries = btreeset! 
{ + "README.md".to_string(), + "_components/component/_tables/documents.jsonl".to_string(), + "_components/component/messages/documents.jsonl".to_string(), + "_components/component/messages/generated_schema.jsonl".to_string(), + "_components/envVars/_components/component/_tables/documents.jsonl".to_string(), + "_components/envVars/_components/component/messages/documents.jsonl".to_string(), + "_components/envVars/_components/component/messages/generated_schema.jsonl".to_string(), + "_components/envVars/_tables/documents.jsonl".to_string(), + "_tables/documents.jsonl".to_string(), + }; + + let (_, zip_object_key, usage) = export_inner( + &mut export_worker, + ExportFormat::Zip { + include_storage: false, + }, + ExportRequestor::SnapshotExport, + |_| async { Ok(()) }, + ) + .await?; + + // Check we can get the stored zip. + let storage_stream = storage + .get(&zip_object_key) + .await? + .context("object missing from storage")?; + let stored_bytes = storage_stream.collect_as_bytes().await?; + let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; + let mut zip_entries = BTreeSet::new(); + let filenames: Vec<_> = zip_reader + .entries() + .into_iter() + .map(|entry| entry.filename().to_string()) + .collect(); + for (i, filename) in filenames.into_iter().enumerate() { + let entry_reader = zip_reader.entry_reader(i).await?; + let _entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; + zip_entries.insert(filename); + } + assert_eq!(zip_entries, expected_export_entries); + + let usage = usage.gather_user_stats(); + assert!(usage.database_egress_size[&("component".parse()?, "messages".to_string())] > 0); + Ok(()) +} + +#[convex_macro::test_runtime] +async fn test_export_storage(rt: TestRuntime) -> anyhow::Result<()> { + let DbFixtures { db, .. } = DbFixtures::new_with_model(&rt).await?; + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = ExportWorker::new_test( + rt.clone(), + db.clone(), + storage.clone(), + file_storage.clone(), + ); + let file_storage_wrapper = FileStorage { + database: db.clone(), + transactional_file_storage: TransactionalFileStorage::new( + rt, + file_storage, + ConvexOrigin::from("origin".to_string()), + ), + }; + let mut expected_export_entries = BTreeMap::new(); + + expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string()); + expected_export_entries.insert("_tables/documents.jsonl".to_string(), format!("")); + + // Write a few storage files. + let usage_tracker = FunctionUsageTracker::new(); + let file1_id = file_storage_wrapper + .store_file( + TableNamespace::test_user(), + None, + Some(ContentType::jpeg()), + futures::stream::iter(vec![Ok(Bytes::from_static(b"abc"))]), + None, + &usage_tracker, + ) + .await?; + let mut tx = db.begin(Identity::system()).await?; + let storage_table_id = tx + .table_mapping() + .namespace(TableNamespace::test_user()) + .id(&"_file_storage".parse()?)?; + let file1: ParsedDocument = tx + .get(ResolvedDocumentId::new( + storage_table_id.tablet_id, + DeveloperDocumentId::new(storage_table_id.table_number, file1_id.internal_id()), + )) + .await? 
+ .unwrap() + .try_into()?; + + expected_export_entries.insert(format!("_storage/{file1_id}.jpeg"), format!("abc")); + expected_export_entries.insert( + "_storage/documents.jsonl".to_string(), + format!( + "{}\n", + json!({"_id": file1_id.encode(), "_creationTime": f64::from(file1.creation_time().unwrap()), "sha256": "ungWv48Bz+pBQUDeXa4iI7ADYaOWF3qctBD/YfIAFa0=", "size": 3, "contentType": "image/jpeg", "internalId": file1.storage_id.to_string()}), + ), + ); + + let (_, zip_object_key, usage) = export_inner( + &mut export_worker, + ExportFormat::Zip { + include_storage: true, + }, + ExportRequestor::SnapshotExport, + |_| async { Ok(()) }, + ) + .await?; + + // Check we can get the stored zip. + let storage_stream = storage + .get(&zip_object_key) + .await? + .context("object missing from storage")?; + let stored_bytes = storage_stream.collect_as_bytes().await?; + let mut zip_reader = async_zip::read::mem::ZipFileReader::new(&stored_bytes).await?; + let mut zip_entries = BTreeMap::new(); + let filenames: Vec<_> = zip_reader + .entries() + .into_iter() + .map(|entry| entry.filename().to_string()) + .collect(); + for (i, filename) in filenames.into_iter().enumerate() { + let entry_reader = zip_reader.entry_reader(i).await?; + let entry_contents = String::from_utf8(entry_reader.read_to_end_crc().await?)?; + zip_entries.insert(filename, entry_contents); + } + assert_eq!(zip_entries, expected_export_entries); + + let usage = usage.gather_user_stats(); + assert!(usage.database_egress_size.is_empty()); + + Ok(()) +} + +// Regression test: previously we were trying to export documents from deleted +// tables and table_mapping was failing. +#[convex_macro::test_runtime] +async fn test_export_with_table_delete(rt: TestRuntime) -> anyhow::Result<()> { + let DbFixtures { db, .. } = DbFixtures::new(&rt).await?; + let storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let file_storage: Arc = Arc::new(LocalDirStorage::new(rt.clone())?); + let mut export_worker = + ExportWorker::new_test(rt.clone(), db.clone(), storage.clone(), file_storage); + + // Write to two tables and delete one. + let mut tx = db.begin(Identity::system()).await?; + UserFacingModel::new_root_for_test(&mut tx) + .insert("table_0".parse()?, ConvexObject::empty()) + .await?; + db.commit(tx).await?; + let mut tx = db.begin(Identity::system()).await?; + UserFacingModel::new_root_for_test(&mut tx) + .insert("table_1".parse()?, ConvexObject::empty()) + .await?; + db.commit(tx).await?; + let mut tx = db.begin(Identity::system()).await?; + TableModel::new(&mut tx) + .delete_table(TableNamespace::test_user(), "table_0".parse()?) 
+ .await?; + db.commit(tx).await?; + + let (_, _zip_object_key, _) = export_inner( + &mut export_worker, + ExportFormat::Zip { + include_storage: false, + }, + ExportRequestor::SnapshotExport, + |_| async { Ok(()) }, + ) + .await?; + Ok(()) +} + +#[test] +fn test_get_export_path_prefix() -> anyhow::Result<()> { + assert_eq!(get_export_path_prefix(&ComponentPath::root()), ""); + assert_eq!(get_export_path_prefix(&"a".parse()?), "_components/a/"); + assert_eq!( + get_export_path_prefix(&"a/b".parse()?), + "_components/a/_components/b/" + ); + assert_eq!( + get_export_path_prefix(&"a/b/c".parse()?), + "_components/a/_components/b/_components/c/" + ); + Ok(()) +} diff --git a/crates/application/src/exports/worker.rs b/crates/application/src/exports/worker.rs new file mode 100644 index 00000000..f7151d5b --- /dev/null +++ b/crates/application/src/exports/worker.rs @@ -0,0 +1,327 @@ +use std::{ + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use common::{ + self, + backoff::Backoff, + components::ComponentPath, + document::ParsedDocument, + errors::report_error, + execution_context::ExecutionId, + pause::PauseClient, + runtime::Runtime, + types::UdfIdentifier, + RequestId, +}; +use database::{ + Database, + SystemMetadataModel, +}; +use futures::{ + Future, + FutureExt, +}; +use keybroker::Identity; +use model::exports::{ + types::{ + Export, + ExportRequestor, + }, + ExportsModel, +}; +use storage::Storage; +use usage_tracking::{ + CallType, + FunctionUsageTracker, + UsageCounter, +}; + +use crate::{ + exports::export_inner, + metrics::{ + export_timer, + log_worker_starting, + }, +}; + +const INITIAL_BACKOFF: Duration = Duration::from_secs(1); +const MAX_BACKOFF: Duration = Duration::from_secs(900); // 15 minutes + // +#[derive(thiserror::Error, Debug)] +#[error("Export canceled")] +struct ExportCanceled; + +pub struct ExportWorker<RT: Runtime> { + pub(super) runtime: RT, + pub(super) database: Database<RT>, + pub(super) storage: Arc<dyn Storage>, + pub(super) file_storage: Arc<dyn Storage>, + pub(super) backoff: Backoff, + pub(super) usage_tracking: UsageCounter, +} + +impl<RT: Runtime> ExportWorker<RT> { + #[allow(clippy::new_ret_no_self)] + pub fn new( + runtime: RT, + database: Database<RT>, + storage: Arc<dyn Storage>, + file_storage: Arc<dyn Storage>, + usage_tracking: UsageCounter, + ) -> impl Future<Output = ()> + Send { + let mut worker = Self { + runtime, + database, + storage, + file_storage, + backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), + usage_tracking, + }; + async move { + loop { + if let Err(e) = worker.run().await { + report_error(&mut e.context("ExportWorker died")).await; + let delay = worker.backoff.fail(&mut worker.runtime.rng()); + worker.runtime.wait(delay).await; + } else { + worker.backoff.reset(); + } + } + } + } + + #[cfg(test)] + pub fn new_test( + runtime: RT, + database: Database<RT>, + storage: Arc<dyn Storage>, + file_storage: Arc<dyn Storage>, + ) -> Self { + use events::usage::NoOpUsageEventLogger; + + Self { + runtime, + database, + storage, + file_storage, + backoff: Backoff::new(INITIAL_BACKOFF, MAX_BACKOFF), + usage_tracking: UsageCounter::new(Arc::new(NoOpUsageEventLogger)), + } + } + + // Subscribe to the export table. If there is a requested export, start + // an export and mark as in_progress. If there's an export job that didn't + // finish (it's in_progress), restart that export.
+ pub async fn run(&mut self) -> anyhow::Result<()> { + let mut tx = self.database.begin(Identity::system()).await?; + let mut exports_model = ExportsModel::new(&mut tx); + let export_requested = exports_model.latest_requested().await?; + let export_in_progress = exports_model.latest_in_progress().await?; + match (export_requested, export_in_progress) { + (Some(_), Some(_)) => { + anyhow::bail!("Can only have one export requested or in progress at once.") + }, + (Some(export), None) => { + tracing::info!("Export requested."); + let _status = log_worker_starting("ExportWorker"); + let timer = export_timer(); + let ts = self.database.now_ts_for_reads(); + let in_progress_export = (*export).clone().in_progress(*ts)?; + let in_progress_export_doc = SystemMetadataModel::new_global(&mut tx) + .replace( + export.id().to_owned(), + in_progress_export.clone().try_into()?, + ) + .await? + .try_into()?; + self.database + .commit_with_write_source(tx, "export_worker_export_requested") + .await?; + self.export(in_progress_export_doc).await?; + timer.finish(); + return Ok(()); + }, + (None, Some(export)) => { + tracing::info!("In progress export restarting..."); + let _status = log_worker_starting("ExportWorker"); + let timer = export_timer(); + self.export(export).await?; + timer.finish(); + return Ok(()); + }, + (None, None) => { + tracing::info!("No exports requested or in progress."); + }, + } + let token = tx.into_token()?; + let subscription = self.database.subscribe(token).await?; + subscription.wait_for_invalidation().await; + Ok(()) + } + + async fn export(&mut self, export: ParsedDocument<Export>) -> anyhow::Result<()> { + loop { + match self.export_and_mark_complete(export.clone()).await { + Ok(()) => { + return Ok(()); + }, + Err(mut e) => { + if e.is::<ExportCanceled>() { + tracing::info!("Export {} canceled", export.id()); + return Ok(()); + } + report_error(&mut e).await; + let delay = self.backoff.fail(&mut self.runtime.rng()); + tracing::error!("Export failed, retrying in {delay:?}"); + self.runtime.wait(delay).await; + }, + } + } + } + + async fn export_and_mark_complete( + &mut self, + export: ParsedDocument<Export>, + ) -> anyhow::Result<()> { + let id = export.id(); + let format = export.format(); + let requestor = export.requestor(); + drop(export); // Drop this to prevent accidentally using stale state + + tracing::info!("Export {id} beginning..."); + let (snapshot_ts, object_key, usage) = { + let database_ = self.database.clone(); + let export_future = async { + let database_ = self.database.clone(); + + export_inner(self, format, requestor, |msg| async { + tracing::info!("Export {id} progress: {msg}"); + database_ + .execute_with_occ_retries( + Identity::system(), + FunctionUsageTracker::new(), + PauseClient::new(), + "export_worker_update_progress", + move |tx| { + let msg = msg.clone(); + async move { + let export: ParsedDocument<Export> = + tx.get(id).await?.context(ExportCanceled)?.try_into()?; + let export = export.into_value(); + if let Export::Canceled { .. } = export { + anyhow::bail!(ExportCanceled); + } + SystemMetadataModel::new_global(tx) + .replace(id, export.update_progress(msg)?.try_into()?) + .await?; + Ok(()) + } + .boxed() + .into() + }, + ) + .await?; + Ok(()) + }) + .await + }; + tokio::pin!(export_future); + + // In parallel, monitor the export document to check for cancellation + let monitor_export = async move { + loop { + let mut tx = database_.begin_system().await?; + let Some(export) = tx.get(id).await?
else { + tracing::warn!("Export {id} disappeared"); + return Err(ExportCanceled.into()); + }; + let export: ParsedDocument<Export> = export.try_into()?; + match *export { + Export::InProgress { .. } => (), + Export::Canceled { .. } => return Err(ExportCanceled.into()), + Export::Requested { .. } + | Export::Failed { .. } + | Export::Completed { .. } => { + anyhow::bail!("Export {id} is in unexpected state: {export:?}"); + }, + } + let token = tx.into_token()?; + let subscription = database_.subscribe(token).await?; + subscription.wait_for_invalidation().await; + } + }; + tokio::pin!(monitor_export); + + futures::future::select(export_future, monitor_export) + .await + .factor_first() + .0? + }; + + // Export is done; mark it as such. + tracing::info!("Export {id} completed"); + self.database + .execute_with_occ_retries( + Identity::system(), + FunctionUsageTracker::new(), + PauseClient::new(), + "export_worker_mark_complete", + |tx| { + let object_key = object_key.clone(); + async move { + let Some(export) = tx.get(id).await? else { + tracing::warn!("Export {id} disappeared"); + return Err(ExportCanceled.into()); + }; + let export: ParsedDocument<Export> = export.try_into()?; + if let Export::Canceled { .. } = *export { + return Err(ExportCanceled.into()); + } + let completed_export = export.into_value().completed( + snapshot_ts, + *tx.begin_timestamp(), + object_key, + )?; + SystemMetadataModel::new_global(tx) + .replace(id, completed_export.try_into()?) + .await?; + Ok(()) + } + .boxed() + .into() + }, + ) + .await?; + + let object_attributes = self + .storage + .get_object_attributes(&object_key) + .await? + .context("error getting export object attributes from S3")?; + + let tag = requestor.usage_tag().to_string(); + let call_type = match requestor { + ExportRequestor::SnapshotExport => CallType::Export, + ExportRequestor::CloudBackup => CallType::CloudBackup, + }; + // Charge file bandwidth for the upload of the snapshot to exports storage + self.usage_tracking.track_independent_storage_ingress_size( + ComponentPath::root(), + tag.clone(), + object_attributes.size, + ); + // Charge database bandwidth accumulated during the export + self.usage_tracking.track_call( + UdfIdentifier::Cli(tag), + ExecutionId::new(), + RequestId::new(), + call_type, + true, + usage.gather_user_stats(), + ); + Ok(()) + } +} diff --git a/crates/application/src/exports/zip_uploader.rs b/crates/application/src/exports/zip_uploader.rs new file mode 100644 index 00000000..cd12040f --- /dev/null +++ b/crates/application/src/exports/zip_uploader.rs @@ -0,0 +1,186 @@ +use async_zip::{ + write::{ + EntryStreamWriter, + ZipFileWriter, + }, + Compression, + ZipEntryBuilder, + ZipEntryBuilderExt, +}; +use bytes::Bytes; +use common::{ + self, + async_compat::TokioAsyncWriteCompatExt, + document::ResolvedDocument, + types::TableName, +}; +use futures::{ + stream::BoxStream, + AsyncWriteExt, + TryStreamExt, +}; +use serde_json::{ + json, + Value as JsonValue, +}; +use shape_inference::{ + export_context::GeneratedSchema, + ShapeConfig, +}; +use storage::ChannelWriter; +use value::export::ValueFormat; + +static AFTER_DOCUMENTS_CLEAN: Bytes = Bytes::from_static("\n".as_bytes()); + +// 0o644 => read-write for owner, read for everyone else. +const ZIP_ENTRY_PERMISSIONS: u16 = 0o644; + +pub(super) static README_MD_CONTENTS: &str = r#"# Welcome to your Convex snapshot export! + +This ZIP file contains a snapshot of the tables in your Convex deployment. + +Documents for each table are listed as lines of JSON in +<table_name>/documents.jsonl files.
+ +For details on the format and how to use this snapshot with npx convex import, +check out [the docs](https://docs.convex.dev/database/import-export/export) or +ask us in [Discord](http://convex.dev/community). +"#; + +// 'a is lifetime of entire zip file writer. +// 'b is lifetime of entry writer for a single table. +pub struct ZipSnapshotTableUpload<'a, 'b> { + entry_writer: EntryStreamWriter<'b, &'a mut ChannelWriter>, +} + +impl<'a, 'b> ZipSnapshotTableUpload<'a, 'b> { + async fn new( + zip_writer: &'b mut ZipFileWriter<&'a mut ChannelWriter>, + path_prefix: &str, + table_name: TableName, + ) -> anyhow::Result<Self> { + let source_path = format!("{path_prefix}{table_name}/documents.jsonl"); + let builder = ZipEntryBuilder::new(source_path.clone(), Compression::Deflate) + .unix_permissions(ZIP_ENTRY_PERMISSIONS); + let entry_writer = zip_writer.write_entry_stream(builder.build()).await?; + Ok(Self { entry_writer }) + } + + pub async fn write(&mut self, doc: ResolvedDocument) -> anyhow::Result<()> { + let json = doc.export(ValueFormat::ConvexCleanJSON); + self.write_json_line(json).await + } + + pub async fn write_json_line(&mut self, json: JsonValue) -> anyhow::Result<()> { + let buf = serde_json::to_vec(&json)?; + self.entry_writer.compat_mut_write().write_all(&buf).await?; + self.entry_writer + .compat_mut_write() + .write_all(&AFTER_DOCUMENTS_CLEAN) + .await?; + Ok(()) + } + + pub async fn complete(self) -> anyhow::Result<()> { + self.entry_writer.close().await?; + Ok(()) + } +} + +pub struct ZipSnapshotUpload<'a> { + writer: ZipFileWriter<&'a mut ChannelWriter>, +} + +impl<'a> ZipSnapshotUpload<'a> { + pub async fn new(out: &'a mut ChannelWriter) -> anyhow::Result<Self> { + let writer = ZipFileWriter::new(out); + let mut zip_snapshot_upload = Self { writer }; + zip_snapshot_upload + .write_full_file(format!("README.md"), README_MD_CONTENTS) + .await?; + Ok(zip_snapshot_upload) + } + + async fn write_full_file(&mut self, path: String, contents: &str) -> anyhow::Result<()> { + let builder = ZipEntryBuilder::new(path, Compression::Deflate) + .unix_permissions(ZIP_ENTRY_PERMISSIONS); + let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; + entry_writer + .compat_mut_write() + .write_all(contents.as_bytes()) + .await?; + entry_writer.close().await?; + Ok(()) + } + + pub async fn stream_full_file( + &mut self, + path: String, + mut contents: BoxStream<'_, std::io::Result<Bytes>>, + ) -> anyhow::Result<()> { + let builder = ZipEntryBuilder::new(path, Compression::Deflate) + .unix_permissions(ZIP_ENTRY_PERMISSIONS); + let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; + while let Some(chunk) = contents.try_next().await? { + entry_writer.compat_mut_write().write_all(&chunk).await?; + } + entry_writer.close().await?; + Ok(()) + } + + pub async fn start_table<T: ShapeConfig>( + &mut self, + path_prefix: &str, + table_name: TableName, + generated_schema: GeneratedSchema<T>, + ) -> anyhow::Result<ZipSnapshotTableUpload<'a, '_>> { + self.write_generated_schema(path_prefix, &table_name, generated_schema) + .await?; + + ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await + } + + /// System tables have known shape, so we don't need to serialize it.
+ pub async fn start_system_table( + &mut self, + path_prefix: &str, + table_name: TableName, + ) -> anyhow::Result<ZipSnapshotTableUpload<'a, '_>> { + anyhow::ensure!(table_name.is_system()); + ZipSnapshotTableUpload::new(&mut self.writer, path_prefix, table_name).await + } + + async fn write_generated_schema<T: ShapeConfig>( + &mut self, + path_prefix: &str, + table_name: &TableName, + generated_schema: GeneratedSchema<T>, + ) -> anyhow::Result<()> { + let generated_schema_path = format!("{path_prefix}{table_name}/generated_schema.jsonl"); + let builder = ZipEntryBuilder::new(generated_schema_path.clone(), Compression::Deflate) + .unix_permissions(ZIP_ENTRY_PERMISSIONS); + let mut entry_writer = self.writer.write_entry_stream(builder.build()).await?; + let generated_schema_str = generated_schema.inferred_shape.to_string(); + entry_writer + .compat_mut_write() + .write_all(serde_json::to_string(&generated_schema_str)?.as_bytes()) + .await?; + entry_writer.compat_mut_write().write_all(b"\n").await?; + for (override_id, override_export_context) in generated_schema.overrides.into_iter() { + let override_json = + json!({override_id.encode(): JsonValue::from(override_export_context)}); + entry_writer + .compat_mut_write() + .write_all(serde_json::to_string(&override_json)?.as_bytes()) + .await?; + entry_writer.compat_mut_write().write_all(b"\n").await?; + } + entry_writer.close().await?; + Ok(()) + } + + pub async fn complete(self) -> anyhow::Result<()> { + self.writer.close().await?; + Ok(()) + } +} diff --git a/crates/application/src/lib.rs b/crates/application/src/lib.rs index 656bc371..9ff335ce 100644 --- a/crates/application/src/lib.rs +++ b/crates/application/src/lib.rs @@ -310,7 +310,7 @@ use vector::{ use crate::{ application_function_runner::ApplicationFunctionRunner, - export_worker::ExportWorker, + exports::worker::ExportWorker, function_log::{ FunctionExecutionLog, TableRate, @@ -331,7 +331,7 @@ pub mod application_function_runner; mod cache; pub mod cron_jobs; pub mod deploy_config; -mod export_worker; +mod exports; pub mod function_log; pub mod log_visibility; mod metrics; diff --git a/crates/application/src/snapshot_import/import_file_storage.rs b/crates/application/src/snapshot_import/import_file_storage.rs index 37cd5eb5..0cd7fb38 100644 --- a/crates/application/src/snapshot_import/import_file_storage.rs +++ b/crates/application/src/snapshot_import/import_file_storage.rs @@ -61,7 +61,7 @@ use value::{ }; use crate::{ - export_worker::FileStorageZipMetadata, + exports::FileStorageZipMetadata, snapshot_import::{ import_error::ImportError, parse::ImportUnit,
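
For reference while reviewing the tests above, the archive layout they assert on is roughly the following (names in angle brackets are placeholders; the _components/ nesting repeats for each child component, and the _storage/ entries appear only when include_storage is set):

    README.md
    _tables/documents.jsonl
    <table_name>/documents.jsonl
    <table_name>/generated_schema.jsonl
    _storage/documents.jsonl
    _storage/<storage_id>.<extension>
    _components/<component_name>/_tables/documents.jsonl
    _components/<component_name>/<table_name>/documents.jsonl
    _components/<component_name>/<table_name>/generated_schema.jsonl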
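
A minimal sketch of how the extracted zip_uploader API is meant to be driven. This is illustrative only: the real caller is export_inner in crates/application/src/exports/, which writes many tables plus storage files into a single archive; the module path, helper name, and generic parameter below are assumptions.

    use common::{document::ResolvedDocument, types::TableName};
    use shape_inference::{export_context::GeneratedSchema, ShapeConfig};
    use storage::ChannelWriter;

    use crate::exports::zip_uploader::ZipSnapshotUpload; // assumed module path

    // Hypothetical helper: stream one root-component table into a fresh snapshot zip.
    async fn write_one_table<T: ShapeConfig>(
        out: &mut ChannelWriter,
        table_name: TableName,
        generated_schema: GeneratedSchema<T>,
        docs: Vec<ResolvedDocument>,
    ) -> anyhow::Result<()> {
        // Opening the archive also writes README.md up front.
        let mut zip = ZipSnapshotUpload::new(out).await?;
        // start_table writes <table>/generated_schema.jsonl, then hands back a
        // writer for <table>/documents.jsonl. The empty prefix is the root component.
        let mut table_upload = zip.start_table("", table_name, generated_schema).await?;
        for doc in docs {
            // One JSON line per document, in ConvexCleanJSON export format.
            table_upload.write(doc).await?;
        }
        // Close the table's zip entry before finalizing the central directory.
        table_upload.complete().await?;
        zip.complete().await?;
        Ok(())
    }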
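
The cancellation design in export_and_mark_complete races the export future against a watcher that only ever resolves with an error: futures::future::select picks whichever finishes first, and Either::factor_first works because both sides share an output type. A minimal, self-contained illustration of that pattern with toy futures (not the worker's real types):

    use futures::future;

    #[tokio::main]
    async fn main() {
        // The "work" future resolves with the export result.
        let work = async { Ok::<&'static str, &'static str>("export finished") };
        // The watcher never resolves with Ok; in the worker it waits on a database
        // subscription and returns Err(ExportCanceled) if the export doc is canceled.
        let watcher = async {
            future::pending::<()>().await;
            Err::<&'static str, &'static str>("canceled")
        };
        tokio::pin!(work);
        tokio::pin!(watcher);
        // select() yields whichever future finished first; factor_first() extracts
        // that result and discards the still-pending loser.
        let result = future::select(work, watcher).await.factor_first().0;
        assert_eq!(result, Ok("export finished"));
    }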