Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support metadata table "Metadata Log Entries" #846

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ mod avro;
pub mod io;
pub mod spec;

pub mod metadata_scan;
pub mod metadata_table;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up to this comment: #822 (comment)

This doesn't need to be done in this PR; it's up to you.

pub mod scan;

pub mod expr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow_array::types::{Int32Type, Int64Type, Int8Type, TimestampMillisecondTyp
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};

use crate::spec::Snapshot;
use crate::table::Table;
use crate::Result;

Expand All @@ -36,22 +37,27 @@ use crate::Result;
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[derive(Debug)]
pub struct MetadataTable(Table);
pub struct MetadataTable<'a>(&'a Table);

impl MetadataTable {
impl<'a> MetadataTable<'a> {
/// Creates a new metadata table.
pub(super) fn new(table: Table) -> Self {
pub(super) fn new(table: &'a Table) -> Self {
Self(table)
}

/// Get the metadata log entries table.
///
/// The returned view borrows the same underlying table as this
/// `MetadataTable`.
pub fn metadata_log_entries(&self) -> MetadataLogEntriesTable {
    let table = self.0;
    MetadataLogEntriesTable { table }
}

/// Get the snapshots table.
pub fn snapshots(&self) -> SnapshotsTable {
SnapshotsTable { table: &self.0 }
SnapshotsTable { table: self.0 }
}

/// Get the manifests table.
pub fn manifests(&self) -> ManifestsTable {
ManifestsTable { table: &self.0 }
ManifestsTable { table: self.0 }
}
}

Expand Down Expand Up @@ -255,6 +261,89 @@ impl<'a> ManifestsTable<'a> {
}
}

/// Metadata log entries table.
///
/// Use to inspect the current and historical metadata files in the table.
/// Contains every metadata file and the time it was added. For each metadata
/// file, the table contains information about the latest snapshot at the time.
///
/// This is a lightweight view that only borrows the underlying table.
#[derive(Debug)]
pub struct MetadataLogEntriesTable<'a> {
    /// The table whose metadata log is being inspected.
    table: &'a Table,
}

impl<'a> MetadataLogEntriesTable<'a> {
    /// Arrow schema of the metadata log entries table.
    ///
    /// Columns, in order: `timestamp` (millisecond timestamp with a `+00:00`
    /// timezone, non-null), `file` (UTF-8, non-null), followed by the nullable
    /// `latest_snapshot_id`, `latest_schema_id` and `latest_sequence_number`.
    pub fn schema(&self) -> Schema {
        let timestamp = Field::new(
            "timestamp",
            DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
            false,
        );
        let file = Field::new("file", DataType::Utf8, false);
        let latest_snapshot_id = Field::new("latest_snapshot_id", DataType::Int64, true);
        let latest_schema_id = Field::new("latest_schema_id", DataType::Int32, true);
        let latest_sequence_number = Field::new("latest_sequence_number", DataType::Int64, true);

        Schema::new(vec![
            timestamp,
            file,
            latest_snapshot_id,
            latest_schema_id,
            latest_sequence_number,
        ])
    }

    /// Materialize the metadata log entries table as a single record batch.
    ///
    /// One row is produced per entry of the table's metadata log, in log
    /// order. If the table has a current metadata location, a final row is
    /// appended for it, stamped with the metadata's last-updated time.
    ///
    /// # Errors
    ///
    /// Propagates the error from `RecordBatch::try_new` if the batch cannot be
    /// assembled.
    pub fn scan(&self) -> Result<RecordBatch> {
        let metadata = self.table.metadata();

        // Historical metadata files recorded in the metadata log.
        let logged = metadata
            .metadata_log()
            .iter()
            .map(|entry| (entry.timestamp_ms, entry.metadata_file.as_str()));

        // Include the current metadata location and modification time in the table. This matches
        // the Java implementation. Unlike the Java implementation, a current metadata location is
        // optional here. In that case, we omit current metadata from the metadata log table.
        let current = self
            .table
            .metadata_location()
            .map(|location| (metadata.last_updated_ms(), location));

        let mut timestamps =
            PrimitiveBuilder::<TimestampMillisecondType>::new().with_timezone("+00:00");
        let mut files = StringBuilder::new();
        let mut snapshot_ids = PrimitiveBuilder::<Int64Type>::new();
        let mut schema_ids = PrimitiveBuilder::<Int32Type>::new();
        let mut sequence_numbers = PrimitiveBuilder::<Int64Type>::new();

        for (timestamp_ms, metadata_file) in logged.chain(current) {
            timestamps.append_value(timestamp_ms);
            files.append_value(metadata_file);

            // The three `latest_*` columns are null when no snapshot existed yet
            // at the time the metadata file was written.
            let snapshot = self.snapshot_id_as_of_time(timestamp_ms);
            snapshot_ids.append_option(snapshot.map(|s| s.snapshot_id()));
            schema_ids.append_option(snapshot.and_then(|s| s.schema_id()));
            sequence_numbers.append_option(snapshot.map(|s| s.sequence_number()));
        }

        let schema = Arc::new(self.schema());
        let batch = RecordBatch::try_new(schema, vec![
            Arc::new(timestamps.finish()),
            Arc::new(files.finish()),
            Arc::new(snapshot_ids.finish()),
            Arc::new(schema_ids.finish()),
            Arc::new(sequence_numbers.finish()),
        ])?;
        Ok(batch)
    }

    /// Find the snapshot that was current at `timestamp_ms_inclusive`.
    ///
    /// Picks the last entry of the snapshot log whose timestamp is at or
    /// before the given time and resolves its snapshot id. Returns `None` when
    /// no such entry exists or the id cannot be resolved to a snapshot.
    fn snapshot_id_as_of_time(&self, timestamp_ms_inclusive: i64) -> Option<&Snapshot> {
        let table_metadata = self.table.metadata();
        // The table metadata snapshot log is chronological, so the last
        // matching entry in log order is the first match when walking backwards.
        let latest_entry = table_metadata
            .history()
            .iter()
            .rev()
            .find(|log_entry| log_entry.timestamp_ms <= timestamp_ms_inclusive)?;
        table_metadata
            .snapshot_by_id(latest_entry.snapshot_id)
            .map(|s| s.as_ref())
    }
}

#[cfg(test)]
mod tests {
use expect_test::{expect, Expect};
Expand Down Expand Up @@ -310,6 +399,61 @@ mod tests {
));
}

// Scans the metadata log entries table of the test fixture and verifies two
// things: the current metadata location shows up in the `file` column, and
// the resulting batch matches the expected schema and column contents.
#[test]
fn test_metadata_log_entries_table() {
let table = TableTestFixture::new().table;
let record_batch = table
.metadata_table()
.metadata_log_entries()
.scan()
.unwrap();

// Check the current metadata location is included
let current_metadata_location = table.metadata_location().unwrap();
assert!(record_batch
.column_by_name("file")
.unwrap()
.as_any()
.downcast_ref::<arrow_array::StringArray>()
.unwrap()
.iter()
.any(|location| location.is_some_and(|l| l.eq(current_metadata_location))));

// Compare the batch against the expected schema (first `expect!`) and the
// expected column data (second `expect!`).
// NOTE(review): the trailing arguments look like a list of columns to skip
// (`file` is rendered as `(skipped)` in the expected data, presumably because
// its paths are machine-dependent) and a column to sort rows by (`timestamp`)
// — confirm against `check_record_batch`.
check_record_batch(
record_batch,
expect![[r#"
Field { name: "timestamp", data_type: Timestamp(Millisecond, Some("+00:00")), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "file", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_snapshot_id", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_schema_id", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} },
Field { name: "latest_sequence_number", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }"#]],
expect![[r#"
timestamp: PrimitiveArray<Timestamp(Millisecond, Some("+00:00"))>
[
1970-01-01T00:25:15.100+00:00,
2020-10-14T01:22:53.590+00:00,
],
file: (skipped),
latest_snapshot_id: PrimitiveArray<Int64>
[
null,
3055729675574597004,
],
latest_schema_id: PrimitiveArray<Int32>
[
null,
1,
],
latest_sequence_number: PrimitiveArray<Int64>
[
null,
1,
]"#]],
&["file"],
Some("timestamp"),
);
}

#[test]
fn test_snapshots_table() {
let table = TableTestFixture::new().table;
Expand Down
14 changes: 8 additions & 6 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1002,19 +1002,21 @@ pub mod tests {
let table_location = tmp_dir.path().join("table1");
let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro");
let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro");
// This is a past metadata location in the metadata log
let table_metadata1_location = table_location.join("metadata/v1.json");
// This is the actual location of current metadata
let template_json_location = format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
);

let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

let table_metadata = {
let template_json_str = fs::read_to_string(format!(
"{}/testdata/example_table_metadata_v2.json",
env!("CARGO_MANIFEST_DIR")
))
.unwrap();
let template_json_str = fs::read_to_string(&template_json_location).unwrap();
let mut context = Context::new();
context.insert("table_location", &table_location);
context.insert("manifest_list_1_location", &manifest_list1_location);
Expand All @@ -1029,7 +1031,7 @@ pub mod tests {
.metadata(table_metadata)
.identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
.file_io(file_io.clone())
.metadata_location(table_metadata1_location.as_os_str().to_str().unwrap())
.metadata_location(template_json_location)
Comment on lines -1032 to +1034
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change, the location in the metadata log and the current location would both be ../metadata/v1.json.

I wanted the two locations to differ so we can assert that metadata_log_entries includes the current metadata location even when it is not in the metadata log.

.build()
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use crate::arrow::ArrowReaderBuilder;
use crate::io::object_cache::ObjectCache;
use crate::io::FileIO;
use crate::metadata_scan::MetadataTable;
use crate::metadata_table::MetadataTable;
use crate::scan::TableScanBuilder;
use crate::spec::{TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};
Expand Down Expand Up @@ -203,7 +203,7 @@ impl Table {

/// Creates a metadata table which provides table-like APIs for inspecting metadata.
/// See [`MetadataTable`] for more details.
pub fn metadata_table(self) -> MetadataTable {
pub fn metadata_table(&self) -> MetadataTable<'_> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Follow-up to this comment: #822 (comment)

Also not necessary to do in this PR.

MetadataTable::new(self)
}

Expand Down
Loading