Michael Chen created SPARK-44897:
------------------------------------
Summary: Local Property Propagation to Subquery Broadcast Exec
Key: SPARK-44897
URL: https://issues.apache.org/jira/browse/SPARK-44897
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.4.0
Reporter: Michael Chen
https://issues.apache.org/jira/browse/SPARK-32748 was opened and then, I believe,
mistakenly reverted to address this issue. The claim was that propagating local
properties from SubqueryBroadcastExec to the dynamic pruning thread is not
necessary because they will be propagated by the broadcast threads anyway.
However, in a scenario where the dynamic pruning thread is the first to initialize
the broadcast relation future, the local properties will not be propagated
correctly: by that point, the local properties being propagated to the
broadcast threads would already be incorrect.
I do not have a good way of reproducing this consistently, because generally the
SubqueryBroadcastExec is not the first to initialize the broadcast relation
future. However, by adding a Thread.sleep(1) to the doPrepare method of
SubqueryBroadcastExec, the following test always fails.
{code:java}
withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
withTable("a", "b") {
val confKey = "spark.sql.y"
val confValue1 = UUID.randomUUID().toString()
val confValue2 = UUID.randomUUID().toString()
Seq((confValue1, "1")).toDF("key", "value")
.write
.format("parquet")
.partitionBy("key")
.mode("overwrite")
.saveAsTable("a")
val df1 = spark.table("a")
def generateBroadcastDataFrame(confKey: String, confValue: String):
Dataset[String] = {
val df = spark.range(1).mapPartitions { _ =>
Iterator(TaskContext.get.getLocalProperty(confKey))
}.filter($"value".contains(confValue)).as("c")
df.hint("broadcast")
}
// set local property and assert
val df2 = generateBroadcastDataFrame(confKey, confValue1)
spark.sparkContext.setLocalProperty(confKey, confValue1)
val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key",
$"c.value")
val checks = checkDF.collect()
assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))
// change local property and re-assert
Seq((confValue2, "1")).toDF("key", "value")
.write
.format("parquet")
.partitionBy("key")
.mode("overwrite")
.saveAsTable("b")
val df3 = spark.table("b")
val df4 = generateBroadcastDataFrame(confKey, confValue2)
spark.sparkContext.setLocalProperty(confKey, confValue2)
val checks2DF = df3.join(df4).where($"b.key" ===
$"c.value").select($"b.key", $"c.value")
val checks2 = checks2DF.collect()
assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
assert(checks2.nonEmpty)
}
} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]