This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 90d09027dcf7 [SPARK-57193][SQL] Refactor some helpers out of TimeWindowResolution
90d09027dcf7 is described below
commit 90d09027dcf78976caf6bba64136bf35579d13c3
Author: Vladimir Golubev <[email protected]>
AuthorDate: Mon Jun 1 09:46:12 2026 -0700
[SPARK-57193][SQL] Refactor some helpers out of TimeWindowResolution
### What changes were proposed in this pull request?
Refactor some helpers out of `TimeWindowResolution`.
### Why are the changes needed?
To reuse in the single-pass Analyzer.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Claude.
Closes #56246 from vladimirg-db/vladimir-golubev_data/time-window-resolution-reusable-helpers.
Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../catalyst/analysis/TimeWindowResolution.scala | 64 +++++++++++++---------
1 file changed, 39 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
index 8dbe6ed44d1c..63ff894c48d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TimeWindowResolution.scala
@@ -35,10 +35,10 @@ object TimeWindowResolution {
final val WINDOW_COL_NAME = "window"
final val SESSION_COL_NAME = "session_window"
- private final val WINDOW_START = "start"
- private final val WINDOW_END = "end"
- private final val SESSION_START = "start"
- private final val SESSION_END = "end"
+ final val WINDOW_START = "start"
+ final val WINDOW_END = "end"
+ final val SESSION_START = "start"
+ final val SESSION_END = "end"
/**
* Synthesizes the [[Project]]/[[Expand]]+[[Filter]] sub-plan for a resolved
[[TimeWindow]] and
@@ -90,8 +90,7 @@ object TimeWindowResolution {
Project(windowStruct +: child.output,
Filter(filterExpr, child))
} else {
- val overlappingWindows =
- math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+ val overlappingWindows = overlappingWindowCount(window)
val windows =
Seq.tabulate(overlappingWindows)(i =>
getWindow(i, window.timeColumn.dataType))
@@ -165,12 +164,7 @@ object TimeWindowResolution {
val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
- val filterByTimeRange = if (gapDuration.foldable) {
- val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
- interval == null || interval.months + interval.days +
interval.microseconds <= 0
- } else {
- true
- }
+ val filterByTimeRange = sessionFilterByTimeRange(gapDuration)
// As same as tumbling window, we add a filter to filter out nulls.
// And we also filter out events with negative or zero or invalid gap
duration.
@@ -222,19 +216,7 @@ object TimeWindowResolution {
val attr = AttributeReference(colName, windowTime.dataType, metadata =
newMetadata)()
- // NOTE: "window.end" is "exclusive" upper bound of window, so if we use
this value as
- // it is, it is going to be bound to the different window even if we
apply the same window
- // spec. Decrease 1 microsecond from window.end to let the window_time
be bound to the
- // correct window range.
- val subtractExpr =
- PreciseTimestampConversion(
- Subtract(PreciseTimestampConversion(
- GetStructField(windowTime.windowColumn, 1),
- windowTime.dataType, LongType), Literal(1L)),
- LongType,
- windowTime.dataType)
-
- val newColumn = Alias(subtractExpr, colName)(
+ val newColumn = Alias(windowTimeExtractionExpression(windowTime),
colName)(
exprId = attr.exprId, explicitMetadata = Some(newMetadata))
windowTime -> (attr, newColumn)
@@ -250,4 +232,36 @@ object TimeWindowResolution {
(windowTimeToAttr, newChild)
}
+
+ /**
+ * Builds the expression extracting a [[WindowTime]]'s timestamp: the last
microsecond of the
+ * source window (`window.end - 1us`). `window.end` is the exclusive upper
bound, so using it
+ * as-is would bind the result to the next window under the same window spec.
+ */
+ def windowTimeExtractionExpression(windowTime: WindowTime): Expression =
+ PreciseTimestampConversion(
+ Subtract(
+ PreciseTimestampConversion(
+ GetStructField(windowTime.windowColumn, 1),
+ windowTime.dataType,
+ LongType),
+ Literal(1L)),
+ LongType,
+ windowTime.dataType)
+
+ /** Number of overlapping sliding windows a single row can fall into. */
+ def overlappingWindowCount(window: TimeWindow): Int =
+ math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
+
+ /**
+ * Whether the session-window filter must also drop empty windows (`end <=
start`): true when the
+ * gap is non-foldable (unknown at plan time) or a foldable gap is null or
non-positive.
+ */
+ def sessionFilterByTimeRange(gapDuration: Expression): Boolean =
+ if (gapDuration.foldable) {
+ val interval = gapDuration.eval().asInstanceOf[CalendarInterval]
+ interval == null || interval.months + interval.days +
interval.microseconds <= 0
+ } else {
+ true
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]