From de19540bb9bd8ed104bc614f3b6b16691d734669 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Tue, 22 Aug 2023 19:28:20 +0200 Subject: [PATCH] Fix stream output when the same file occurs with different DVs in the same batch Cherry-pick of d36623f0819a3b9a7e9ff924bb1faa4e9caf6050 for branch-2.4 ## Description There was an edge case in streaming with deletion vectors in the source, where in `ignoreChanges`-mode it could happen that if the same file occurred with different DVs in the same batch (or both with a DV and without a DV), then we would read the file with the wrong DV, since we broadcast the DVs to the scans by data file path. This PR fixes this issue, by reading files from different versions in different scans and then taking the union of the result to build the final `DataFrame` for the batch. ## How was this patch tested? Added new tests for having 2 DML commands (DELETE->DELETE and DELETE->INSERT) in the same batch for all change modi. ## Does this PR introduce _any_ user-facing changes? No. --- .../spark/sql/delta/sources/DeltaSource.scala | 46 +- .../DeltaSourceDeletionVectorsSuite.scala | 415 ++++++++++++++++++ 2 files changed, 449 insertions(+), 12 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index 32677ee67a..60a96306b7 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -20,9 +20,7 @@ package org.apache.spark.sql.delta.sources import java.io.FileNotFoundException import java.sql.Timestamp -import scala.collection.mutable import scala.util.{Failure, Success, Try} -import scala.util.control.NonFatal import scala.util.matching.Regex import org.apache.spark.sql.delta._ @@ -316,16 +314,40 @@ trait DeltaSourceBase extends Source * @param indexedFiles actions iterator from which to generate the DataFrame. */ protected def createDataFrame(indexedFiles: Iterator[IndexedFile]): DataFrame = { - val addFilesList = indexedFiles - .map(_.getFileAction) - .filter(_.isInstanceOf[AddFile]) - .asInstanceOf[Iterator[AddFile]].toArray - - deltaLog.createDataFrame( - readSchemaSnapshotDescriptor, - addFilesList, - isStreaming = true - ) + val addFiles = indexedFiles + .filter(_.getFileAction.isInstanceOf[AddFile]) + .toSeq + val hasDeletionVectors = + addFiles.exists(_.getFileAction.asInstanceOf[AddFile].deletionVector != null) + if (hasDeletionVectors) { + // Read AddFiles from different versions in different scans. + // This avoids an issue where we might read the same file with different deletion vectors in + // the same scan, which we cannot support as long we broadcast a map of DVs for lookup. + // This code can be removed once we can pass the DVs into the scan directly together with the + // AddFile/PartitionedFile entry. + addFiles + .groupBy(_.version) + .values + .map { addFilesList => + deltaLog.createDataFrame( + readSchemaSnapshotDescriptor, + addFilesList.map(_.getFileAction.asInstanceOf[AddFile]), + isStreaming = true) + } + .reduceOption(_ union _) + .getOrElse { + // If we filtered out all the values before the groupBy, just return an empty DataFrame. + deltaLog.createDataFrame( + readSchemaSnapshotDescriptor, + Seq.empty[AddFile], + isStreaming = true) + } + } else { + deltaLog.createDataFrame( + readSchemaSnapshotDescriptor, + addFiles.map(_.getFileAction.asInstanceOf[AddFile]), + isStreaming = true) + } } /** diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala new file mode 100644 index 0000000000..79ddd1263b --- /dev/null +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceDeletionVectorsSuite.scala @@ -0,0 +1,415 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.delta + +import java.io.File + +import scala.util.control.NonFatal + +import org.apache.hadoop.fs.Path +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.streaming.util.StreamManualClock + +trait DeltaSourceDeletionVectorTests extends StreamTest with DeletionVectorsTestUtils { + + import testImplicits._ + + test("allow to delete files before starting a streaming query") { + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + (0 until 5).foreach { i => + val v = Seq(i.toString).toDF + v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) + } + sql(s"DELETE FROM delta.`$inputDir`") + (5 until 10).foreach { i => + val v = Seq(i.toString).toDF + v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) + } + deltaLog.checkpoint() + assert(deltaLog.readLastCheckpointFile().nonEmpty, "this test requires a checkpoint") + + val df = spark.readStream + .format("delta") + .load(inputDir.getCanonicalPath) + + testStream(df)( + AssertOnQuery { q => + q.processAllAvailable() + true + }, + CheckAnswer((5 until 10).map(_.toString): _*)) + } + } + + test("allow to delete files before staring a streaming query without checkpoint") { + withTempDir { inputDir => + val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI)) + (0 until 5).foreach { i => + val v = Seq(i.toString).toDF + v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) + } + sql(s"DELETE FROM delta.`$inputDir`") + (5 until 7).foreach { i => + val v = Seq(i.toString).toDF + v.write.mode("append").format("delta").save(deltaLog.dataPath.toString) + } + assert(deltaLog.readLastCheckpointFile().isEmpty, "this test requires no checkpoint") + + val df = spark.readStream + .format("delta") + .load(inputDir.getCanonicalPath) + + testStream(df)( + AssertOnQuery { q => + q.processAllAvailable() + true + }, + CheckAnswer((5 until 7).map(_.toString): _*)) + } + } + + /** + * If deletion vectors are expected here, return true if they are present. If none are expected, + * return true if none are present. + */ + protected def deletionVectorsPresentIfExpected( + inputDir: String, + expectDVs: Boolean): Boolean = { + val deltaLog = DeltaLog.forTable(spark, inputDir) + val filesWithDVs = getFilesWithDeletionVectors(deltaLog) + logWarning(s"Expecting DVs=$expectDVs - found ${filesWithDVs.size}") + if (expectDVs) { + filesWithDVs.nonEmpty + } else { + filesWithDVs.isEmpty + } + } + + private def ignoreOperationsTest( + inputDir: String, + sourceOptions: Seq[(String, String)], + sqlCommand: String, + commandShouldProduceDVs: Option[Boolean] = None)(expectations: StreamAction*): Unit = { + (0 until 10 by 2).foreach { i => + Seq(i, i + 1).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir) + } + + val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir) + val expectDVs = commandShouldProduceDVs.getOrElse( + sqlCommand.toUpperCase().startsWith("DELETE")) + + val base = Seq( + AssertOnQuery { q => + q.processAllAvailable() + true + }, + CheckAnswer((0 until 10): _*), + AssertOnQuery { q => + sql(sqlCommand) + deletionVectorsPresentIfExpected(inputDir, expectDVs) + }) + + testStream(df)((base ++ expectations): _*) + } + + private def ignoreOperationsTestWithManualClock( + inputDir: String, + sourceOptions: Seq[(String, String)], + sqlCommand1: String, + sqlCommand2: String, + command1ShouldProduceDVs: Option[Boolean] = None, + command2ShouldProduceDVs: Option[Boolean] = None, + expectations: List[StreamAction]): Unit = { + + (0 until 15 by 3).foreach { i => + Seq(i, i + 1, i + 2).toDF().coalesce(1).write.format("delta").mode("append").save(inputDir) + } + val log = DeltaLog.forTable(spark, inputDir) + val commitVersionBeforeDML = log.update().version + val df = spark.readStream.format("delta").options(sourceOptions.toMap).load(inputDir) + def expectDVsInCommand(shouldProduceDVs: Option[Boolean], command: String): Boolean = { + shouldProduceDVs.getOrElse(command.toUpperCase().startsWith("DELETE")) + } + val expectDVsInCommand1 = expectDVsInCommand(command1ShouldProduceDVs, sqlCommand1) + val expectDVsInCommand2 = expectDVsInCommand(command2ShouldProduceDVs, sqlCommand2) + + // If it's expected to fail we must be sure not to actually process it in here, + // or it'll fail too early instead of being caught by ExpectFailure. + val shouldFailAfterCommands = expectations.exists(_.isInstanceOf[ExpectFailure[_]]) + + val baseActions: Seq[StreamAction] = Seq( + StartStream( + Trigger.ProcessingTime("10 seconds"), + new StreamManualClock(System.currentTimeMillis())), + AdvanceManualClock(10L * 1000L), + CheckAnswer((0 until 15): _*), + AssertOnQuery { q => + // Make sure we only processed a single batch since the initial data load. + q.commitLog.getLatestBatchId().get == 0 + }, + AssertOnQuery { q => + sql(sqlCommand1) + deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand1) + }, + AssertOnQuery { q => + sql(sqlCommand2) + deletionVectorsPresentIfExpected(inputDir, expectDVsInCommand2) + }, + AdvanceManualClock(20L * 1000L)) ++ + (if (shouldFailAfterCommands) { + Seq.empty[StreamAction] + } else { + Seq( + // This makes it move to the next batch. + AssertOnQuery(waitUntilBatchProcessed(1, _)), + AssertOnQuery { q => + eventually("Next batch was never processed") { + // Make sure we only processed a single batch since the initial data load. + assert(q.commitLog.getLatestBatchId().get === 1) + } + true + }) + }) + + testStream(df)((baseActions ++ expectations): _*) + } + + protected def waitUntilBatchProcessed(batchId: Int, currentStream: StreamExecution): Boolean = { + eventually("Next batch was never processed") { + if (!currentStream.exception.isDefined) { + assert(currentStream.commitLog.getLatestBatchId().get >= batchId) + } + } + if (currentStream.exception.isDefined) { + throw currentStream.exception.get + } + true + } + + protected def eventually[T](message: String)(func: => T): T = { + try { + Eventually.eventually(Timeout(streamingTimeout)) { + func + } + } catch { + case NonFatal(e) => + fail(message, e) + } + } + + testQuietly(s"deleting files fails query if ignoreDeletes = false") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Nil, + sqlCommand = s"DELETE FROM delta.`$inputDir`", + // Whole table deletes do not produce DVs. + commandShouldProduceDVs = Some(false))(ExpectFailure[DeltaUnsupportedOperationException] { + e => + for (msg <- Seq("Detected deleted data", "not supported", "ignoreDeletes", "true")) { + assert(e.getMessage.contains(msg)) + } + }) + } + } + + Seq("ignoreFileDeletion", DeltaOptions.IGNORE_DELETES_OPTION).foreach { ignoreDeletes => + testQuietly( + s"allow to delete files after staring a streaming query when $ignoreDeletes is true") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Seq(ignoreDeletes -> "true"), + sqlCommand = s"DELETE FROM delta.`$inputDir`", + // Whole table deletes do not produce DVs. + commandShouldProduceDVs = Some(false))( + AssertOnQuery { q => + Seq(10).toDF().write.format("delta").mode("append").save(inputDir.getAbsolutePath) + q.processAllAvailable() + true + }, + CheckAnswer((0 to 10): _*)) + } + } + } + + case class SourceChangeVariant( + label: String, + query: File => String, + answerWithIgnoreChanges: Seq[Int]) + + val sourceChangeVariants: Seq[SourceChangeVariant] = Seq( + // A partial file delete is treated like an update by the Source. + SourceChangeVariant( + label = "DELETE", + query = inputDir => s"DELETE FROM delta.`$inputDir` WHERE value = 3", + // 2 occurs in the same file as 3, so it gets duplicated during processing. + answerWithIgnoreChanges = (0 to 10) :+ 2)) + + for (variant <- sourceChangeVariants) + testQuietly( + "updating the source table causes failure when ignoreChanges = false" + + s" - using ${variant.label}") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Nil, + sqlCommand = variant.query(inputDir))( + ExpectFailure[DeltaUnsupportedOperationException] { e => + for (msg <- Seq("data update", "not supported", "skipChangeCommits", "true")) { + assert(e.getMessage.contains(msg)) + } + }) + } + } + + for (variant <- sourceChangeVariants) + testQuietly( + "allow to update the source table when ignoreChanges = true" + + s" - using ${variant.label}") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Seq(DeltaOptions.IGNORE_CHANGES_OPTION -> "true"), + sqlCommand = variant.query(inputDir))( + AssertOnQuery { q => + Seq(10).toDF().write.format("delta").mode("append").save(inputDir.getAbsolutePath) + q.processAllAvailable() + true + }, + CheckAnswer(variant.answerWithIgnoreChanges: _*)) + } + } + + testQuietly("deleting files when ignoreChanges = true doesn't fail the query") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Seq(DeltaOptions.IGNORE_CHANGES_OPTION -> "true"), + sqlCommand = s"DELETE FROM delta.`$inputDir`", + // Whole table deletes do not produce DVs. + commandShouldProduceDVs = Some(false))( + AssertOnQuery { q => + Seq(10).toDF().write.format("delta").mode("append").save(inputDir.getAbsolutePath) + q.processAllAvailable() + true + }, + CheckAnswer((0 to 10): _*)) + } + } + + for (variant <- sourceChangeVariants) + testQuietly("updating source table when ignoreDeletes = true fails the query" + + s" - using ${variant.label}") { + withTempDir { inputDir => + ignoreOperationsTest( + inputDir.getAbsolutePath, + sourceOptions = Seq(DeltaOptions.IGNORE_DELETES_OPTION -> "true"), + sqlCommand = variant.query(inputDir))( + ExpectFailure[DeltaUnsupportedOperationException] { e => + for (msg <- Seq("data update", "not supported", "skipChangeCommits", "true")) { + assert(e.getMessage.contains(msg)) + } + }) + } + } + + private val allSourceOptions = Seq( + Nil, + List(DeltaOptions.IGNORE_DELETES_OPTION), + List(DeltaOptions.IGNORE_CHANGES_OPTION), + List(DeltaOptions.SKIP_CHANGE_COMMITS_OPTION)) + .map { options => + options.map(key => key -> "true") + } + + for (sourceOption <- allSourceOptions) + testQuietly("subsequent DML commands are processed correctly in a batch - DELETE->DELETE" + + s" - $sourceOption") { + val expectations: List[StreamAction] = + sourceOption.map(_._1) match { + case List(DeltaOptions.IGNORE_DELETES_OPTION) | Nil => + // These two do not allow updates. + ExpectFailure[DeltaUnsupportedOperationException] { e => + for (msg <- Seq("data update", "not supported", "skipChangeCommits", "true")) { + assert(e.getMessage.contains(msg)) + } + } :: Nil + case List(DeltaOptions.IGNORE_CHANGES_OPTION) => + // The 4 and 5 are in the same file as 3, so the first DELETE is going to duplicate them. + // 5 is still in the same file as 4 after the first DELETE, so the second DELETE is going + // to duplicate it again. + CheckAnswer((0 until 15) ++ Seq(4, 5, 5): _*) :: Nil + case List(DeltaOptions.SKIP_CHANGE_COMMITS_OPTION) => + // This will completely ignore the DELETEs. + CheckAnswer((0 until 15): _*) :: Nil + } + + withTempDir { inputDir => + ignoreOperationsTestWithManualClock( + inputDir.getAbsolutePath, + sourceOptions = sourceOption, + sqlCommand1 = s"DELETE FROM delta.`$inputDir` WHERE value == 3", + sqlCommand2 = s"DELETE FROM delta.`$inputDir` WHERE value == 4", + expectations = expectations) + } + } + + for (sourceOption <- allSourceOptions) + testQuietly("subsequent DML commands are processed correctly in a batch - INSERT->UPDATE" + + s" - $sourceOption") { + val expectations: List[StreamAction] = sourceOption.map(_._1) match { + case List(DeltaOptions.IGNORE_DELETES_OPTION) | Nil => + // These two do not allow updates. + ExpectFailure[DeltaUnsupportedOperationException] { e => + for (msg <- Seq("data update", "not supported", "skipChangeCommits", "true")) { + assert(e.getMessage.contains(msg)) + } + } :: Nil + case List(DeltaOptions.IGNORE_CHANGES_OPTION) => + // 15 and 16 are in the same file, so 16 will get duplicated by the DELETE. + CheckAnswer((0 to 16) ++ Seq(16): _*) :: Nil + case List(DeltaOptions.SKIP_CHANGE_COMMITS_OPTION) => + // This will completely ignore the DELETE. + CheckAnswer((0 to 16): _*) :: Nil + } + + withTempDir { inputDir => + ignoreOperationsTestWithManualClock( + inputDir.getAbsolutePath, + sourceOptions = sourceOption, + sqlCommand1 = + s"INSERT INTO delta.`$inputDir` SELECT /*+ COALESCE(1) */ * FROM VALUES 15, 16", + sqlCommand2 = s"DELETE FROM delta.`$inputDir` WHERE value == 15", + expectations = expectations) + } + } +} + +class DeltaSourceDeletionVectorsSuite extends DeltaSourceSuiteBase + with DeltaSQLCommandTest + with DeltaSourceDeletionVectorTests { + override def beforeAll(): Unit = { + super.beforeAll() + enableDeletionVectorsInNewTables(spark.conf) + } +}