-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Spark][1.0] Fix a data loss bug in MergeIntoCommand #2128
base: branch-1.0
Are you sure you want to change the base?
Conversation
3b9af6a
to
09c9d83
Compare
Seems the fix is needed for all other versions. in MergeIntoCommand
|
@johanl-db Could you please take a look into this? |
Thanks @sezruby for contributing a fix.
This pattern of accessing the spark conf is fairly widespread, I suspect a lot of things would break if there was an issue with it. You're seeing different values for |
1 => We have logs and I can find the case in spark 3.2/1.2 too, using
I think the issue exists higher version too, as it's the same code. 2, 4 => no repro test, but I guess the repro condition is
3 => I need to confirm that each MERGE request uses the same spark session or different spark sessions(I guess it's different spark sessions). However, both case could be the problem. We should throw the exception if the config is changed within the same spark session, too. The problem is, SQLConf.get refers cached conf, which is from getActiveSession() But the active session can be changed before SQLConf is cached for the thread. Yes I know many places use the conf from the active session. If the config affects correctness, we should avoid that. |
I confirmed they are using multi threads (python) to run MERGE command to tables concurrently, but not changing the config nor creating a new session for each thread. I suspect default session is set when one of threads is exiting withActive. |
Here's pyspark repro: # repro.py
import tempfile
from multiprocessing.pool import ThreadPool
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta.tables import DeltaTable
spark = SparkSession.builder.appName("Test").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
with tempfile.TemporaryDirectory() as temp_dir:
source_path = f"{temp_dir}/source"
target_path = f"{temp_dir}/target"
def f(spark):
print(f"spark = {spark}")
print(f"active session = {SparkSession.getActiveSession()}")
print(f"autoMerge from SQLConf.get = {spark._jvm.org.apache.spark.sql.internal.SQLConf.get().getConfString('spark.databricks.delta.schema.autoMerge.enabled', '<not set>')}")
spark.range(20) \
.withColumn("x", col("id")) \
.withColumn("y", col("id")) \
.write.mode("overwrite").format("delta").save(source_path)
spark.range(1) \
.withColumn("x", col("id")) \
.write.mode("overwrite").format("delta").save(target_path)
# Idk why but DeltaTable.forPath(spark, target_path) fails
target = DeltaTable(spark, spark._jvm.io.delta.tables.DeltaTable.forPath(spark._jsparkSession, target_path))
source = spark.read.format("delta").load(source_path).alias("s")
target.alias("t") \
.merge(source, "t.id = s.id") \
.whenMatchedUpdate(set = {"t.x": "t.x + 1"}) \
.whenNotMatchedInsertAll() \
.execute()
spark.read.format("delta").load(target_path).show()
assert spark.read.format("delta").load(target_path).count() == 20
pool = ThreadPool(1)
pool.starmap(f, [(spark,)]) Run as:
This is because a new thread is created in the JVM process and that thread doesn't inherit any ThreadLocal variables, including the active session ref. I think a better fix (if we want to minimize changes) is wrapping A more fundamental fix would be to remove all references to the active session and ensure we only reference the same SparkSession throughout the operation. Here's Scala repro: withTempDir { dir =>
val r = dir.getCanonicalPath
val sourcePath = s"$r/source"
val targetPath = s"$r/target"
val spark2 = spark.newSession
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
spark.range(20)
.withColumn("x", $"id")
.withColumn("y", $"id")
.write.mode("overwrite").format("delta").save(sourcePath)
spark.range(1)
.withColumn("x", $"id")
.write.mode("overwrite").format("delta").save(targetPath)
val target = io.delta.tables.DeltaTable.forPath(spark, targetPath)
val source = spark.read.format("delta").load(sourcePath).alias("s")
val merge = target.alias("t")
.merge(source, "t.id = s.id")
.whenMatched.updateExpr(Map("t.x" -> "t.x + 1"))
.whenNotMatched.insertAll()
SparkSession.setActiveSession(spark2)
merge.execute()
assert(spark.read.format("delta").load(targetPath).count() == 20)
} |
cc @scottsand-db for 3.0 |
Rephrasing your findings @clee704, let me know if you disagree: So the issue is essentially that in DeltaMergeBuilder.execute(), we use the spark session from the target table that gets propagated to the analysis step and is ultimately the one used here to get the schema evolution conf value.
but in MergeIntoCommand, we get the value from I agree with wrapping execution with Other commands may also suffer from the same issue, we should review if UPDATE/INSERT, DELETE, or utility commands need the same fix |
Signed-off-by: Eunjin Song <[email protected]> Co-authored-by: Chungmin Lee <[email protected]>
@johanl-db Yes, that's a good summary. For other potential issues, we can make a follow up. |
@johanl-db Any delivery plan from Databricks by any chance? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sezruby The change looks good, we should move forward with it and cover all Delta command execution that we identify.
Having a test for each of them would be nice to have if you can but the tests you already have already highlight the issue well enough to justify this fix.
AFAICT, the issue is present in all Delta versions but for MERGE specifically it no longer silently produces incorrect results but fails instead starting with ead1ff0 which will be included with the next release.
I'm not very familiar with our backport and release policy for older delta branches, I'll need to check. You may want to target master
with this PR first and then we can work on backporting as needed.
core/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/io/delta/tables/execution/DeltaConvert.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@scottsand-db This is a bug that impacts all delta versions, what's the process to get the fix to existing branches?
I created cherry-picks to all 2.x branches and master:
@sezruby do you mind quickly going over them to check that I didn't mess anything? |
@sezruby I updated the changes against all branches except 1.0 to cover executeDetails, executeRestore, executeGenerate and optimize. Can you update this PR against 1.0 before we merge it? |
I updated for executeGenerate. No other command available in 1.0 |
@johanl-db, this PR, #2154 is related to this issue? |
Same category of issues but these are actually unrelated:
Short update from my side on this PR since it has been stalled for a while: it's still on my radar but things have been a bit slow due to the 3.0 release. The fix was merged in 3.0 before the release, I still need to cover other branches. |
This is a cherry-pick of #2128 to the master branch. #### Which Delta project/connector is this regarding? -Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Fix a data loss bug in MergeIntoCommand. It's caused by using different spark session config object for PreprocessTableMerge and MergeIntoCommand, which is possible when multiple spark sessions are running concurrently. If source dataframe has more columns than target table, auto schema merge feature adds additional nullable column to target table schema. The updated output projection built in PreprocessTableMerge, so `matchedClauses` and `notMatchedClauses` contains the addtional columns, but target table schema in MergeIntoCommand doesn't have it. As a result, the following index doesn't indicate the delete flag column index, which is `numFields - 2`. ``` def shouldDeleteRow(row: InternalRow): Boolean = row.getBoolean(outputRowEncoder.schema.fields.size) ``` row.getBoolean returns `getByte() != 0`, which causes dropping rows randomly. - matched rows in target table loss Also as autoMerge doesn't work - newly added column data in source df loss. The fix makes sure MergeIntoCommand uses the same spark session / config object. Fixes #2104 I confirmed that #2104 is fixed with the change. I confirmed the following by debug log message without the change: 1. matchedClauses has more columns after processRow 2. row.getBoolean(outputRowEncoder.schema.fields.size) refers random column value (It's Unsafe read) 3. canMergeSchema in MergeIntoCommand is false, it was true in PreprocessTableMerge ## Does this PR introduce _any_ user-facing changes? Yes, fixes the data loss issue Closes #2162 Co-authored-by: Chungmin Lee <[email protected]> Signed-off-by: Johan Lasperas <[email protected]> GitOrigin-RevId: 49acacf8ff1c71d7e6bcb2dc2f709c325211430a
Which Delta project/connector is this regarding?
Description
Fix a data loss bug in MergeIntoCommand.
It's caused by using different spark session config object for PreprocessTableMerge and MergeIntoCommand. In PreprocessTableMerge, the config value is from the spark session which is given for DeltaTable. In MergeIntoSchema it refers SQLConf.get which is from the active session of the current thread. It can be different when user set another new spark session or active session just doesn't set properly before the execution.
If source dataframe has more columns than target table, auto schema merge feature adds additional nullable column to
target table schema. The updated output projection built in PreprocessTableMerge, so
matchedClauses
andnotMatchedClauses
contains the addtional columns, but target table schema in MergeIntoCommand doesn't have it.As a result, the following index doesn't indicate the delete flag column index, which is
numFields - 2
.row.getBoolean returns
getByte() != 0
, which causes dropping rows randomly.Also as autoMerge doesn't work
The fix makes sure setting active session as the given spark session for target table. The PR applies the fix for other command to avoid any inconsistent config behavior.
Fixes #2104
How was this patch tested?
I confirmed that #2104 is fixed with the change.
I confirmed the following by debug log message without the change:
Does this PR introduce any user-facing changes?
Yes, fixes the data loss issue