Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add check for schema read compatibility #554

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

OussamaSaoudi-db
Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db commented Nov 29, 2024

What changes are proposed in this pull request?

This PR introduces a function schema.can_read_as(read_schema) for Schema. This checks that for data written with schema schema, whether it can be read with the read_schema. This check is useful for implementing schema evolution checks in CDF.

Closes #523

How was this change tested?

Schema compatibility tests are added that check the following:

  • can_read_as is reflexive
  • adding a nullable column to each the key and value of a map succeeds.
  • changing a map value from nullable to non-nullable fails
  • same schema with different field name case fails
  • changing column type from long to integer fails.
  • Setting nullability from false to true succeeds
  • Setting nullability from true to false fails
  • Adding a nullable column succeeds
  • Adding a non-nullable column fails

Copy link

codecov bot commented Nov 29, 2024

Codecov Report

Attention: Patch coverage is 96.80000% with 8 lines in your changes missing coverage. Please review.

Project coverage is 83.66%. Comparing base (cbb52a4) to head (3d852b0).

Files with missing lines Patch % Lines
kernel/src/schema_compat.rs 96.80% 4 Missing and 4 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #554      +/-   ##
==========================================
+ Coverage   83.50%   83.66%   +0.16%     
==========================================
  Files          74       75       +1     
  Lines       16919    17169     +250     
  Branches    16919    17169     +250     
==========================================
+ Hits        14128    14365     +237     
- Misses       2133     2141       +8     
- Partials      658      663       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

);
name_equal && nullability_equal && data_type_equal
}
None => read_field.is_nullable(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The None case is a point at which I differ from the delta implementation. I'm not convinced by the code there. If we don't find the read field in the existing schema, then we just ignore it. I think this should only pass if the new field in the read schema is nullable.

I may be missing something tho 🤔

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nullability is... complicated. But I think what you say makes sense -- technically it could be ok for the read field to not be nullable, if the parent is nullable and the parent is null for all rows where the child is null. But if the parent is hard-wired null then we shouldn't be recursing to its children in the first place.

// == read_nullable || !existing_nullable
read_nullable || !existing_nullable
}
fn is_struct_read_compatible(existing: &StructType, read_type: &StructType) -> bool {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was considering using a DeltaResult instead of a bool so we can return better errors about how a schema differs. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that makes sense. Something similar to ValidateColumnMappings in #543, which returns an Err with the offending column name path?

kernel/src/table_changes/schema_compat.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/schema_compat.rs Outdated Show resolved Hide resolved
);
name_equal && nullability_equal && data_type_equal
}
None => read_field.is_nullable(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nullability is... complicated. But I think what you say makes sense -- technically it could be ok for the read field to not be nullable, if the parent is nullable and the parent is null for all rows where the child is null. But if the parent is hard-wired null then we shouldn't be recursing to its children in the first place.

use crate::schema::{DataType, Schema, StructField, StructType};

fn is_nullability_compatible(existing_nullable: bool, read_nullable: bool) -> bool {
// The case to avoid is when the read_schema is non-nullable and the existing one is nullable.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"avoid" as in "it's illegal to attempt reading a nullable underlying as non-nullable"? (maybe just say that?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this method takes two args of the same type, but it is not commutative. Subtly error-prone, and I don't know the best way to make it safe? The arg names are a good start, but rust doesn't allow named args at call sites. And the name of the function does not give any indication of the correct arg order.

Is it worth using a struct just to force named args? Seems clunky. Or maybe we can choose an asymmetric function name of some kind, that indicates which arg comes first?

(whatever solution we choose, we should probably apply it to the is_struct_read_compatible as well)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possibility: Add these as methods on StructType itself? Then callers would be encouraged to do things like:

table_schema.can_read_as(read_schema)

... but I don't know a good way to do that for the nullability compat check since it's a plain boolean and doesn't always apply to a struct field (can also be array element or map value).

We could define a helper trait for struct/map/array, but that just pushes the problem to the trait impl (and there is only one call site for each type right now).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think the nullability check could benefit from using a struct. Pushing the impl to a trait would just have the same check in all of them anyway.

Comment on lines 19 to 32
let existing_names: HashSet<String> = existing
.fields()
.map(|field| field.name().clone())
.collect();
let read_names: HashSet<String> = read_type
.fields()
.map(|field| field.name().clone())
.collect();
if !existing_names.is_subset(&read_names) {
return false;
}
read_type
.fields()
.all(|read_field| match existing_fields.get(read_field.name()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should only need to materialize a hash set for one side (build), and just stream the other side's fields past it (probe)?

Also: kernel's StructType::fields member is already an IndexMap so you should have O(1) name lookups without building any additional hash sets.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing that was missing previously in this code was that they check the fields modulo case-sensitivity. The reasons delta spark does this AFAICT are:

  1. they want to ensure that there are no duplicate fields in a schema that only differ in case sensitivity.
  2. delta-spark typically ignores fields that are in the read schema, but not the current one as I pointed out above. However, they don't want fields that only differ in case to be treated as a new struct field, so they use a case-insensitive map.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two things I'm most unsure about currently are

  1. How the case sensitive cases are handled
  2. The case where the field is in the read schema but not the existing schema

Would appreciate a second pair of eyes from @zachschuermann and @nicklan as well.

@OussamaSaoudi-db OussamaSaoudi-db changed the title Implement schema compatibility check feat: Add check for schema read compatibility Jan 7, 2025
@OussamaSaoudi-db OussamaSaoudi-db marked this pull request as ready for review January 7, 2025 02:27
@OussamaSaoudi-db
Copy link
Collaborator Author

OussamaSaoudi-db commented Jan 7, 2025

TODO: Add doc comments. I think I want those uncertainties cleared before spending time on docs

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushing first pass comments. will review more soon

use crate::utils::require;
use crate::{DeltaResult, Error};

struct NullabilityCheck {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: doc comment

}
}

impl StructField {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i need to think about about doing this as multiple impl blocks. it keeps the code nice and separate, but does make it more complex to find where various bits of code are. at the very least can you not import StructField, StructType, or DataType and do impl crate::schema::StructField so it's clear that it's on that struct

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do that. In terms of the motivation for separate impl blocks, it goes back to Ryan's suggestion to make it clear what order you're passing in arguments to the compatibility check.

See this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I removed the import and switched to using Self. lmk if that looks good.

nullable: bool,
read_nullable: bool,
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a benefit to making this a struct and then having an impl? Seems like it'd be easier/cleaner as just a two arg method from the usage i've seen so far.

Copy link
Collaborator Author

@OussamaSaoudi-db OussamaSaoudi-db Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to what @scovich was saying here. A nullability check function would not be commutative, and this could easily cause subtle errors with a swap. We can't do impls for primitive type like bool, so bool.can_read_as(other) is off the table (and perhaps isn't a good idea anyway).

I could alternatively do something like this:

// Option A 
struct NullabilityFlag(bool);
struct ReadNullabilityFlag(bool);
fn is_compatible(a: NullabilityFlag, b: ReadNullabilityFlag) {
    let NullabilityFlag(nullable) = a;
    let ReadNullabilityFlag(read_nullable) = b;
    ...
}
is_compatible(NullabilityFlag(self.nullable), ReadNullabilityFlag(read_field.nullable));
// Option B: 
NullabilityFlag(self.nullable).can_read_as(NullabilityFlag(read_field.nullable)) // no need for read version

Looking at these layed out, I think Option B is the winner . What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: switched to option B for now

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few more comments.

it's a shame we can't use EnsureDataTypes for this. That expects to compare between kernel and arrow types which doesn't work here. We might want to consider if it's worth converting the arrow types into kernel types before doing that check and then re-writing that check in terms of only kernel types. It'd be a little less efficient, but we wouldn't have two somewhat tricky ways of checking compatibility.

//! compatibility is [`can_read_as`].
//!
//! # Examples
//! ```rust, ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ignore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pub(crate) and doc tests only work with pub functions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc tests compile with the doctest feature enabled, so we could potentially make the function pub in that case.

We could also invoke doctests with developer-visibility feature enabled, to pick up that set of functions as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like dev-visibility worked!

.collect();
require!(
field_map.len() == self.fields.len(),
Error::generic("Delta tables don't allow field names that only differ by case")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like it would be a bug in the schema of the table. Should we not catch this case higher up when trying to construct the schema?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that this is a weird place to put it. I was keeping parity with delta-spark. Perhaps instead we should bake this into a StructType::try_from_string. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like clearer and earlier checks, when possible. The fewer places we have to worry about unvalidated objects the better. Spark is definitely not a model to emulate in that regard.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made an issue to track this here.

);

// Check that the field names are a subset of the read fields.
if !field_map.keys().all(|name| read_field_names.contains(name)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the scala code they have allowMissingColumns, which does allow dropping of columns. I'm not quite clear when you'd want to set that or not though. In the case of CDF though, why isn't it okay for us to have dropped a column? Assuming we're reading as the final schema (which I think is the case), then if there were extra columns when we started, we just... don't return those?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the scala code they have allowMissingColumns, which does allow dropping of columns.

Regarding the flags, I chatted with @zachschuermann and we decided to do the simple implementation in this round without all the configurations that the scala code provides. For CDF specifically, it seems that it only ever uses the forbidTightenNullability flag and none of the others. I also think we may not need the nullability flag because we only use the final schema.

Assuming we're reading as the final schema (which I think is the case),

That's correct

the case of CDF though, why isn't it okay for us to have dropped a column? [...] if there were extra columns when we started, we just... don't return those?

I think the reason is the output CDF of such a table may not make sense. Consider a table with columns a and b. The final schema only has column a. You could imagine all of the changes in the change data feed are made to b, but if you read with a as the only column you get:

{(_change_type: update_preimage,  a: 1),
 (_change_type: update_postimage, a: 1)}

Seems that nothing's changed at all.

Copy link
Collaborator Author

@OussamaSaoudi-db OussamaSaoudi-db Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally: I think that if data is written in schema A, and we read it with schema B, there should not be any data loss. Hence A's columns should be a subset of B's columns. Dropping a column is essentially a projection, and they should be explicitly requested by the user/query.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIK, clients must anyway be prepared to deal with spurious row changes? It can also happen if we don't distinguish copied rows from updated ones. It can also happen if the reader only cares about a subset of columns that didn't happen to change.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scovich so are you saying that it actually might be acceptable to drop the columns that aren't in the end schema?

Copy link
Collaborator

@scovich scovich Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it? In my experience most users would prefer spurious changes if the alternative is pipeline failure.

It would be nice to get confirmation from some CDF workload experts tho. If delta-spark drops columns in this case that's probably an indirect confirmation?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to my understanding, they don't allow columns to be dropped in cdf. Their schema utils function let's you specify that you tolerate dropped columns, but CDF never uses it.

Here are the only call sites for schema compat in CDF 1 & 2. They change forbidTightenNullability, but all other flags are default.

SchemaUtils.isReadCompatible(
  existingSchema = metadata.schema,
  readSchema = readSchemaSnapshot.schema,
  forbidTightenNullability = true)

Note that allowMissingColumns is false by default.

def isReadCompatible(
    existingSchema: StructType,
    readSchema: StructType,
    forbidTightenNullability: Boolean = false,
    allowMissingColumns: Boolean = false,
    allowTypeWidening: Boolean = false,
    newPartitionColumns: Seq[String] = Seq.empty,
    oldPartitionColumns: Seq[String] = Seq.empty): Boolean = {

Here we see that if we do not allow missing columns, then the schema fields should be a subset of the read schema (ie no dropped columns).

if (!allowMissingColumns &&
  !(existingFieldNames.subsetOf(newFields) &&
    isPartitionCompatible(newPartitionColumns, oldPartitionColumns))) {
  // Dropped a column that was present in the DataFrame schema
  return false
}

Note also that CDF doesn't use the partitionColumns parts of the schema comparison.

I'm planing on talking to some of the folks who worked on this in the past, but I believe what I have currently matches the CDF behaviour for delta-spark.

self_map.value_type().can_read_as(read_map.value_type())?;
}
(a, b) => {
// TODO: In the future, we will change this to support type widening.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check_cast_compat basically does this but for two arrow types. just fyi

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioned in this issue: #623

@OussamaSaoudi-db
Copy link
Collaborator Author

@nicklan Ooh good callout that EnsureDataTypes exists. I'll take a closer look at it and see if it matches our needs.

@OussamaSaoudi-db
Copy link
Collaborator Author

@nicklan I made a new issue to handle the duplication with EnsureDataTypes. #629

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow CDF scans with schema evolution
3 participants