From 11f26234d491500fda9f97cee851021fdf2e12ef Mon Sep 17 00:00:00 2001 From: Li0k <493570386@qq.com> Date: Tue, 8 Oct 2024 15:52:14 +0800 Subject: [PATCH 1/2] feat(iceberg): support sql catalog interface (#7) * suppport remove all for file io * resolve conflict * reorder record batch * fix scan * fix filtered_entries * fix ci ut * change list item name back to list element * fix * feat(iceberg): support sql catalog interface * typo * chore(cargo): Downgrading sqlx from 0.8.0 to 0.7.3 * fix(iceberg): remove namespace_exists for sql catalogs * fix(iceberg): fix set snapshot ref * feat(iceberg): introduce disable_config_load for storage_s3 * chore(cargo): upgrade sqlx from 0.7.3 to 0.7.4 --------- Co-authored-by: Dylan Chen Co-authored-by: xxhZs <1060434431@qq.com> Co-authored-by: Xinhao Xu <84456268+xxhZs@users.noreply.github.com> --- crates/catalog/sql/Cargo.toml | 2 +- crates/catalog/sql/src/catalog.rs | 354 +++++++++++++++++- crates/iceberg/src/catalog/mod.rs | 2 +- crates/iceberg/src/io/storage_s3.rs | 8 + crates/iceberg/src/spec/table_metadata.rs | 6 + .../src/spec/table_metadata_builder.rs | 36 ++ 6 files changed, 395 insertions(+), 13 deletions(-) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index a51671650..71cf37c14 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -40,7 +40,7 @@ uuid = { workspace = true, features = ["v4"] } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } itertools = { workspace = true } regex = "1.10.5" -sqlx = { version = "0.8.0", features = [ +sqlx = { version = "0.8.1", features = [ "tls-rustls", "runtime-tokio", "any", diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 1556614dd..783bb9bd9 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -23,8 +23,8 @@ use iceberg::io::FileIO; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ - Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, - TableIdent, + Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, + TableUpdate, }; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -588,8 +588,8 @@ impl Catalog for SqlCatalog { &format!( "DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_FIELD_CATALOG_NAME} = ? - AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? AND ( {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' OR {CATALOG_FIELD_RECORD_TYPE} IS NULL @@ -769,23 +769,98 @@ impl Catalog for SqlCatalog { Ok(()) } - async fn update_table(&self, _commit: TableCommit) -> Result { - Err(Error::new( - ErrorKind::FeatureUnsupported, - "Updating a table is not supported yet", - )) + async fn update_table(&self, mut commit: TableCommit) -> Result
{ + let identifier = commit.identifier().clone(); + if !self.table_exists(&identifier).await? { + return no_such_table_err(&identifier); + } + + // ReplaceSortOrder is currently not supported, so ignore the requirement here. + let _requirements = commit.take_requirements(); + let table_updates = commit.take_updates(); + + let table = self.load_table(&identifier).await?; + let mut update_table_metadata_builder = + TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None); + + for table_update in table_updates { + match table_update { + TableUpdate::AddSnapshot { snapshot } => { + update_table_metadata_builder = + update_table_metadata_builder.add_snapshot(snapshot)?; + } + + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => { + update_table_metadata_builder = + update_table_metadata_builder.set_ref(&ref_name, reference)?; + } + + _ => { + unreachable!() + } + } + } + + let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4()); + let file = self.fileio.new_output(&new_table_meta_location)?; + let update_table_metadata = update_table_metadata_builder.build()?; + file.write(serde_json::to_vec(&update_table_metadata.metadata)?.into()) + .await?; + + let update = format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_METADATA_LOCATION_PROP} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ); + + let namespace_name = identifier.namespace().join("."); + let args: Vec> = vec![ + Some(&new_table_meta_location), + Some(&self.name), + Some(&namespace_name), + Some(identifier.name()), + ]; + + self.execute(&update, args, None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier) + .metadata_location(new_table_meta_location) + .metadata(update_table_metadata.metadata) + .build()?) } } +/// Generate the metadata path for a table +#[inline] +pub fn metadata_path(meta_data_location: &str, uuid: Uuid) -> String { + format!("{}/metadata/0-{}.metadata.json", meta_data_location, uuid) +} + #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; use std::hash::Hash; use iceberg::io::FileIOBuilder; - use iceberg::spec::{BoundPartitionSpec, NestedField, PrimitiveType, Schema, SortOrder, Type}; + use iceberg::spec::{ + BoundPartitionSpec, NestedField, Operation, PrimitiveType, Schema, Snapshot, + SnapshotReference, SnapshotRetention, SortOrder, Summary, Type, MAIN_BRANCH, + }; use iceberg::table::Table; - use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; + use iceberg::{ + Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableUpdate, + }; use itertools::Itertools; use regex::Regex; use sqlx::migrate::MigrateDatabase; @@ -1692,7 +1767,27 @@ mod tests { .await .unwrap_err() .to_string(), - format!("Unexpected => No such table: {:?}", src_table_ident), + format!("Unexpected => No such table: {:?}", src_table_ident) + ); + } + + #[tokio::test] + async fn test_drop_table_throws_error_if_table_not_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + + let err = catalog + .drop_table(&table_ident) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" ); } @@ -1715,4 +1810,241 @@ mod tests { format!("Unexpected => Table {:?} already exists.", &dst_table_ident), ); } + + #[tokio::test] + async fn test_drop_table() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(); + + let table = catalog.load_table(&table_ident).await.unwrap(); + assert_table_eq(&table, &table_ident, &simple_table_schema()); + + catalog.drop_table(&table_ident).await.unwrap(); + let err = catalog + .load_table(&table_ident) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" + ); + } + + #[tokio::test] + async fn test_update_table_throws_error_if_table_not_exist() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + let table_name = "tbl1"; + let table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + create_namespace(&catalog, &namespace_ident).await; + let table_commit = TableCommit::builder() + .ident(table_ident.clone()) + .updates(vec![]) + .requirements(vec![]) + .build(); + let err = catalog + .update_table(table_commit) + .await + .unwrap_err() + .to_string(); + assert_eq!( + err, + "Unexpected => No such table: TableIdent { namespace: NamespaceIdent([\"a\"]), name: \"tbl1\" }" + ); + } + + #[tokio::test] + async fn test_update_table_add_snapshot() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + let table_snapshots_iter = table.metadata().snapshots(); + assert_eq!(0, table_snapshots_iter.count()); + + let add_snapshot = Snapshot::builder() + .with_snapshot_id(638933773299822130) + .with_timestamp_ms(1662532818843) + .with_sequence_number(1) + .with_schema_id(1) + .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") + .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) + .build(); + + let table_update = TableUpdate::AddSnapshot { + snapshot: add_snapshot, + }; + let requirements = vec![]; + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update]) + .requirements(requirements) + .build(); + let table = catalog.update_table(table_commit).await.unwrap(); + let snapshot_vec = table.metadata().snapshots().collect_vec(); + assert_eq!(1, snapshot_vec.len()); + let snapshot = &snapshot_vec[0]; + assert_eq!(snapshot.snapshot_id(), 638933773299822130); + assert_eq!(snapshot.timestamp_ms(), 1662532818843); + assert_eq!(snapshot.sequence_number(), 1); + assert_eq!(snapshot.schema_id().unwrap(), 1); + assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.summary().operation, Operation::Append); + assert_eq!( + snapshot.summary().additional_properties, + HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string() + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()) + ]) + ); + } + + #[tokio::test] + async fn test_update_table_set_snapshot_ref() { + let warehouse_loc = temp_path(); + let catalog = new_sql_catalog(warehouse_loc.clone()).await; + let namespace_ident = NamespaceIdent::new("a".into()); + create_namespace(&catalog, &namespace_ident).await; + + let table_name = "abc"; + let location = warehouse_loc.clone(); + let table_creation = TableCreation::builder() + .name(table_name.into()) + .location(location.clone()) + .schema(simple_table_schema()) + .build(); + + let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into()); + + assert_table_eq( + &catalog + .create_table(&namespace_ident, table_creation) + .await + .unwrap(), + &expected_table_ident, + &simple_table_schema(), + ); + + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); + + let table_snapshots_iter = table.metadata().snapshots(); + assert_eq!(0, table_snapshots_iter.count()); + + let snapshot_id = 638933773299822130; + let reference = SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: Some(100), + max_ref_age_ms: Some(200), + }, + }; + let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: reference.clone(), + }; + + let add_snapshot = Snapshot::builder() + .with_snapshot_id(638933773299822130) + .with_timestamp_ms(1662532818843) + .with_sequence_number(1) + .with_schema_id(1) + .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") + .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) + .build(); + + let table_update_add_snapshot = TableUpdate::AddSnapshot { + snapshot: add_snapshot, + }; + + let table_update_opers = vec![table_update_add_snapshot, table_update_set_snapshot_ref]; + + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(table_update_opers) + .requirements(vec![]) + .build(); + let table = catalog.update_table(table_commit).await.unwrap(); + let snapshot_vec = table.metadata().snapshots().collect_vec(); + assert_eq!(1, snapshot_vec.len()); + let snapshot = &snapshot_vec[0]; + assert_eq!(snapshot.snapshot_id(), 638933773299822130); + assert_eq!(snapshot.timestamp_ms(), 1662532818843); + assert_eq!(snapshot.sequence_number(), 1); + assert_eq!(snapshot.schema_id().unwrap(), 1); + assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.summary().operation, Operation::Append); + assert_eq!( + snapshot.summary().additional_properties, + HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string() + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()) + ]) + ); + + let snapshot_refs_map = table.metadata().snapshot_refs(); + assert_eq!(1, snapshot_refs_map.len()); + let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); + let expected_snapshot_ref = SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: Some(100), + max_ref_age_ms: Some(200), + }, + }; + assert_eq!(snapshot_ref, &expected_snapshot_ref); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index bd301ad38..a36896d89 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -267,7 +267,7 @@ pub struct TableCreation { /// TableCommit represents the commit of a table in the catalog. #[derive(Debug, TypedBuilder)] -#[builder(build_method(vis = "pub(crate)"))] +#[builder(build_method(vis = "pub"))] pub struct TableCommit { /// The table ident. ident: TableIdent, diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 60e97ab45..13b200721 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -58,6 +58,8 @@ pub const S3_ASSUME_ROLE_ARN: &str = "client.assume-role.arn"; pub const S3_ASSUME_ROLE_EXTERNAL_ID: &str = "client.assume-role.external-id"; /// Optional session name used to assume an IAM role. pub const S3_ASSUME_ROLE_SESSION_NAME: &str = "client.assume-role.session-name"; +/// If set to true, the S3 configuration will not be loaded from the default locations. +pub const S3_DISABLE_CONFIG_LOAD: &str = "s3.disable_config_load"; /// Parse iceberg props to s3 config. pub(crate) fn s3_config_parse(mut m: HashMap) -> Result { @@ -126,6 +128,12 @@ pub(crate) fn s3_config_parse(mut m: HashMap) -> Result &HashMap { + &self.refs + } + /// Normalize this partition spec. /// /// This is an internal method diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index ed4ab7902..0103ab1e8 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -1126,6 +1126,42 @@ impl TableMetadataBuilder { fn highest_sort_order_id(&self) -> Option { self.metadata.sort_orders.keys().max().copied() } + + // /// Set current snapshot ref of table + // pub fn set_snapshot_ref( + // mut self, + // ref_name: String, + // reference: SnapshotReference, + // ) -> Result { + // if let Some(existing_snapshot_ref) = self.0.refs.get(&ref_name) { + // if (*existing_snapshot_ref) == reference { + // return Ok(self); + // } + // } + + // let snapshot_id = reference.snapshot_id; + // let snapshot = self + // .0 + // .snapshot_by_id(snapshot_id) + // .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Snapshot not found"))?; + // if snapshot.snapshot_id() == reference.snapshot_id { + // self.0.last_updated_ms = snapshot.timestamp_ms(); + // } + + // if ref_name == MAIN_BRANCH { + // self.0.current_snapshot_id = Some(snapshot_id); + // assert_ne!(0, self.0.last_updated_ms); + + // self.0.snapshot_log.push(SnapshotLog { + // timestamp_ms: self.0.last_updated_ms, + // snapshot_id, + // }); + // } + + // self.0.refs.insert(ref_name, reference); + + // Ok(self) + // } } impl From for TableMetadata { From 1a8aee13d26d95c057336ddb7ac12e2d08aef141 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 30 Dec 2024 18:11:56 +0800 Subject: [PATCH 2/2] fix ut --- crates/catalog/sql/src/catalog.rs | 202 ++++++++++-------- .../src/spec/table_metadata_builder.rs | 36 ---- 2 files changed, 108 insertions(+), 130 deletions(-) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 783bb9bd9..f92f47429 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -24,7 +24,6 @@ use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, Error, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, - TableUpdate, }; use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyQueryResult, AnyRow}; use sqlx::{Any, AnyPool, Row, Transaction}; @@ -784,24 +783,11 @@ impl Catalog for SqlCatalog { TableMetadataBuilder::new_from_metadata(table.metadata().clone(), None); for table_update in table_updates { - match table_update { - TableUpdate::AddSnapshot { snapshot } => { - update_table_metadata_builder = - update_table_metadata_builder.add_snapshot(snapshot)?; - } - - TableUpdate::SetSnapshotRef { - ref_name, - reference, - } => { - update_table_metadata_builder = - update_table_metadata_builder.set_ref(&ref_name, reference)?; - } + update_table_metadata_builder = table_update.apply(update_table_metadata_builder)?; + } - _ => { - unreachable!() - } - } + for table_requirement in _requirements { + table_requirement.check(Some(table.metadata()))?; } let new_table_meta_location = metadata_path(table.metadata().location(), Uuid::new_v4()); @@ -855,11 +841,12 @@ mod tests { use iceberg::io::FileIOBuilder; use iceberg::spec::{ BoundPartitionSpec, NestedField, Operation, PrimitiveType, Schema, Snapshot, - SnapshotReference, SnapshotRetention, SortOrder, Summary, Type, MAIN_BRANCH, + SnapshotReference, SnapshotRetention, SortOrder, Type, MAIN_BRANCH, }; use iceberg::table::Table; use iceberg::{ - Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, TableUpdate, + Catalog, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, + TableRequirement, TableUpdate, }; use itertools::Itertools; use regex::Regex; @@ -1903,17 +1890,23 @@ mod tests { let table_snapshots_iter = table.metadata().snapshots(); assert_eq!(0, table_snapshots_iter.count()); - let add_snapshot = Snapshot::builder() - .with_snapshot_id(638933773299822130) - .with_timestamp_ms(1662532818843) - .with_sequence_number(1) - .with_schema_id(1) - .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") - .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) - .build(); + // Add snapshot + let record = r#" + { + "snapshot-id": 3051729675574597004, + "sequence-number": 10, + "timestamp-ms": 9992191116217, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://b/wh/.../s1.avro", + "schema-id": 0 + } + "#; + let snapshot = serde_json::from_str::(record).unwrap(); let table_update = TableUpdate::AddSnapshot { - snapshot: add_snapshot, + snapshot: snapshot.clone(), }; let requirements = vec![]; let table_commit = TableCommit::builder() @@ -1925,24 +1918,43 @@ mod tests { let snapshot_vec = table.metadata().snapshots().collect_vec(); assert_eq!(1, snapshot_vec.len()); let snapshot = &snapshot_vec[0]; - assert_eq!(snapshot.snapshot_id(), 638933773299822130); - assert_eq!(snapshot.timestamp_ms(), 1662532818843); - assert_eq!(snapshot.sequence_number(), 1); - assert_eq!(snapshot.schema_id().unwrap(), 1); - assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.snapshot_id(), 3051729675574597004); + assert_eq!(snapshot.timestamp_ms(), 9992191116217); + assert_eq!(snapshot.sequence_number(), 10); + assert_eq!(snapshot.schema_id().unwrap(), 0); + assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro"); assert_eq!(snapshot.summary().operation, Operation::Append); - assert_eq!( - snapshot.summary().additional_properties, - HashMap::from_iter(vec![ - ( - "spark.app.id".to_string(), - "local-1662532784305".to_string() - ), - ("added-data-files".to_string(), "4".to_string()), - ("added-records".to_string(), "4".to_string()), - ("added-files-size".to_string(), "6001".to_string()) - ]) - ); + assert_eq!(snapshot.summary().additional_properties, HashMap::new()); + + // Add another snapshot + // Add snapshot + let record = r#" + { + "snapshot-id": 3051729675574597005, + "sequence-number": 11, + "timestamp-ms": 9992191117217, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://b/wh/.../s2.avro", + "schema-id": 0 + } + "#; + let snapshot = serde_json::from_str::(record).unwrap(); + let table_update = TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }; + let requirement = TableRequirement::RefSnapshotIdMatch { + r#ref: "main".to_string(), + snapshot_id: Some(3051729675574597004), + }; + let requirements = vec![requirement]; + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update]) + .requirements(requirements) + .build(); + assert!(catalog.update_table(table_commit).await.is_err()); } #[tokio::test] @@ -1976,73 +1988,75 @@ mod tests { let table_snapshots_iter = table.metadata().snapshots(); assert_eq!(0, table_snapshots_iter.count()); + let table = catalog.load_table(&expected_table_ident).await.unwrap(); + assert_table_eq(&table, &expected_table_ident, &simple_table_schema()); - let snapshot_id = 638933773299822130; - let reference = SnapshotReference { - snapshot_id, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: Some(10), - max_snapshot_age_ms: Some(100), - max_ref_age_ms: Some(200), - }, - }; - let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef { - ref_name: MAIN_BRANCH.to_string(), - reference: reference.clone(), - }; + let table_snapshots_iter = table.metadata().snapshots(); + assert_eq!(0, table_snapshots_iter.count()); - let add_snapshot = Snapshot::builder() - .with_snapshot_id(638933773299822130) - .with_timestamp_ms(1662532818843) - .with_sequence_number(1) - .with_schema_id(1) - .with_manifest_list("/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro") - .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) - .build(); + // Add snapshot + let record = r#" + { + "snapshot-id": 3051729675574597004, + "sequence-number": 10, + "timestamp-ms": 9992191116217, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://b/wh/.../s1.avro", + "schema-id": 0 + } + "#; - let table_update_add_snapshot = TableUpdate::AddSnapshot { - snapshot: add_snapshot, + let snapshot = serde_json::from_str::(record).unwrap(); + let table_update = TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), }; - - let table_update_opers = vec![table_update_add_snapshot, table_update_set_snapshot_ref]; - + let requirements = vec![]; let table_commit = TableCommit::builder() .ident(expected_table_ident.clone()) - .updates(table_update_opers) - .requirements(vec![]) + .updates(vec![table_update]) + .requirements(requirements) .build(); let table = catalog.update_table(table_commit).await.unwrap(); let snapshot_vec = table.metadata().snapshots().collect_vec(); assert_eq!(1, snapshot_vec.len()); let snapshot = &snapshot_vec[0]; - assert_eq!(snapshot.snapshot_id(), 638933773299822130); - assert_eq!(snapshot.timestamp_ms(), 1662532818843); - assert_eq!(snapshot.sequence_number(), 1); - assert_eq!(snapshot.schema_id().unwrap(), 1); - assert_eq!(snapshot.manifest_list(), "/home/iceberg/warehouse/ns/tbl1/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro"); + assert_eq!(snapshot.snapshot_id(), 3051729675574597004); + assert_eq!(snapshot.timestamp_ms(), 9992191116217); + assert_eq!(snapshot.sequence_number(), 10); + assert_eq!(snapshot.schema_id().unwrap(), 0); + assert_eq!(snapshot.manifest_list(), "s3://b/wh/.../s1.avro"); assert_eq!(snapshot.summary().operation, Operation::Append); - assert_eq!( - snapshot.summary().additional_properties, - HashMap::from_iter(vec![ - ( - "spark.app.id".to_string(), - "local-1662532784305".to_string() - ), - ("added-data-files".to_string(), "4".to_string()), - ("added-records".to_string(), "4".to_string()), - ("added-files-size".to_string(), "6001".to_string()) - ]) - ); + assert_eq!(snapshot.summary().additional_properties, HashMap::new()); + + let table_update_set_snapshot_ref = TableUpdate::SetSnapshotRef { + ref_name: MAIN_BRANCH.to_string(), + reference: SnapshotReference { + snapshot_id: snapshot.snapshot_id(), + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }, + }; + let table_commit = TableCommit::builder() + .ident(expected_table_ident.clone()) + .updates(vec![table_update_set_snapshot_ref]) + .requirements(vec![]) + .build(); + let table = catalog.update_table(table_commit).await.unwrap(); let snapshot_refs_map = table.metadata().snapshot_refs(); assert_eq!(1, snapshot_refs_map.len()); let snapshot_ref = snapshot_refs_map.get(MAIN_BRANCH).unwrap(); let expected_snapshot_ref = SnapshotReference { - snapshot_id, + snapshot_id: 3051729675574597004, retention: SnapshotRetention::Branch { min_snapshots_to_keep: Some(10), - max_snapshot_age_ms: Some(100), - max_ref_age_ms: Some(200), + max_snapshot_age_ms: None, + max_ref_age_ms: None, }, }; assert_eq!(snapshot_ref, &expected_snapshot_ref); diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 0103ab1e8..ed4ab7902 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -1126,42 +1126,6 @@ impl TableMetadataBuilder { fn highest_sort_order_id(&self) -> Option { self.metadata.sort_orders.keys().max().copied() } - - // /// Set current snapshot ref of table - // pub fn set_snapshot_ref( - // mut self, - // ref_name: String, - // reference: SnapshotReference, - // ) -> Result { - // if let Some(existing_snapshot_ref) = self.0.refs.get(&ref_name) { - // if (*existing_snapshot_ref) == reference { - // return Ok(self); - // } - // } - - // let snapshot_id = reference.snapshot_id; - // let snapshot = self - // .0 - // .snapshot_by_id(snapshot_id) - // .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Snapshot not found"))?; - // if snapshot.snapshot_id() == reference.snapshot_id { - // self.0.last_updated_ms = snapshot.timestamp_ms(); - // } - - // if ref_name == MAIN_BRANCH { - // self.0.current_snapshot_id = Some(snapshot_id); - // assert_ne!(0, self.0.last_updated_ms); - - // self.0.snapshot_log.push(SnapshotLog { - // timestamp_ms: self.0.last_updated_ms, - // snapshot_id, - // }); - // } - - // self.0.refs.insert(ref_name, reference); - - // Ok(self) - // } } impl From for TableMetadata {