diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 1946f35f3..111b8a12f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -73,7 +73,7 @@ mod avro; pub mod io; pub mod spec; -pub mod metadata_scan; +pub mod metadata_table; pub mod scan; pub mod expr; diff --git a/crates/iceberg/src/metadata_scan.rs b/crates/iceberg/src/metadata_table.rs similarity index 76% rename from crates/iceberg/src/metadata_scan.rs rename to crates/iceberg/src/metadata_table.rs index 16604d781..d20a18cd5 100644 --- a/crates/iceberg/src/metadata_scan.rs +++ b/crates/iceberg/src/metadata_table.rs @@ -26,6 +26,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit}; +use crate::spec::Snapshot; use crate::table::Table; use crate::Result; @@ -36,22 +37,27 @@ use crate::Result; /// - /// - #[derive(Debug)] -pub struct MetadataTable(Table); +pub struct MetadataTable<'a>(&'a Table); -impl MetadataTable { +impl<'a> MetadataTable<'a> { /// Creates a new metadata scan. - pub(super) fn new(table: Table) -> Self { + pub(super) fn new(table: &'a Table) -> Self { Self(table) } + /// Return the metadata log entries of the table. + pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable { + MetadataLogEntriesTable { table: self.0 } + } + /// Get the snapshots table. pub fn snapshots(&self) -> SnapshotsTable { - SnapshotsTable { table: &self.0 } + SnapshotsTable { table: self.0 } } /// Get the manifests table. pub fn manifests(&self) -> ManifestsTable { - ManifestsTable { table: &self.0 } + ManifestsTable { table: self.0 } } } @@ -255,6 +261,89 @@ impl<'a> ManifestsTable<'a> { } } +/// Metadata log entries table. +/// +/// Use to inspect the current and historical metadata files in the table. +/// Contains every metadata file and the time it was added. For each metadata +/// file, the table contains information about the latest snapshot at the time. +pub struct MetadataLogEntriesTable<'a> { + table: &'a Table, +} + +impl<'a> MetadataLogEntriesTable<'a> { + /// Return the schema of the metadata log entries table. + pub fn schema(&self) -> Schema { + Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())), + false, + ), + Field::new("file", DataType::Utf8, false), + Field::new("latest_snapshot_id", DataType::Int64, true), + Field::new("latest_schema_id", DataType::Int32, true), + Field::new("latest_sequence_number", DataType::Int64, true), + ]) + } + + /// Scan the metadata log entries table. + pub fn scan(&self) -> Result { + let mut timestamp = + PrimitiveBuilder::::new().with_timezone("+00:00"); + let mut file = StringBuilder::new(); + let mut latest_snapshot_id = PrimitiveBuilder::::new(); + let mut latest_schema_id = PrimitiveBuilder::::new(); + let mut latest_sequence_number = PrimitiveBuilder::::new(); + + let mut append_metadata_log_entry = |timestamp_ms: i64, metadata_file: &str| { + timestamp.append_value(timestamp_ms); + file.append_value(metadata_file); + + let snapshot = self.snapshot_id_as_of_time(timestamp_ms); + latest_snapshot_id.append_option(snapshot.map(|s| s.snapshot_id())); + latest_schema_id.append_option(snapshot.and_then(|s| s.schema_id())); + latest_sequence_number.append_option(snapshot.map(|s| s.sequence_number())); + }; + + for metadata_log_entry in self.table.metadata().metadata_log() { + append_metadata_log_entry( + metadata_log_entry.timestamp_ms, + &metadata_log_entry.metadata_file, + ); + } + + // Include the current metadata location and modification time in the table. This matches + // the Java implementation. Unlike the Java implementation, a current metadata location is + // optional here. In that case, we omit current metadata from the metadata log table. + if let Some(current_metadata_location) = &self.table.metadata_location() { + append_metadata_log_entry( + self.table.metadata().last_updated_ms(), + current_metadata_location, + ); + } + + Ok(RecordBatch::try_new(Arc::new(self.schema()), vec![ + Arc::new(timestamp.finish()), + Arc::new(file.finish()), + Arc::new(latest_snapshot_id.finish()), + Arc::new(latest_schema_id.finish()), + Arc::new(latest_sequence_number.finish()), + ])?) + } + + fn snapshot_id_as_of_time(&self, timestamp_ms_inclusive: i64) -> Option<&Snapshot> { + let table_metadata = self.table.metadata(); + let mut snapshot_id = None; + // The table metadata snapshot log is chronological + for log_entry in table_metadata.history() { + if log_entry.timestamp_ms <= timestamp_ms_inclusive { + snapshot_id = Some(log_entry.snapshot_id); + } + } + snapshot_id.and_then(|id| table_metadata.snapshot_by_id(id).map(|s| s.as_ref())) + } +} + #[cfg(test)] mod tests { use expect_test::{expect, Expect}; @@ -310,6 +399,61 @@ mod tests { )); } + #[test] + fn test_metadata_log_entries_table() { + let table = TableTestFixture::new().table; + let record_batch = table + .metadata_table() + .metadata_log_entries() + .scan() + .unwrap(); + + // Check the current metadata location is included + let current_metadata_location = table.metadata_location().unwrap(); + assert!(record_batch + .column_by_name("file") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .iter() + .any(|location| location.is_some_and(|l| l.eq(current_metadata_location)))); + + check_record_batch( + record_batch, + expect![[r#" + Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, + Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]], + expect![[r#" + timestamp: PrimitiveArray + [ + 1970-01-01T00:25:15.100+00:00, + 2020-10-14T01:22:53.590+00:00, + ], + file: (skipped), + latest_snapshot_id: PrimitiveArray + [ + null, + 3055729675574597004, + ], + latest_schema_id: PrimitiveArray + [ + null, + 1, + ], + latest_sequence_number: PrimitiveArray + [ + null, + 1, + ]"#]], + &["file"], + Some("timestamp"), + ); + } + #[test] fn test_snapshots_table() { let table = TableTestFixture::new().table; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 5a97e74e7..ff3034d52 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -1002,7 +1002,13 @@ pub mod tests { let table_location = tmp_dir.path().join("table1"); let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); + // This is a past metadata location in the metadata log let table_metadata1_location = table_location.join("metadata/v1.json"); + // This is the actual location of current metadata + let template_json_location = format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + ); let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) .unwrap() @@ -1010,11 +1016,7 @@ pub mod tests { .unwrap(); let table_metadata = { - let template_json_str = fs::read_to_string(format!( - "{}/testdata/example_table_metadata_v2.json", - env!("CARGO_MANIFEST_DIR") - )) - .unwrap(); + let template_json_str = fs::read_to_string(&template_json_location).unwrap(); let mut context = Context::new(); context.insert("table_location", &table_location); context.insert("manifest_list_1_location", &manifest_list1_location); @@ -1029,7 +1031,7 @@ pub mod tests { .metadata(table_metadata) .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) .file_io(file_io.clone()) - .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .metadata_location(template_json_location) .build() .unwrap(); diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fa5304855..2a94a8ba3 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; use crate::io::object_cache::ObjectCache; use crate::io::FileIO; -use crate::metadata_scan::MetadataTable; +use crate::metadata_table::MetadataTable; use crate::scan::TableScanBuilder; use crate::spec::{TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; @@ -203,7 +203,7 @@ impl Table { /// Creates a metadata table which provides table-like APIs for inspecting metadata. /// See [`MetadataTable`] for more details. - pub fn metadata_table(self) -> MetadataTable { + pub fn metadata_table(&self) -> MetadataTable<'_> { MetadataTable::new(self) }