feat(datafusion): support metadata tables for DataFusion
Signed-off-by: xxchan <[email protected]>
xxchan committed Jan 5, 2025
1 parent e5bdbfc commit d25b534
Showing 13 changed files with 473 additions and 61 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -91,6 +91,7 @@ serde_derive = "1"
serde_json = "1"
serde_repr = "0.1.16"
serde_with = "3.4"
strum = "0.26"
tempfile = "3.8"
tokio = { version = "1", default-features = false }
typed-builder = "0.20"
3 changes: 2 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -78,15 +78,16 @@ serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
tokio = { workspace = true, optional = true }
typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
zstd = { workspace = true }
expect-test = { workspace = true }

[dev-dependencies]
ctor = { workspace = true }
expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
@@ -83,6 +83,7 @@ pub mod transform;
mod runtime;

pub mod arrow;
pub mod test_utils;
mod utils;
pub mod writer;

87 changes: 39 additions & 48 deletions crates/iceberg/src/metadata_scan.rs
@@ -35,9 +35,46 @@ use crate::Result;
/// - <https://github.com/apache/iceberg/blob/ac865e334e143dfd9e33011d8cf710b46d91f1e5/core/src/main/java/org/apache/iceberg/MetadataTableType.java#L23-L39>
/// - <https://iceberg.apache.org/docs/latest/spark-queries/#querying-with-sql>
/// - <https://py.iceberg.apache.org/api/#inspecting-tables>
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct MetadataTable(Table);

/// Metadata table type.
#[derive(Debug, Clone, strum::EnumIter)]
pub enum MetadataTableType {
/// [`SnapshotsTable`]
Snapshots,
/// [`ManifestsTable`]
Manifests,
}

impl MetadataTableType {
/// Returns the string representation of the metadata table type.
pub fn as_str(&self) -> &str {
match self {
MetadataTableType::Snapshots => "snapshots",
MetadataTableType::Manifests => "manifests",
}
}

/// Returns all the metadata table types.
pub fn all_types() -> impl Iterator<Item = Self> {
use strum::IntoEnumIterator;
Self::iter()
}
}

impl TryFrom<&str> for MetadataTableType {
type Error = String;

fn try_from(value: &str) -> std::result::Result<Self, String> {
match value {
"snapshots" => Ok(Self::Snapshots),
"manifests" => Ok(Self::Manifests),
_ => Err(format!("invalid metadata table type: {value}")),
}
}
}

impl MetadataTable {
/// Creates a new metadata table.
pub(super) fn new(table: Table) -> Self {
@@ -262,53 +299,7 @@ mod tests {

use super::*;
use crate::scan::tests::TableTestFixture;

/// Snapshot testing to check the resulting record batch.
///
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
/// Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
fn check_record_batch(
record_batch: RecordBatch,
expected_schema: Expect,
expected_data: Expect,
ignore_check_columns: &[&str],
sort_column: Option<&str>,
) {
let mut columns = record_batch.columns().to_vec();
if let Some(sort_column) = sort_column {
let column = record_batch.column_by_name(sort_column).unwrap();
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
columns = columns
.iter()
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
.collect_vec();
}

expected_schema.assert_eq(&format!(
"{}",
record_batch.schema().fields().iter().format(",\n")
));
expected_data.assert_eq(&format!(
"{}",
record_batch
.schema()
.fields()
.iter()
.zip_eq(columns)
.map(|(field, column)| {
if ignore_check_columns.contains(&field.name().as_str()) {
format!("{}: (skipped)", field.name())
} else {
format!("{}: {:?}", field.name(), column)
}
})
.format(",\n")
));
}
use crate::test_utils::check_record_batch;

#[test]
fn test_snapshots_table() {
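
For illustration, a minimal sketch of how a consumer could drive the new MetadataTableType API introduced above. The iceberg::metadata_scan import path is an assumption about how the crate exposes the type; everything else is taken from the diff:

use iceberg::metadata_scan::MetadataTableType; // import path is an assumption

fn resolve(name: &str) -> Option<MetadataTableType> {
    // `TryFrom<&str>` accepts the lowercase table suffix, e.g. "snapshots".
    MetadataTableType::try_from(name).ok()
}

fn main() {
    // Enumerate every supported metadata table type (backed by strum::EnumIter).
    for ty in MetadataTableType::all_types() {
        println!("supported: {}", ty.as_str());
    }
    assert!(resolve("snapshots").is_some());
    assert!(resolve("partitions").is_none()); // not implemented in this commit
}
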
71 changes: 71 additions & 0 deletions crates/iceberg/src/test_utils.rs
@@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Test utilities.
//! This module is `pub` only to support internal testing.
//! It is subject to change and is not intended to be used by external users.
use arrow_array::RecordBatch;
use expect_test::Expect;
use itertools::Itertools;

/// Snapshot testing to check the resulting record batch.
///
/// - `expected_schema/data`: put `expect![[""]]` as a placeholder,
/// and then run test with `UPDATE_EXPECT=1 cargo test` to automatically update the result,
/// or use rust-analyzer (see [video](https://github.com/rust-analyzer/expect-test)).
/// Check the doc of [`expect_test`] for more details.
/// - `ignore_check_columns`: Some columns are not stable, so we can skip them.
/// - `sort_column`: The order of the data might be non-deterministic, so we can sort it by a column.
pub fn check_record_batch(
record_batch: RecordBatch,
expected_schema: Expect,
expected_data: Expect,
ignore_check_columns: &[&str],
sort_column: Option<&str>,
) {
let mut columns = record_batch.columns().to_vec();
if let Some(sort_column) = sort_column {
let column = record_batch.column_by_name(sort_column).unwrap();
let indices = arrow_ord::sort::sort_to_indices(column, None, None).unwrap();
columns = columns
.iter()
.map(|column| arrow_select::take::take(column.as_ref(), &indices, None).unwrap())
.collect_vec();
}

expected_schema.assert_eq(&format!(
"{}",
record_batch.schema().fields().iter().format(",\n")
));
expected_data.assert_eq(&format!(
"{}",
record_batch
.schema()
.fields()
.iter()
.zip_eq(columns)
.map(|(field, column)| {
if ignore_check_columns.contains(&field.name().as_str()) {
format!("{}: (skipped)", field.name())
} else {
format!("{}: {:?}", field.name(), column)
}
})
.format(",\n")
));
}
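
With check_record_batch now public in iceberg::test_utils, the DataFusion integration can reuse the same snapshot-testing helper. A self-contained usage sketch; the schema and column names below are illustrative, not taken from this commit:

use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use expect_test::expect;
use iceberg::test_utils::check_record_batch;

#[test]
fn snapshot_example() {
    // Hypothetical single-column batch standing in for a metadata table scan result.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![3_i64, 1, 2]))]).unwrap();

    check_record_batch(
        batch,
        expect![[""]], // placeholder: run `UPDATE_EXPECT=1 cargo test` to fill in the schema
        expect![[""]], // placeholder: run `UPDATE_EXPECT=1 cargo test` to fill in the data
        &[],           // no unstable columns to skip in this toy example
        Some("id"),    // sort so row order is deterministic
    );
}
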
1 change: 1 addition & 0 deletions crates/integrations/datafusion/Cargo.toml
@@ -37,5 +37,6 @@ iceberg = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
expect-test = { workspace = true }
iceberg-catalog-memory = { workspace = true }
tempfile = { workspace = true }
91 changes: 91 additions & 0 deletions crates/integrations/datafusion/src/physical_plan/metadata_scan.rs
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::catalog::TableProvider;
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};

use crate::metadata_table::IcebergMetadataTableProvider;

#[derive(Debug)]
pub struct IcebergMetadataScan {
provider: IcebergMetadataTableProvider,
properties: PlanProperties,
}

impl IcebergMetadataScan {
pub fn new(provider: IcebergMetadataTableProvider) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(provider.schema()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
);
Self {
provider,
properties,
}
}
}

impl DisplayAs for IcebergMetadataScan {
fn fmt_as(
&self,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "IcebergMetadataScan")
}
}

impl ExecutionPlan for IcebergMetadataScan {
fn name(&self) -> &str {
"IcebergMetadataScan"
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn properties(&self) -> &PlanProperties {
&self.properties
}

fn children(&self) -> Vec<&std::sync::Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: std::sync::Arc<Self>,
_children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
) -> datafusion::error::Result<std::sync::Arc<dyn ExecutionPlan>> {
Ok(self)
}

fn execute(
&self,
_partition: usize,
_context: std::sync::Arc<datafusion::execution::TaskContext>,
) -> datafusion::error::Result<datafusion::execution::SendableRecordBatchStream> {
let batch_fut = self.provider.clone().scan();
let schema = self.provider.schema();
let stream = futures::stream::once(batch_fut);
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
}
}
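
Note that execute wraps the provider's one-shot scan() future in futures::stream::once and the plan reports a single unknown partition, so DataFusion consumes the whole metadata table as one bounded stream. A rough sketch of querying such a provider end to end; the registration name and the Arc<dyn TableProvider> handle are assumptions, since the provider's construction is not shown in this commit:

use std::sync::Arc;

use datafusion::catalog::TableProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// `provider` is assumed to be an IcebergMetadataTableProvider (or any
// TableProvider) built from an Iceberg table elsewhere.
async fn query_snapshots(provider: Arc<dyn TableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical registration name; a real integration may instead expose
    // metadata tables through a table-name suffix convention.
    ctx.register_table("my_table_snapshots", provider)?;
    ctx.sql("SELECT * FROM my_table_snapshots").await?.show().await?;
    Ok(())
}
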
1 change: 1 addition & 0 deletions crates/integrations/datafusion/src/physical_plan/mod.rs
@@ -16,4 +16,5 @@
// under the License.

pub(crate) mod expr_to_predicate;
pub(crate) mod metadata_scan;
pub(crate) mod scan;