[Spark] Fast Drop Feature Command (#3867)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


This is the base PR for the Fast Drop Feature. It is a new implementation of
the DROP FEATURE command that requires no waiting time and no history
truncation.

The main difficulty when dropping a feature is that, after the operation
is complete, the history of the table still contains traces of the
feature. This may cause the following problems:

1. Reconstructing the state of the latest version may require replaying
log records from before the feature removal. Log replay is based on
checkpoints, an auxiliary data structure used by clients as a starting
point for replaying history; any actions before the checkpoint do not
need to be replayed. However, checkpoints are not permanent and may be
deleted at any time.
2. Clients that do not support the required features may create
checkpoints at historical versions.
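To make anomaly (1) concrete, here is a minimal sketch of checkpoint-based log replay. The names are illustrative only, not the actual Delta implementation: state is rebuilt from the latest checkpoint plus the commits that follow it, so deleting a checkpoint can force replay of older commits that may contain unsupported actions.

```scala
// Illustrative sketch, not the actual Delta implementation: table state is
// rebuilt from a checkpoint plus the commit deltas that follow it.
case class TableState(config: Map[String, String])

def reconstructState(
    checkpointVersion: Long,
    checkpointState: TableState,
    commits: Map[Long, Map[String, String]], // version -> config changes
    targetVersion: Long): TableState = {
  // Everything at or before the checkpoint is already folded into it;
  // only commits after the checkpoint need to be replayed.
  (checkpointVersion + 1 to targetVersion).foldLeft(checkpointState) {
    case (state, v) => TableState(state.config ++ commits.getOrElse(v, Map.empty))
  }
}
```

If the checkpoint at version 2 is deleted, the same reconstruction must instead start from an earlier checkpoint and replay the intervening commits, including any that contain traces of the dropped feature.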

The proposed solution is `CheckpointProtectionTableFeature`. This is a
new writer feature that ensures the entire history up to a certain
table version, V, can only be cleaned up in its entirety or not at all.
Alternatively, the writer may delete commits and associated checkpoints
up to any version less than V, as long as it validates against all
protocols included in the commits/checkpoints it plans to remove. We
protect against the anomalies above as follows:

- All checkpoints before the transition table version are protected.
This prevents anomaly (1) by turning checkpoints into reliable barriers
that can hide unsupported log records behind them.
- Because log cleanup is disabled for the older versions, this also
removes the only reason why writers would create new checkpoints,
preventing anomaly (2).
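The cleanup rule above can be sketched as a simplified model (illustrative names only; the real validation lives in Delta's metadata-cleanup path):

```scala
// Simplified model of the CheckpointProtectionTableFeature cleanup rule.
// A cleanup that truncates history up to `truncateVersion` is allowed if it
// removes the whole protected range at once, or if the writer supports every
// protocol contained in the commits it is about to delete.
case class CommitProtocol(version: Long, requiredFeatures: Set[String])

def cleanupAllowed(
    requireCheckpointProtectionBeforeVersion: Long,
    truncateVersion: Long,
    removedCommits: Seq[CommitProtocol],
    supportedFeatures: Set[String]): Boolean = {
  if (truncateVersion >= requireCheckpointProtectionBeforeVersion) {
    true // the protected range is removed in its entirety
  } else {
    // Partial truncation: the writer must understand every protocol it removes.
    removedCommits.forall(_.requiredFeatures.subsetOf(supportedFeatures))
  }
}
```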


This still uses a writer feature, but it is a step forward compared to
previous solutions because it allows the table to be readable by older
clients immediately instead of after 24 hours. Compatibility with older
writers can subsequently be achieved by truncating the history after a
24-hour waiting period.
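For context, the user-facing entry point is the existing `DROP FEATURE` SQL command; the table and feature names below are illustrative:

```sql
-- With this change, the plain form completes immediately; older readers can
-- use the table right away under checkpointProtection.
ALTER TABLE my_table DROP FEATURE deletionVectors;

-- The pre-existing form that also truncates history (after the 24-hour
-- retention window) remains available for older-writer compatibility.
ALTER TABLE my_table DROP FEATURE deletionVectors TRUNCATE HISTORY;
```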

## How was this patch tested?


Added `DeltaFastDropFeatureSuite` as well as tests in
`DeltaProtocolTransitionsSuite`.

## Does this PR introduce _any_ user-facing changes?

No.
andreaschat-db authored Nov 11, 2024
1 parent e65d06e commit 860438f
Showing 9 changed files with 466 additions and 150 deletions.
7 changes: 7 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -967,6 +967,13 @@
],
"sqlState" : "0AKDE"
},
"DELTA_FEATURE_DROP_CHECKPOINT_FAILED" : {
"message" : [
"Dropping <featureName> failed due to a failure in checkpoint creation.",
"Please try again later. If the issue persists, contact support."
],
"sqlState" : "22KD0"
},
"DELTA_FEATURE_DROP_CONFLICT_REVALIDATION_FAIL" : {
"message" : [
"Cannot drop feature because a concurrent transaction modified the table.",
@@ -234,7 +234,7 @@ private[delta] class ConflictChecker(
// a rare event and thus not that disruptive if other concurrent transactions fail.
val winningProtocol = winningCommitSummary.protocol.get
val readProtocol = currentTransactionInfo.readSnapshot.protocol
val isWinnerDroppingFeatures = TableFeature.isProtocolRemovingExplicitFeatures(
val isWinnerDroppingFeatures = TableFeature.isProtocolRemovingFeatures(
newProtocol = winningProtocol,
oldProtocol = readProtocol)
if (isWinnerDroppingFeatures) {
@@ -243,19 +243,46 @@
}
// When the winning transaction does not change the protocol but the losing txn is
// a protocol downgrade, we re-validate the invariants of the removed feature.
// Furthermore, when dropping with the fast drop feature we need to adjust
// requireCheckpointProtectionBeforeVersion.
// TODO: only revalidate against the snapshot of the last interleaved txn.
val currentProtocol = currentTransactionInfo.protocol
val newProtocol = currentTransactionInfo.protocol
val readProtocol = currentTransactionInfo.readSnapshot.protocol
if (TableFeature.isProtocolRemovingExplicitFeatures(currentProtocol, readProtocol)) {
if (TableFeature.isProtocolRemovingFeatures(newProtocol, readProtocol)) {
val winningSnapshot = deltaLog.getSnapshotAt(winningCommitSummary.commitVersion)
val isDowngradeCommitValid = TableFeature.validateFeatureRemovalAtSnapshot(
newProtocol = currentProtocol,
newProtocol = newProtocol,
oldProtocol = readProtocol,
snapshot = winningSnapshot)
if (!isDowngradeCommitValid) {
throw DeltaErrors.dropTableFeatureConflictRevalidationFailed(
winningCommitSummary.commitInfo)
}
// When the current transaction is removing a feature and CheckpointProtectionTableFeature
// is enabled, the current transaction will set the requireCheckpointProtectionBeforeVersion
// table property to the version of the current transaction.
// So we need to update it after resolving conflicts with winning transactions.
if (newProtocol.isFeatureSupported(CheckpointProtectionTableFeature) &&
TableFeature.isProtocolRemovingFeatureWithHistoryProtection(newProtocol, readProtocol)) {
val newVersion = winningCommitSummary.commitVersion + 1L
val newMetadata = CheckpointProtectionTableFeature.metadataWithCheckpointProtection(
currentTransactionInfo.metadata, newVersion)
val newActions = currentTransactionInfo.actions.collect {
// Sanity check.
case m: Metadata if m != currentTransactionInfo.metadata =>
recordDeltaEvent(
deltaLog = currentTransactionInfo.readSnapshot.deltaLog,
opType = "dropFeature.conflictCheck.metadataMissmatch",
data = Map(
"transactionInfoMetadata" -> currentTransactionInfo.metadata,
"actionMetadata" -> m))
CheckpointProtectionTableFeature.metadataWithCheckpointProtection(m, newVersion)
case _: Metadata => newMetadata
case a => a
}
currentTransactionInfo = currentTransactionInfo.copy(
metadata = newMetadata, actions = newActions)
}
}
}

18 changes: 18 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -835,6 +835,24 @@ trait DeltaConfigsBase extends DeltaLogging {
v => Option(v).map(_.toLong),
validationFunction = _ => true,
"needs to be a long.")

/**
* This property is used by CheckpointProtectionTableFeature and denotes the
* version up to which the checkpoints are required to be cleaned up only together with the
* corresponding commits. If this is not possible, and metadata cleanup creates a new checkpoint
* prior to requireCheckpointProtectionBeforeVersion, it should validate write support against
* all protocols included in the commits that are being removed, or else abort. This is needed
* to make sure that the writer understands how to correctly create a checkpoint for the
* historic commit.
*
* Note, this is an internal config and should never be manually altered.
*/
val REQUIRE_CHECKPOINT_PROTECTION_BEFORE_VERSION = buildConfig[Long](
"requireCheckpointProtectionBeforeVersion",
"0",
_.toLong,
_ >= 0,
"needs to be greater or equal to zero.")
}

object DeltaConfigs extends DeltaConfigsBase
@@ -2435,6 +2435,12 @@ trait DeltaErrorsBase
)
}

def dropTableFeatureCheckpointFailedException(featureName: String): Throwable = {
new DeltaTableFeatureException(
errorClass = "DELTA_FEATURE_DROP_CHECKPOINT_FAILED",
messageParameters = Array(featureName))
}

def concurrentAppendException(
conflictingCommit: Option[CommitInfo],
partition: String,
93 changes: 66 additions & 27 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -176,10 +176,11 @@ sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>
* The implementation should return true if there are no feature traces in the latest
* version. False otherwise.
*
* c) requiresHistoryTruncation. It indicates whether the table history needs to be clear
* of all feature traces before downgrading the protocol. This is by default true
* for all reader+writer features and false for writer features.
* WARNING: Disabling [[requiresHistoryTruncation]] for relevant features could result to
* c) requiresHistoryProtection. It indicates whether the feature leaves traces in the table
* history that may result in incorrect behaviour if the table is read/written by a client
* that does not support the feature. This is by default true for all reader+writer features
* and false for writer features.
* WARNING: Disabling [[requiresHistoryProtection]] for relevant features could result in
* incorrect snapshot reconstruction.
*
* d) actionUsesFeature. For features that require history truncation we verify whether past
@@ -200,7 +201,7 @@ sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>
sealed trait RemovableFeature { self: TableFeature =>
def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand
def validateRemoval(snapshot: Snapshot): Boolean
def requiresHistoryTruncation: Boolean = isReaderWriterFeature
def requiresHistoryProtection: Boolean = isReaderWriterFeature
def actionUsesFeature(action: Action): Boolean

/**
@@ -226,7 +227,7 @@ sealed trait RemovableFeature { self: TableFeature =>
def historyContainsFeature(
spark: SparkSession,
downgradeTxnReadSnapshot: Snapshot): Boolean = {
require(requiresHistoryTruncation)
require(requiresHistoryProtection)
val deltaLog = downgradeTxnReadSnapshot.deltaLog
val earliestCheckpointVersion = deltaLog.findEarliestReliableCheckpoint.getOrElse(0L)
val toVersion = downgradeTxnReadSnapshot.version
@@ -365,7 +366,8 @@ object TableFeature {
RowTrackingFeature,
InCommitTimestampTableFeature,
VariantTypeTableFeature,
CoordinatedCommitsTableFeature)
CoordinatedCommitsTableFeature,
CheckpointProtectionTableFeature)
if (DeltaUtils.isTesting && testingFeaturesEnabled) {
features ++= Set(
RedirectReaderWriterFeature,
@@ -410,22 +412,33 @@ object TableFeature {
allDependentFeaturesMap.getOrElse(feature, Set.empty)

/**
* Extracts the removed (explicit) feature names by comparing new and old protocols.
* Returns None if there are no removed (explicit) features.
* Extracts the removed features by comparing new and old protocols.
* Returns None if there are no removed features.
*/
protected def getDroppedExplicitFeatureNames(
protected def getDroppedFeatures(
newProtocol: Protocol,
oldProtocol: Protocol): Option[Set[String]] = {
val newFeatureNames = newProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name)
val oldFeatureNames = oldProtocol.implicitlyAndExplicitlySupportedFeatures.map(_.name)
Option(oldFeatureNames -- newFeatureNames).filter(_.nonEmpty)
oldProtocol: Protocol): Set[TableFeature] = {
val newFeatureNames = newProtocol.implicitlyAndExplicitlySupportedFeatures
val oldFeatureNames = oldProtocol.implicitlyAndExplicitlySupportedFeatures
oldFeatureNames -- newFeatureNames
}

/** Identifies whether there was any feature removal between two protocols. */
def isProtocolRemovingFeatures(newProtocol: Protocol, oldProtocol: Protocol): Boolean = {
getDroppedFeatures(newProtocol = newProtocol, oldProtocol = oldProtocol).nonEmpty
}

/**
* Identifies whether there was any feature removal between two protocols.
* Identifies whether there were any features with requiresHistoryProtection removed
* between the two protocols.
*/
def isProtocolRemovingExplicitFeatures(newProtocol: Protocol, oldProtocol: Protocol): Boolean = {
getDroppedExplicitFeatureNames(newProtocol = newProtocol, oldProtocol = oldProtocol).isDefined
def isProtocolRemovingFeatureWithHistoryProtection(
newProtocol: Protocol,
oldProtocol: Protocol): Boolean = {
getDroppedFeatures(newProtocol = newProtocol, oldProtocol = oldProtocol).exists {
case r: RemovableFeature if r.requiresHistoryProtection => true
case _ => false
}
}

/**
@@ -435,20 +448,20 @@
newProtocol: Protocol,
oldProtocol: Protocol,
snapshot: Snapshot): Boolean = {
val droppedFeatureNamesOpt = TableFeature.getDroppedExplicitFeatureNames(
val droppedFeatures = TableFeature.getDroppedFeatures(
newProtocol = newProtocol,
oldProtocol = oldProtocol)
val droppedFeatureName = droppedFeatureNamesOpt match {
case Some(f) if f.size == 1 => f.head
// We do not support dropping more than one features at a time so we have to reject
val droppedFeature = droppedFeatures match {
case f if f.size == 1 => f.head
// We do not support dropping more than one feature at a time so we have to reject
// the validation.
case Some(_) => return false
case None => return true
case f if f.size > 1 => return false
case _ => return true
}

TableFeature.featureNameToFeature(droppedFeatureName) match {
case Some(feature: RemovableFeature) => feature.validateRemoval(snapshot)
case _ => throw DeltaErrors.dropTableFeatureFeatureNotSupportedByClient(droppedFeatureName)
droppedFeature match {
case feature: RemovableFeature => feature.validateRemoval(snapshot)
case _ => throw DeltaErrors.dropTableFeatureFeatureNotSupportedByClient(droppedFeature.name)
}
}
}
@@ -891,6 +904,32 @@ object VacuumProtocolCheckTableFeature
override def actionUsesFeature(action: Action): Boolean = false
}

/**
* Writer feature that requires writers to clean up metadata only when metadata can be cleaned
* up to requireCheckpointProtectionBeforeVersion in one go. This means that a single cleanup
* operation should truncate up to requireCheckpointProtectionBeforeVersion, as opposed to
* several cleanup operations truncating in chunks.
*
* There are two exceptions to this rule. If either of the two holds, the rule
* above can be ignored:
*
* a) The writer verifies it supports all protocols between
* [start, min(requireCheckpointProtectionBeforeVersion, targetCleanupVersion)] versions
* it intends to truncate.
* b) The writer does not create any checkpoints during history cleanup and does not erase any
* checkpoints after the truncation version.
*
* The CheckpointProtectionTableFeature can only be removed if history is truncated up to
* at least requireCheckpointProtectionBeforeVersion.
*/
object CheckpointProtectionTableFeature extends WriterFeature(name = "checkpointProtection") {
def metadataWithCheckpointProtection(metadata: Metadata, version: Long): Metadata = {
val versionPropKey = DeltaConfigs.REQUIRE_CHECKPOINT_PROTECTION_BEFORE_VERSION.key
val versionConf = versionPropKey -> version.toString
metadata.copy(configuration = metadata.configuration + versionConf)
}
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
@@ -1129,5 +1168,5 @@ object TestRemovableWriterWithHistoryTruncationFeature
case _ => false
}

override def requiresHistoryTruncation: Boolean = true
override def requiresHistoryProtection: Boolean = true
}
@@ -242,15 +242,21 @@ trait TableFeatureSupport { this: Protocol =>

/**
* Determine whether this protocol can be safely downgraded to a new protocol `to`.
* All we need is the implicit and explicit features between the two protocols to match,
* excluding the dropped feature. Note, this accounts for cases where we downgrade
* from table features to legacy protocol versions.
* All the implicit and explicit features between the two protocols need to match,
* excluding the dropped feature. We also need to take into account that in some cases
* the downgrade process may add the CheckpointProtectionTableFeature.
*
* Note, the conditions above also account for cases where we downgrade from table features
* to legacy protocol versions.
*/
def canDowngradeTo(to: Protocol, droppedFeatureName: String): Boolean = {
val thisFeatures = this.implicitlyAndExplicitlySupportedFeatures
val toFeatures = to.implicitlyAndExplicitlySupportedFeatures
val allowedNewFeatures: Set[TableFeature] = Set(CheckpointProtectionTableFeature)
val droppedFeature = Seq(droppedFeatureName).flatMap(TableFeature.featureNameToFeature)
(thisFeatures -- droppedFeature) == toFeatures
val newFeatures = toFeatures -- thisFeatures
newFeatures.subsetOf(allowedNewFeatures) &&
(thisFeatures -- droppedFeature == toFeatures -- newFeatures)
}

/**