Fix stream output when the same file occurs with different DVs in the same batch

Cherry-pick of d36623f for branch-2.4

## Description

There was an edge case when streaming from a source table with deletion vectors: in `ignoreChanges` mode, if the same file occurred in a single batch with different DVs (or once with a DV and once without), we could read the file with the wrong DV, because the DVs are broadcast to the scans keyed by data file path.

This PR fixes the issue by reading files from different versions in separate scans and then taking the union of the results to build the final `DataFrame` for the batch.

## How was this patch tested?

Added new tests with two DML commands (DELETE->DELETE and DELETE->INSERT) in the same batch, for all change modes.

## Does this PR introduce _any_ user-facing changes?

No.
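The grouping logic at the heart of the fix can be sketched in plain Scala, independent of Spark. The `AddFile`/`IndexedFile` case classes below are simplified stand-ins for the real Delta classes, and `scan` merely collects the files a scan would read instead of building a `DataFrame`:

```scala
// Minimal sketch of the fix, using simplified stand-ins for Delta's
// IndexedFile/AddFile classes; the real code builds Spark DataFrames
// and unions them, rather than concatenating sequences.
case class AddFile(path: String, deletionVector: Option[String])
case class IndexedFile(version: Long, add: AddFile)

// Model "one scan" as simply collecting the files it would read.
def scan(files: Seq[AddFile]): Seq[AddFile] = files

def createBatch(indexedFiles: Seq[IndexedFile]): Seq[AddFile] = {
  val hasDVs = indexedFiles.exists(_.add.deletionVector.isDefined)
  if (hasDVs) {
    // One scan per commit version, so a path can never appear twice with
    // different DVs inside a single scan; union the per-version results.
    indexedFiles
      .groupBy(_.version)
      .values
      .map(group => scan(group.map(_.add)))
      .reduceOption(_ ++ _)
      .getOrElse(Seq.empty)
  } else {
    // Without DVs a path cannot conflict with itself: one scan is safe.
    scan(indexedFiles.map(_.add))
  }
}
```

For example, `file1.parquet` appearing at version 1 with DV `a` and at version 2 with DV `b` lands in two separate scans, so each scan sees at most one DV per path.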
larsk-db authored Aug 22, 2023
1 parent adeda1e commit de19540
Showing 2 changed files with 449 additions and 12 deletions.
```diff
@@ -20,9 +20,7 @@ package org.apache.spark.sql.delta.sources
 import java.io.FileNotFoundException
 import java.sql.Timestamp
 
-import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
-import scala.util.control.NonFatal
 import scala.util.matching.Regex
 
 import org.apache.spark.sql.delta._
@@ -316,16 +314,40 @@ trait DeltaSourceBase extends Source
    * @param indexedFiles actions iterator from which to generate the DataFrame.
    */
   protected def createDataFrame(indexedFiles: Iterator[IndexedFile]): DataFrame = {
-    val addFilesList = indexedFiles
-      .map(_.getFileAction)
-      .filter(_.isInstanceOf[AddFile])
-      .asInstanceOf[Iterator[AddFile]].toArray
-
-    deltaLog.createDataFrame(
-      readSchemaSnapshotDescriptor,
-      addFilesList,
-      isStreaming = true
-    )
+    val addFiles = indexedFiles
+      .filter(_.getFileAction.isInstanceOf[AddFile])
+      .toSeq
+    val hasDeletionVectors =
+      addFiles.exists(_.getFileAction.asInstanceOf[AddFile].deletionVector != null)
+    if (hasDeletionVectors) {
+      // Read AddFiles from different versions in different scans.
+      // This avoids an issue where we might read the same file with different deletion vectors in
+      // the same scan, which we cannot support as long as we broadcast a map of DVs for lookup.
+      // This code can be removed once we can pass the DVs into the scan directly together with the
+      // AddFile/PartitionedFile entry.
+      addFiles
+        .groupBy(_.version)
+        .values
+        .map { addFilesList =>
+          deltaLog.createDataFrame(
+            readSchemaSnapshotDescriptor,
+            addFilesList.map(_.getFileAction.asInstanceOf[AddFile]),
+            isStreaming = true)
+        }
+        .reduceOption(_ union _)
+        .getOrElse {
+          // If we filtered out all the values before the groupBy, just return an empty DataFrame.
+          deltaLog.createDataFrame(
+            readSchemaSnapshotDescriptor,
+            Seq.empty[AddFile],
+            isStreaming = true)
+        }
+    } else {
+      deltaLog.createDataFrame(
+        readSchemaSnapshotDescriptor,
+        addFiles.map(_.getFileAction.asInstanceOf[AddFile]),
+        isStreaming = true)
+    }
   }
 
   /**
```
