
Basic Transaction Support for DeltaCAT Durable Storage Model w/o Isolation #442

Merged: 15 commits merged into ray-project:2.0 on Jan 14, 2025

Conversation

@pdames (Member) commented Jan 9, 2025

Summary

Adds support for multi-namespace/multi-table atomic transactions that create, update, rename, and/or replace catalog artifacts. Strong transaction isolation (e.g., via serializable and/or snapshot-level isolation) is not yet supported, and will come in a subsequent PR. The catalog transaction log currently provides a minimal listing of successfully committed transaction IDs, with no O(1) read access to corresponding transaction operation details (also to be added in a subsequent PR).

Rationale

This is part of the core change set required to support https://github.com/ray-project/deltacat/milestone/4 via (1) compaction on an open, durable DeltaCAT catalog metadata format, (2) synchronization of DeltaCAT datasets to Iceberg/Hudi/Delta formats via lightweight metadata translation.

Changes

Changes are focused on DeltaCAT's internal storage model (i.e., deltacat/storage/model), especially the DeltaCAT 2.0 durable storage bindings introduced via metafile.py.

Impact

These changes attempt to preserve backwards compatibility with the existing DeltaCAT 1.X compactor.

Testing

Unit tests (make test).

Regression Risk

N/A: this PR adds new transaction support rather than fixing a bug, so no regression-risk assessment applies.

Checklist

• Unit tests covering the changes have been added
  • If this is a bugfix, regression tests have been added
• E2E testing has been performed

@pdames requested a review from flliver on January 9, 2025 at 21:46
@@ -244,6 +254,13 @@ def stream_id(self) -> Optional[str]:
return delta_locator.stream_id
return None

@property
def stream_format(self) -> Optional[str]:
Collaborator:

Why not have this return Optional[StreamFormat], given that you declare the enum in types.py?

@pdames (Member, Author) commented Jan 13, 2025:

I need to revisit this again, but I think this was just a concession on backwards compatibility with DeltaCAT 1.X. However, we may be able to update this in a way that still preserves backwards compatibility - it may just expand the corresponding scope of changes a bit.

@@ -265,6 +282,65 @@ def stream_position(self) -> Optional[int]:
return delta_locator.stream_position
return None

def to_serializable(self) -> Delta:
Collaborator:

This feels non-standard to me. A pattern I've seen more often is that the serialize method ignores mutable/internal fields, e.g., Jackson's @JsonIgnore annotation. Just looking at this class, to_serializable returns a serializable object, but I'm not sure how I'm expected to perform the serialization (pickle? JSON?). It might help to add pydoc here.

to_serializable and from_serializable are also asymmetric in an unexpected way, in that you can't round-trip using them. Maybe from_serializable could be renamed to deserialize, or you could add an explicit serialize function.
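For reference, a minimal Python sketch of the field-exclusion pattern mentioned above (the class and field names are made up for illustration; this is not DeltaCAT code):

```python
import json


class Example:
    def __init__(self, name: str):
        self.name = name
        self._cached_handle = object()  # in-memory only; should never be persisted

    def to_dict(self) -> dict:
        # Analogue of Jackson's @JsonIgnore: drop internal/mutable fields at
        # serialization time instead of exposing a separate "serializable" copy.
        return {k: v for k, v in self.__dict__.items() if not k.startswith("_")}


print(json.dumps(Example("demo").to_dict()))  # {"name": "demo"}
```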

@pdames (Member, Author) commented Jan 13, 2025:

Yeah, I had been thinking about potential new names for these methods. to_serializable() has expanded to be a more generic prepare_for_write()/before_write() method, which may include validating the object before writing it, removing in-memory references that shouldn't be persisted to disk, and changing object formats to ensure they're serializable via msgpack. Likewise, from_serializable() has also expanded to be more of a generic after_read() method, which can restore in-memory object formats (e.g., deserialize native Arrow schemas from bytes) but also perform any post-read validations.

Regarding symmetry, the test_metafile_io() tests include asserts that all write() and read() invocations against serialized/deserialized metafiles remain lossless. The two known exceptions to this using msgpack are seen at the bottom of test_python_type_serde(), which shows that (1) tuples become lists, and (2) bytearray objects become bytes.

So, if your class requires a tuple to remain a tuple or a bytearray to remain a bytearray post-read, then this is something you'd need to handle in your from_serializable() method today.

Regardless, the above seems like useful behavior to document outside of just the test cases as the expected behavior of these methods matures.
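To make those two asymmetries concrete, here's a minimal standalone sketch using the msgpack package directly (outside of DeltaCAT's metafile machinery):

```python
import msgpack

# Tuples are packed as msgpack arrays, so they come back as lists by default.
assert msgpack.unpackb(msgpack.packb((1, 2, 3))) == [1, 2, 3]

# bytearray values are packed as the msgpack bin type, so they come back as bytes.
assert msgpack.unpackb(msgpack.packb(bytearray(b"abc"))) == b"abc"

# A class that needs the original types restored would convert them back itself
# after reading, e.g., in its from_serializable()/after-read hook.
assert tuple(msgpack.unpackb(msgpack.packb((1, 2, 3)))) == (1, 2, 3)
```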

Collaborator:

+1, the naming/functionality felt odd/non-standard here, and it's worth considering a change (doesn't have to be this CR). That said, I asked GPT about it and it seemed to think it was fine even when prodded, so if it's good enough for our AI overlords it's probably good enough to ship ;)

DEFAULT_PATH_SEPARATOR = "/"


class LocatorName:
Collaborator:

Having not read the full CR: did you consider some system in which all objects have an immutable GUID and an optional user-defined name? This might help to deal with conflicts like moving a table to a namespace that contains a conflicting name, or act as a primitive for renames.

If I could go back in time and re-design Andes, I would give all tables GUIDs so that renaming is less invasive.

@pdames (Member, Author) commented Jan 13, 2025:

Yep - that's exactly what we have here. See the test_metafile_io.py rename tests for how this works in practice. If an object has a mutable name (e.g., Table and Namespace), then a separate file is written to map its mutable locator name to the underlying metafile's immutable ID (which is a UUID). If you rename the object, the UUID stays the same, the mapping from the old name to that UUID is marked as deleted, and a mapping from the new name back to that UUID is created.
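A minimal, self-contained sketch of that name-to-immutable-ID indirection (illustrative only; the NameIndex class below is hypothetical and not DeltaCAT's actual implementation):

```python
import uuid


class NameIndex:
    """Maps mutable object names to immutable metafile IDs (UUIDs)."""

    def __init__(self):
        self.live = {}        # current name -> immutable UUID
        self.tombstones = {}  # retired name -> UUID (mapping marked as deleted)

    def create(self, name: str) -> str:
        object_id = str(uuid.uuid4())  # immutable ID never changes after creation
        self.live[name] = object_id
        return object_id

    def rename(self, old_name: str, new_name: str) -> None:
        # The UUID stays the same; only the name -> UUID mapping moves.
        object_id = self.live.pop(old_name)
        self.tombstones[old_name] = object_id  # old mapping marked as deleted
        self.live[new_name] = object_id        # new mapping points at the same UUID


index = NameIndex()
table_id = index.create("my_table")
index.rename("my_table", "my_renamed_table")
assert index.live["my_renamed_table"] == table_id  # identity is stable across renames
```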

@pdames (Member, Author):

One other thing I'll add is that, especially with the recent introduction of Locator Aliases and corresponding alias names in my last commit, the relationship between all of these different mutable/immutable locators, IDs, and names is becoming even more confusing and deserves some more formal documentation both in code and in a corresponding specification.

I think the order of operations here will roughly be (1) implement the concepts in code and write a bunch of tests to ensure they all work as intended, (2) document the code thoroughly and discuss potential renames/code doc updates to assist existing DeltaCAT internal developers, (3) write an external specification doc to solidify these concepts for external users and developers.


class Locator:
"""
Creates a globally unique reference to any named catalog object. Locators
Collaborator:

So - this is basically a URN, but maybe without elements like scheme?

You could just call these URNs and make them compatible with the URN spec by making strings like `urn:deltacat:delta-{deltaId}`.

Collaborator:

Even if you aren't using the URN spec, it would be helpful to describe the specification of the locator strings. Or, if that specification is truly specific to each object type and you impose no constraints, that is relevant to document too

@pdames (Member, Author) commented Jan 13, 2025:

I suppose it could be thought of like a scheme-less URN. A Locator basically says that, given a filesystem that knows how to read a catalog root path, you can create a reference to any object in the catalog using its unique hex digest directly off of that root. It's kind of like TinyURL for the catalog: every cataloged object (from Namespace to Delta) remains discoverable in O(1) directly from a reference stored at {catalog_root}/{object_digest}.
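As a toy illustration of that addressing idea (the digest function and layout below are assumptions, not DeltaCAT's actual hashing or path format):

```python
import hashlib
import posixpath


def object_ref_path(catalog_root: str, canonical_string: str) -> str:
    # Hash the catalog-global canonical string to a fixed-length hex digest,
    # then resolve the object directly off of the catalog root in O(1).
    digest = hashlib.sha1(canonical_string.encode("utf-8")).hexdigest()
    return posixpath.join(catalog_root, digest)


print(object_ref_path("/my/catalog/root", "some_parent_digest/some_object_name"))
```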

@property
def name(self) -> LocatorName:
"""
Returns the name of this locator.
Collaborator:

The name of the locator is the type of object, like delta or stream? IMO "name" is a bit confusing, since I might think that a locator name is the canonical string.

@pdames (Member, Author) commented Jan 13, 2025:

The name for a Delta would be a single-part name just containing its stream position, while the name of a Partition is a tuple of its partition values and partition ID (since identical partition values may exist in multiple different partition schemes, matching on partition values alone may result in incorrect partition pruning).

But your initial thought is pretty close, since the canonical string representation is just {parent_object_digest}/{this_object_name} (i.e., name provides a relative identifier that can be used to locate the object within all siblings of the same parent, while canonical_string provides a catalog-global identifier that can be used to find the object among all objects registered with the catalog).

Open for naming suggestions to help clarify these relationships though.
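A rough sketch of the name vs. canonical_string relationship described above, assuming a hash-based parent digest (the composition below is illustrative, not the exact DeltaCAT format):

```python
import hashlib


class LocatorSketch:
    """Toy stand-in for a Locator: `name` is relative to siblings under the same
    parent; `canonical_string` is catalog-global."""

    def __init__(self, parent_canonical_string: str, name: str):
        self.parent_canonical_string = parent_canonical_string
        self.name = name  # relative identifier (e.g., a Delta's stream position)

    @property
    def canonical_string(self) -> str:
        parent_digest = hashlib.sha1(
            self.parent_canonical_string.encode("utf-8")
        ).hexdigest()
        return f"{parent_digest}/{self.name}"


delta = LocatorSketch("catalog/ns/table/v1/stream/partition", "42")
print(delta.name)              # relative: unique only among siblings
print(delta.canonical_string)  # global: unique across the whole catalog
```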

@@ -83,19 +136,46 @@ def of(
txn_type: Optional[TransactionType],
txn_operations: Optional[TransactionOperationList],
) -> Transaction:
operation_types = set([op.type for op in txn_operations])
Collaborator:

Without being familiar with this code base, it's unclear what the difference between TransactionType and TransactionOperationType is. It looks like TransactionOperationType is a higher-level classification, and TransactionType has additional breakouts, e.g., UPDATE can be OVERWRITE or RESTATE or ALTER?

Consider modifying the TransactionType enum to implement this mapping explicitly. This will help the enums be self-documenting and avoid implicitly defining the mapping in this unrelated class.

@pdames (Member, Author) commented Jan 13, 2025:

I'm currently wondering whether I want to hang onto TransactionType long-term or not, but I think I like the idea of updating the enum to put these rules in place if we keep it. For context, this was originally created thinking that we need something similar to Iceberg's snapshot operation type: https://iceberg.apache.org/spec/#snapshots. However, I'm not entirely sure if this is true, and have never been entirely fond of the implicit trust placed in the Snapshot Operation Type as a concept.

TransactionOperationType is much more definitive than TransactionType, since it defines the exact type of operation that will be run on any registered catalog metadata file (update, create, or delete).

However, we can't capture the "intent" of the overall transaction using just these three types, so TransactionType comes in to try to fill that gap by saying "this series of updates, creates, deletes, etc. is intended to just rewrite the exact same data in a different way" (e.g., creating a read-optimized version of a table or partition via compaction), and we'll telegraph that to future readers/writers by saying that this is a RESTATE transaction.

However, outside of RESTATE, all other transaction types can be derived automatically, so I don't really need someone to tell me the other transaction types - I can derive them automatically during transaction fulfillment, and record them in the transaction log at a finer granularity (e.g., I see that transaction A replaced table "foo", renamed namespace "bar", and created delta "4").

So one possibility is that TransactionType just gets reduced down to a boolean flag to indicate whether the transaction is just restating existing data or not. Even then, what are the implications of a RESTATE transaction? Is it just to inform users? Is it to provide more intelligent choices about when we isolate or serve concurrent writes or reads? Something else?

We're also putting a lot of trust in setting that flag correctly, and at what point does a restatement start to become an update, anyways (e.g., if compaction introduces a side-effect like a rounded decimal value, is it still valid to classify it as RESTATE, and is the compaction author even aware of this side-effect)?

Open for thoughts here.
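To make the earlier suggestion concrete, here's a rough sketch of pushing the mapping (and the automatic derivation) onto the enum itself; the member names mirror the TransactionType comments elsewhere in this diff, but the derivation rules below are assumptions for illustration:

```python
from enum import Enum
from typing import Set


class TransactionOperationType(str, Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


class TransactionType(str, Enum):
    APPEND = "append"
    ALTER = "alter"
    OVERWRITE = "overwrite"
    RESTATE = "restate"
    DELETE = "delete"

    @classmethod
    def derive(cls, op_types: Set[TransactionOperationType]) -> "TransactionType":
        # RESTATE (and the ALTER vs. OVERWRITE distinction) expresses writer
        # intent, so it can't be derived here; the rest falls out of the
        # operation types themselves.
        if TransactionOperationType.DELETE in op_types:
            return cls.DELETE
        if TransactionOperationType.UPDATE in op_types:
            return cls.ALTER
        return cls.APPEND


assert TransactionType.derive({TransactionOperationType.CREATE}) is TransactionType.APPEND
```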

)
for write_path in all_write_paths:
path, fs = Metafile.filesystem(write_path, filesystem)
fs.delete_file(path)
Collaborator:

How do you indicate a failure in cleaning up the failed transaction?

@pdames (Member, Author) commented Jan 13, 2025:

Currently, a subsequent failure here would be rethrown and cleanup would fail, but the system would continue to function normally even if we don't clean up failed transactions (which would be a good additional test case to add!).

The thought here is that this cleanup code should succeed often enough that garbage left behind by failed transactions doesn't become an immediate problem, but ultimately I think we'll need a separate out-of-band garbage collection job/daemon that crawls the catalog to ensure that any files we failed to clean up here are cleaned up later.

A tangential thought I had while writing this - do we think there's any value in keeping all attempted metafile updates associated with failed transactions in a type of "dead transaction" log (for at least some period of time)?
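One possible shape for a best-effort cleanup that surfaces (rather than rethrows) failures, sketched against a plain pyarrow filesystem; this is a suggestion, not the PR's current behavior:

```python
from typing import List

import pyarrow.fs


def best_effort_cleanup(write_paths: List[str], fs: pyarrow.fs.FileSystem) -> List[str]:
    """Try to delete every staged write path; return the paths we failed to delete."""
    leaked = []
    for path in write_paths:
        try:
            fs.delete_file(path)
        except Exception:
            # Keep going so one bad path doesn't block cleanup of the rest; the
            # returned paths could be handed to an out-of-band GC job later.
            leaked.append(path)
    return leaked
```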

"""

@staticmethod
def current(
Collaborator:

It would be nice to add pydoc summarizing how the logic works, given its length/complexity.

filesystem: pyarrow.fs.FileSystem,
extension: str = METAFILE_EXT,
txn_log_dir: Optional[str] = None,
) -> MetafileCommitInfo:
Collaborator:

Should this be Optional[MetafileCommitInfo] in case iteration stops?

@pdames (Member, Author) commented Jan 13, 2025:

I think the current return type is right since, right now, this method always either returns MetafileCommitInfo or raises an Exception, so None isn't a possibility (unless I'm missing something).

That said, a missing commit is currently telegraphed by setting MetafileCommitInfo.revision = 0 and all other MetafileCommitInfo values to None, which should be documented (and we should probably add a helper method to identify this special MetafileCommitInfo object as representing "no current commit").
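A tiny sketch of that helper idea, using a stand-in dataclass since the real MetafileCommitInfo fields aren't shown here:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CommitInfoSketch:  # stand-in for MetafileCommitInfo, for illustration only
    revision: int = 0
    path: Optional[str] = None
    txn_id: Optional[str] = None

    def exists(self) -> bool:
        # revision == 0 with all other fields None is the "no current commit" sentinel
        return self.revision > 0


assert not CommitInfoSketch().exists()
assert CommitInfoSketch(revision=1, path="some/revision/path", txn_id="some-txn").exists()
```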

return mci

@staticmethod
def next(
Collaborator:

Did you consider using an iterator here? Not sure how that would play with the statelessness of this class; I guess you could have a static iter method that starts at current and takes an optional txn_id param.

@pdames (Member, Author) commented Jan 13, 2025:

Hmm, that's an interesting thought. I think that the current() and next() method naming may also make this class seem more related to an iterable at first glance than it actually is, since I don't currently intend for someone to use it to iterate over all commits of a given metafile.

The real intent of this class right now is to simply "find the latest committed metafile at location X" and "tell me what should be the next metafile commit revision at location X". Thus, a writer is only expected to call the next() method once during a given metadata file update/create/delete just to figure out the next file name to assign (and calling next() again would just continue returning the same object until a new one is committed). So, maybe these methods should just be given different names?

Still, an actual iterable over all prior commits of a given metafile is something we'll probably need later, and this class would likely both serve as a starting point for implementing that iterable, and may also depend on it to continue serving these same methods.
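For what it's worth, a toy illustration of the "latest committed revision at location X" vs. "next revision to assign" split; the zero-padded file-name convention below is hypothetical, not DeltaCAT's actual naming scheme:

```python
import re
from typing import Iterable


def latest_revision(revision_file_names: Iterable[str]) -> int:
    """Highest committed revision among sibling metafile revision file names,
    or 0 to mean "no commit yet" (a stand-in for what current() reports)."""
    revisions = [
        int(m.group(1))
        for f in revision_file_names
        if (m := re.match(r"^(\d{8})", f))
    ]
    return max(revisions, default=0)


def next_revision(revision_file_names: Iterable[str]) -> int:
    """Revision number a writer would assign to its next commit
    (a stand-in for what next() reports)."""
    return latest_revision(revision_file_names) + 1


assert latest_revision([]) == 0
assert next_revision(["00000001_abc.meta", "00000002_def.meta"]) == 3
```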

Comment on lines +301 to +318
# TODO(pdames): Lazily restore table locator on 1st property get.
# Cache Metafile ID <-> Table/Namespace-Name map at Catalog Init, then
# swap only Metafile IDs with Names here.
if self.table_locator and self.table_locator.table_name == self.id:
parent_rev_dir_path = Metafile._parent_metafile_rev_dir_path(
base_metafile_path=path,
parent_number=4,
)
txn_log_dir = posixpath.join(
posixpath.dirname(
posixpath.dirname(
posixpath.dirname(parent_rev_dir_path),
)
),
TXN_DIR_NAME,
)
table = Table.read(
MetafileCommitInfo.current(
Collaborator:

These 'cd ..' traversals via posixpath.dirname and parent_number=4 are somewhat obtuse. If you're doing these a lot, maybe we should have a dir_handler class that's a bit clearer.

@pdames (Member, Author):

Agreed, some inelegant code paths like this will need to be refactored going forward. Areas for improvement include (1) introducing either helper methods or globally available variables for the catalog root path (since that's what we're traversing back to here and in other equivalent metafile methods), and (2) ensuring that every delta/partition/stream/table-version metafile read doesn't also result in reading its parent table and namespace file just to resolve their current names.
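Even something as small as this helper might already read better than the nested dirname calls (a sketch of the suggestion above, not code from the PR):

```python
import posixpath


def ancestor_dir(path: str, levels: int) -> str:
    """Walk `levels` directories up from `path` (a readable 'cd ..' helper)."""
    for _ in range(levels):
        path = posixpath.dirname(path)
    return path


assert ancestor_dir("/catalog/ns/table/version/stream/rev", 3) == "/catalog/ns/table"
```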

Comment on lines +41 to +55
# the transaction alters existing data
# (even if it also appends data)
# conflicts with other alters/overwrites/restates/deletes fail
ALTER = "alter"
# the transaction overwrites existing data
# (even if it also appends or alters data)
# conflicts with other alters/overwrites/restates/deletes fail
OVERWRITE = "overwrite"
# the transaction restates existing data with a new layout
# (even if it appends, alters, or overwrites data to do so)
# conflicts with other alters/overwrites/restates/deletes fail
RESTATE = "restate"
# the transaction deletes existing data
# (even if it also appends, alters, overwrites, or restates data)
# conflicts with other alters/overwrites/restates/deletes fail
Collaborator:

Don't you want more extensive tests for all these transaction types? I don't see them in the PR.

@pdames (Member, Author):

I've added some more extensive coverage in the latest code I just pushed but, as mentioned above, I'm also considering either just getting rid of these altogether, or deriving them automatically from the individual transaction operation types and input metafiles they're operating against.

@pdames merged commit c485b6b into ray-project:2.0 on Jan 14, 2025
2 of 3 checks passed