Add config to run MERGE source materialization eagerly
## Description

(Cherry-pick of 4932956 to branch-2.3)

Add a new config that controls whether the MERGE source is materialized eagerly or lazily.
The config defaults to eager, to avoid suspected determinism issues with lazy materialization.
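For illustration, a minimal sketch of flipping the new flag from a session, assuming `DeltaSQLConf` entries resolve under Delta's usual `spark.databricks.delta.` conf prefix:

```scala
// Sketch, not part of the patch. The full key below assumes the usual
// "spark.databricks.delta." prefix that DeltaSQLConf prepends.
spark.conf.set("spark.databricks.delta.merge.materializeSource.eager", "false") // opt back into lazy
spark.conf.set("spark.databricks.delta.merge.materializeSource.eager", "true")  // default: eager
```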

## How was this patch tested?
Ran the most sensitive tests with both the eager and lazy settings, and the rest only with the default setting, so as not to blow up CI time unnecessarily.
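The eager/lazy split uses the parameterization pattern sketched below, assuming `BOOLEAN_DOMAIN` is `Seq(true, false)` as in Delta's test utilities; the for-loop runs while the suite is constructed, so each value registers its own independently named test:

```scala
// Sketch of the pattern used in the test diffs below.
// BOOLEAN_DOMAIN is assumed to be Seq(true, false).
for (eager <- Seq(true, false)) {
  test(s"some merge behavior - eager=$eager") {
    withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
      // ... exercise MERGE with the chosen materialization mode ...
    }
  }
}
```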
larsk-db authored Jul 24, 2023
1 parent c6100ac commit b8a4957
Showing 3 changed files with 85 additions and 45 deletions.
@@ -257,12 +257,11 @@ trait MergeIntoMaterializeSource extends DeltaLogging {
val sourceWithSelectedColumns = Project(referencedSourceColumns, source)
val baseSourcePlanDF = Dataset.ofRows(spark, sourceWithSelectedColumns)

-    // Caches the source in RDD cache using localCheckpopoint, which cuts away the RDD lineage,
+    // Caches the source in RDD cache using localCheckpoint, which cuts away the RDD lineage,
// which shall ensure that the source cannot be recomputed and thus become inconsistent.
val checkpointedSourcePlanDF = baseSourcePlanDF
-      // eager = false makes it be executed and materialized first time it's used.
-      // Doing it lazily inside the query lets it interleave this work better with other work.
-      // On the other hand, it makes it impossible to measure the time it took in a metric.
+      // Set eager=false for now, even if we should be doing eager, so that we can set the storage
+      // level before executing.
.localCheckpoint(eager = false)

// We have to reach through the crust and into the plan of the checkpointed DF
@@ -289,6 +288,20 @@
)
rdd.persist(storageLevel)

+    // WARNING: if eager == false, the source used during the first Spark Job that uses this may
+    // still be inconsistent with source materialized afterwards.
+    // This is because doCheckpoint that finalizes the lazy checkpoint is called after the Job
+    // that triggered the lazy checkpointing finished.
+    // If blocks were lost during that job, they may still get recomputed and changed compared
+    // to how they were used during the execution of the job.
+    if (spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER)) {
+      // Force the evaluation of the `rdd`, since we cannot access `doCheckpoint()` from here.
+      rdd
+        .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[InternalRow]])
+        .foreach((_: InternalRow) => ())
+      assert(rdd.isCheckpointed)
+    }
+
logDebug(s"Materializing MERGE with pruned columns $referencedSourceColumns. ")
logDebug(s"Materialized MERGE source plan:\n${sourceDF.get.queryExecution}")
materializeReason
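As a standalone illustration of the lazy-checkpoint hazard described in the WARNING above, a minimal sketch against the plain RDD API (assuming a live `spark` session; this is not the Delta code itself):

```scala
import org.apache.spark.storage.StorageLevel

// A local checkpoint is only finalized by the first job that computes the RDD.
val rdd = spark.sparkContext.parallelize(1 to 100)
rdd.localCheckpoint()                // only marks the RDD; no job runs yet
rdd.persist(StorageLevel.DISK_ONLY)  // the level can still be overridden before execution
assert(!rdd.isCheckpointed)          // nothing has executed, so not finalized
rdd.count()                          // first action; doCheckpoint runs after this job
assert(rdd.isCheckpointed)           // lineage is now truncated
```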
@@ -510,6 +510,13 @@ trait DeltaSQLConfBase {
.intConf
.createWithDefault(4)

+  val MERGE_MATERIALIZE_SOURCE_EAGER =
+    buildConf("merge.materializeSource.eager")
+      .internal()
+      .doc("Materialize the source eagerly before Job 1")
+      .booleanConf
+      .createWithDefault(true)
+
val DELTA_LAST_COMMIT_VERSION_IN_SESSION =
buildConf("lastCommitVersionInSession")
.doc("The version of the last commit made in the SparkSession for any table.")
@@ -17,6 +17,7 @@
package org.apache.spark.sql.delta

import scala.collection.mutable
+import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

@@ -77,19 +78,22 @@ trait MergeIntoMaterializeSourceTests



test("merge logs out of disk errors") {
val injectEx = new java.io.IOException("No space left on device")
testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) =>
// Compare messages instead of instances, since the equals method for these exceptions
// takes more into account.
assert(thrownEx.getCause.getMessage === injectEx.getMessage)
assert(errorOpt.isDefined)
val error = errorOpt.get
assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
assert(error.attempt == 1)
val storageLevel = StorageLevel.fromString(
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
+  for (eager <- BOOLEAN_DOMAIN)
+    test(s"merge logs out of disk errors - eager=$eager") {
+      withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+        val injectEx = new java.io.IOException("No space left on device")
+        testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) =>
+          // Compare messages instead of instances, since the equals method for these exceptions
+          // takes more into account.
+          assert(thrownEx.getCause.getMessage === injectEx.getMessage)
+          assert(errorOpt.isDefined)
+          val error = errorOpt.get
+          assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
+          assert(error.attempt == 1)
+          val storageLevel = StorageLevel.fromString(
+            spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
+          assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
}
}
+    }

@@ -196,10 +200,11 @@ trait MergeIntoMaterializeSourceTests
spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL_RETRY)
}
)
-        if (rdd.getStorageLevel != expectedStorageLevel) {
+        val rddStorageLevel = rdd.getStorageLevel
+        if (rddStorageLevel != expectedStorageLevel) {
invalidStorageLevel =
Some(s"For attempt ${seenSources.size} of materialized source expected " +
s"$expectedStorageLevel but got ${rdd.getStorageLevel}")
s"$expectedStorageLevel but got ${rddStorageLevel}")
finished = true
}
logInfo(s"Unpersisting mergeMaterializedSource with id=$rddId")
@@ -244,36 +249,48 @@ trait MergeIntoMaterializeSourceTests
val tblName = "target"

// For 1 to maxAttempts - 1 RDD block lost failures, merge should retry and succeed.
-  (1 to maxAttempts - 1).foreach { kills =>
-    test(s"materialize source unpersist with $kills kill attempts succeeds") {
+  for {
+    eager <- BOOLEAN_DOMAIN
+    kills <- 1 to maxAttempts - 1
+  } {
+    test(s"materialize source unpersist with $kills kill attempts succeeds - eager=$eager") {
withTable(tblName) {
-        val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
-        val events = allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
-        assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
-        val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
-        assert(mergeStats.materializeSourceAttempts.isDefined,
-          s"MergeStats:\n$mergeStats")
-        assert(mergeStats.materializeSourceAttempts.get == kills + 1,
-          s"MergeStats:\n$mergeStats")
-
-        // Check query result after merge
-        val tab = sql(s"select * from $tblName order by id")
-          .collect().map(row => row.getLong(0)).toSeq
-        assert(tab == (0L until 90L) ++ (100L until 120L))
+        withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+          val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, kills)
+          val events =
+            allDeltaEvents.filter(_.tags.get("opType").contains("delta.dml.merge.stats"))
+          assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
+          val mergeStats = JsonUtils.fromJson[MergeStats](events(0).blob)
+          assert(mergeStats.materializeSourceAttempts.isDefined, s"MergeStats:\n$mergeStats")
+          assert(
+            mergeStats.materializeSourceAttempts.get == kills + 1,
+            s"MergeStats:\n$mergeStats")
+
+          // Check query result after merge
+          val tab = sql(s"select * from $tblName order by id")
+            .collect()
+            .map(row => row.getLong(0))
+            .toSeq
+          assert(tab == (0L until 90L) ++ (100L until 120L))
+        }
}
}
}

// Eventually it should fail after exceeding maximum number of attempts.
test(s"materialize source unpersist with $maxAttempts kill attempts fails") {
withTable(tblName) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
val events = allDeltaEvents
.filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
assert(error.attempt == maxAttempts)
for (eager <- BOOLEAN_DOMAIN) {
test(s"materialize source unpersist with $maxAttempts kill attempts fails - eager=$eager") {
withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
withTable(tblName) {
val allDeltaEvents = testMergeMaterializedSourceUnpersist(tblName, maxAttempts)
val events = allDeltaEvents
.filter(_.tags.get("opType").contains(MergeIntoMaterializeSourceError.OP_TYPE))
assert(events.length == 1, s"allDeltaEvents:\n$allDeltaEvents")
val error = JsonUtils.fromJson[MergeIntoMaterializeSourceError](events(0).blob)
assert(error.errorType == MergeIntoMaterializeSourceErrorType.RDD_BLOCK_LOST.toString)
assert(error.attempt == maxAttempts)
}
}
}
}
}
@@ -327,14 +344,17 @@ trait MergeIntoMaterializeSourceTests
hints
}

test("materialize source preserves dataframe hints") {
for (eager <- BOOLEAN_DOMAIN)
test(s"materialize source preserves dataframe hints - eager=$eager") {
withTable("A", "B", "T") {
sql("select id, id as v from range(50000)").write.format("delta").saveAsTable("T")
sql("select id, id+2 as v from range(10000)").write.format("csv").saveAsTable("A")
sql("select id, id*2 as v from range(1000)").write.format("csv").saveAsTable("B")

// Manually added broadcast hint will mess up the expected hints hence disable it
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withSQLConf(
+        DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString,
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
// Simple BROADCAST hint
val hSimple = getHints(
sql("MERGE INTO T USING (SELECT /*+ BROADCAST */ * FROM A) s ON T.id = s.id" +
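The hints test above feeds MERGE a source subquery carrying a broadcast hint; a sketch of the kind of statement it exercises (the WHEN clause is assumed, since the excerpt cuts off mid-statement):

```scala
// Sketch: the BROADCAST hint on the source should survive materialization,
// so the planner still sees it when choosing the join strategy.
sql("MERGE INTO T USING (SELECT /*+ BROADCAST */ * FROM A) s ON T.id = s.id " +
  "WHEN MATCHED THEN UPDATE SET *")
```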
