
Commit

test
sezruby committed Oct 4, 2023
1 parent 2f0c669 commit abeb90b
Showing 2 changed files with 65 additions and 0 deletions.
@@ -2919,4 +2919,40 @@ abstract class MergeIntoSuiteBase
    customConditionErrorRegex =
      Option("Aggregate functions are not supported in the .* condition of MERGE operation.*")
  )

  Seq(true, false).foreach { differentActiveSession =>
    test("merge should use the same SparkSession consistently, differentActiveSession: " +
        s"$differentActiveSession") {
      withTempDir { dir =>
        withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
          val r = dir.getCanonicalPath
          val sourcePath = s"$r/source"
          val targetPath = s"$r/target"
          val numSourceRecords = 20
          spark.range(numSourceRecords)
            .withColumn("x", $"id")
            .withColumn("y", $"id")
            .write.mode("overwrite").format("delta").save(sourcePath)
          spark.range(1)
            .withColumn("x", $"id")
            .write.mode("overwrite").format("delta").save(targetPath)
          // Resolve the target table against a second session (or the same one,
          // depending on the test parameter) that has schema auto-migration enabled.
          val spark2 = if (differentActiveSession) spark.newSession() else spark
          spark2.conf.set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")
          val target = io.delta.tables.DeltaTable.forPath(spark2, targetPath)
          val source = spark.read.format("delta").load(sourcePath).alias("s")
          target.alias("t")
            .merge(source, "t.id = s.id")
            .whenMatched.updateExpr(Map("t.x" -> "t.x + 1"))
            .whenNotMatched.insertAll()
            .execute()
          // The target table should have the same number of rows as the source after the merge.
          assert(spark.read.format("delta").load(targetPath).count() == numSourceRecords)
        }
      }
    }
  }
}
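
What the new Scala test pins down: a merge built from a DeltaTable handle resolved against one session should consistently use that session, so session-scoped settings such as DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE (spark.databricks.delta.schema.autoMerge.enabled) take effect even when a different session is active on the calling thread. A minimal standalone sketch of that contract (the local-mode bootstrap and the /tmp paths are illustrative, not part of the commit):

    // Sketch only: two sessions sharing one SparkContext.
    val spark = org.apache.spark.sql.SparkSession.builder()
      .master("local[*]").getOrCreate()
    val spark2 = spark.newSession()  // separate conf, same underlying context
    spark2.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    // The handle is bound to spark2, so the merge should honor spark2's conf
    // even though `spark` is the active session on this thread.
    val target = io.delta.tables.DeltaTable.forPath(spark2, "/tmp/target")
    val source = spark.read.format("delta").load("/tmp/source").alias("s")
    target.alias("t")
      .merge(source, "t.id = s.id")
      .whenMatched.updateExpr(Map("t.x" -> "t.x + 1"))
      .whenNotMatched.insertAll()  // source-only columns added via auto-merge
      .execute()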
29 changes: 29 additions & 0 deletions python/delta/tests/test_deltatable.py
@@ -279,6 +279,35 @@ def reset_table():
        dt.merge(source, "key = k").whenNotMatchedInsert(
            values="k = 'a'", condition={"value": 1})

    def test_merge_with_inconsistent_sessions(self) -> None:
        source_path = os.path.join(self.tempFile, "source")
        target_path = os.path.join(self.tempFile, "target")
        spark = self.spark

        def f(spark):
            spark.range(20) \
                .withColumn("x", col("id")) \
                .withColumn("y", col("id")) \
                .write.mode("overwrite").format("delta").save(source_path)
            spark.range(1) \
                .withColumn("x", col("id")) \
                .write.mode("overwrite").format("delta").save(target_path)
            target = DeltaTable.forPath(spark, target_path)
            source = spark.read.format("delta").load(source_path).alias("s")
            target.alias("t") \
                .merge(source, "t.id = s.id") \
                .whenMatchedUpdate(set={"t.x": "t.x + 1"}) \
                .whenNotMatchedInsertAll() \
                .execute()
            # With schema auto-merge enabled, the merge inserts all 20 source rows.
            assert spark.read.format("delta").load(target_path).count() == 20

        # Run the merge in a worker thread, where the active SparkSession can
        # differ from the session that defined the table handle above.
        pool = ThreadPool(1)
        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
        try:
            pool.starmap(f, [(spark,)])
        finally:
            spark.conf.unset("spark.databricks.delta.schema.autoMerge.enabled")

    def test_history(self):
        self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3)])
        self.__overwriteDeltaTable([('a', 3), ('b', 2), ('c', 1)])
