This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c1ed3e60e67f [SPARK-46676][SS] dropDuplicatesWithinWatermark should
not fail on canonicalization of the plan
c1ed3e60e67f is described below
commit c1ed3e60e67f53bb323e2b9fa47789fcde70a75a
Author: Jungtaek Lim <[email protected]>
AuthorDate: Fri Jan 19 11:38:53 2024 +0900
[SPARK-46676][SS] dropDuplicatesWithinWatermark should not fail on
canonicalization of the plan
### What changes were proposed in this pull request?
This PR proposes to fix the bug on canonicalizing the plan which contains
the physical node of dropDuplicatesWithinWatermark
(`StreamingDeduplicateWithinWatermarkExec`).
### Why are the changes needed?
Canonicalization of the plan will replace the expressions (including
attributes) to remove cosmetic differences, including names and metadata — the
metadata being what denotes the event time column marker.
StreamingDeduplicateWithinWatermarkExec assumes that the input attributes
of child node contain the event time column, and it is determined at the
initialization of the node instance. Once canonicalization is being triggered,
child node will lose the notion of event time column from its attributes, and
a copy of StreamingDeduplicateWithinWatermarkExec will be performed, which
instantiates a new `StreamingDeduplicateWithinWatermarkExec` node with the new
child node, which no longer has an [...]
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44688 from HeartSaVioR/SPARK-46676.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/statefulOperators.scala | 10 +++++++---
...StreamingDeduplicationWithinWatermarkSuite.scala | 21 +++++++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 8cb99a162ab2..c8a55ed679d0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -1097,10 +1097,14 @@ case class StreamingDeduplicateWithinWatermarkExec(
protected val extraOptionOnStateStore: Map[String, String] = Map.empty
- private val eventTimeCol: Attribute =
WatermarkSupport.findEventTimeColumn(child.output,
+ // Below three variables are defined as lazy, as evaluating these variables
does not work with
+ // canonicalized plan. Specifically, attributes in child won't have an event
time column in
+ // the canonicalized plan. These variables are NOT referenced in
canonicalized plan, hence
+ // defining these variables as lazy would avoid such error.
+ private lazy val eventTimeCol: Attribute =
WatermarkSupport.findEventTimeColumn(child.output,
allowMultipleEventTimeColumns = false).get
- private val delayThresholdMs =
eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
- private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+ private lazy val delayThresholdMs =
eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
+ private lazy val eventTimeColOrdinal: Int =
child.output.indexOf(eventTimeCol)
protected def initializeReusedDupInfoRow(): Option[UnsafeRow] = {
val timeoutToUnsafeRow = UnsafeProjection.create(schemaForValueRow)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
index 595fc1cb9cea..9a02ab3df7dd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationWithinWatermarkSuite.scala
@@ -199,4 +199,25 @@ class StreamingDeduplicationWithinWatermarkSuite extends
StateStoreMetricsTest {
)
}
}
+
+ test("SPARK-46676: canonicalization of
StreamingDeduplicateWithinWatermarkExec should work") {
+ withTempDir { checkpoint =>
+ val dedupeInputData = MemoryStream[(String, Int)]
+ val dedupe = dedupeInputData.toDS()
+ .withColumn("eventTime", timestamp_seconds($"_2"))
+ .withWatermark("eventTime", "10 second")
+ .dropDuplicatesWithinWatermark("_1")
+ .select($"_1", $"eventTime".cast("long").as[Long])
+
+ testStream(dedupe, Append)(
+ StartStream(checkpointLocation = checkpoint.getCanonicalPath),
+ AddData(dedupeInputData, "a" -> 1),
+ CheckNewAnswer("a" -> 1),
+ Execute { q =>
+ // This threw out error before SPARK-46676.
+ q.lastExecution.executedPlan.canonicalized
+ }
+ )
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]