This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 45c0a469890e [SPARK-53941][SS] Support AQE in stateless streaming
workloads
45c0a469890e is described below
commit 45c0a469890e9ba2a981985e7fd442c00340abe2
Author: Jungtaek Lim <[email protected]>
AuthorDate: Sat Oct 25 13:11:47 2025 +0900
[SPARK-53941][SS] Support AQE in stateless streaming workloads
### What changes were proposed in this pull request?
This PR proposes to support AQE in stateless streaming workloads. We have
been disabling it due to the incompatibility with stateful operator, but it's
arguably too restricted given shuffles are still triggered in stateless
streaming workloads and stream-static join can benefit with it.
Note that AQE performs re-optimization which replans via reapplying
optimization and physical planning against a logical link (optimized plan) for
stage. IncrementalExecution instance may not be available during AQE
re-optimization (e.g. ForeachBatch sink), hence streaming specific physical
planning rules aren't compatible with AQE re-optimization. These rules are
reserved for initializing stateful operators, hence stateless workloads are
still safe to apply full phase of AQE.
Worth mentioning that AQE is not enabled for 1) continuous mode (and
upcoming real time mode) 2) stateful workloads.
* continuous mode (and real time mode): AQE doesn't make sense for
continuous and real time mode since stages run concurrently.
* stateful workloads: AQE can't change the number of partitions in stateful
operator, and even if it's changeable, repartitioning state would cost a lot
and we shouldn't decide it per batch based on specific batch's data
distribution.
### Why are the changes needed?
There are still various cases where stateless operators trigger shuffle
(e.g. stream-static join), and these operators have the same characteristic
with batch query which AQE has been battle tested and proved its usefulness for
a long time.
### Does this PR introduce _any_ user-facing change?
Yes, AQE will be enabled in stateless streaming workloads. Given that AQE
is set to true by default, stateless streaming queries will take effect,
regardless whether the query starts with new Spark version, or being upgraded
from old Spark version. This PR also updates this to the migration guide.
### How was this patch tested?
Existing tests will run with AQE enabled if the query is stateless.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52642 from HeartSaVioR/WIP-AQE-in-stateless-streaming-query.
Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../java/org/apache/spark/internal/LogKeys.java | 1 +
docs/streaming/ss-migration-guide.md | 4 +
.../apache/spark/sql/execution/SparkPlanner.scala | 4 +-
.../spark/sql/execution/SparkStrategies.scala | 20 ++--
.../adaptive/InsertAdaptiveSparkPlan.scala | 14 ++-
.../StreamingQueryPlanTraverseHelper.scala | 61 ++++++++++
.../streaming/runtime/IncrementalExecution.scala | 12 +-
.../streaming/runtime/MicroBatchExecution.scala | 33 +++++-
.../streaming/runtime/ProgressReporter.scala | 36 +++---
.../streaming/runtime/StreamExecution.scala | 9 +-
.../streaming/runtime/WatermarkPropagator.scala | 111 ++++++++++--------
.../streaming/runtime/WatermarkTracker.scala | 9 +-
.../WriteDistributionAndOrderingSuite.scala | 54 ++++++++-
.../adaptive/AdaptiveQueryExecSuite.scala | 128 +++++++++++++++++++++
.../apache/spark/sql/streaming/StreamSuite.scala | 35 +-----
.../spark/sql/streaming/StreamingQuerySuite.scala | 5 +-
16 files changed, 411 insertions(+), 125 deletions(-)
diff --git
a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
index e90683a20575..baa9b4f35db4 100644
--- a/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
+++ b/common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java
@@ -44,6 +44,7 @@ public enum LogKeys implements LogKey {
APP_ID,
APP_NAME,
APP_STATE,
+ AQE_PLAN,
ARCHIVE_NAME,
ARGS,
ARTIFACTS,
diff --git a/docs/streaming/ss-migration-guide.md
b/docs/streaming/ss-migration-guide.md
index 548b09313eac..dc0a3a0ef2ba 100644
--- a/docs/streaming/ss-migration-guide.md
+++ b/docs/streaming/ss-migration-guide.md
@@ -23,6 +23,10 @@ Note that this migration guide describes the items specific
to Structured Stream
Many items of SQL migration can be applied when migrating Structured Streaming
to higher versions.
Please refer [Migration Guide: SQL, Datasets and
DataFrame](../sql-migration-guide.html).
+## Upgrading from Structured Streaming 4.0 to 4.1
+
+- Since Spark 4.1, AQE is supported for stateless workloads, and it could
affect the behavior of the query after upgrade (especially since AQE is turned
on by default). In general, it helps to achieve better performance including
resolution of skewed partition, but you can turn off AQE via changing
`spark.sql.adaptive.enabled` to `false` to restore the behavior if you see
regression.
+
## Upgrading from Structured Streaming 3.5 to 4.0
- Since Spark 4.0, Spark falls back to single batch execution if any source in
the query does not support `Trigger.AvailableNow`. This is to avoid any
possible correctness, duplication, and dataloss issue due to incompatibility
between source and wrapper implementation. (See
[SPARK-45178](https://issues.apache.org/jira/browse/SPARK-45178) for more
details.)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 205dd5381cdb..7e7f83903717 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -52,7 +52,9 @@ class SparkPlanner(val session: SparkSession, val
experimentalMethods: Experimen
InMemoryScans ::
SparkScripts ::
Pipelines ::
- BasicOperators :: Nil)
+ BasicOperators ::
+ // Need to be here since users can specify withWatermark in stateless
streaming query.
+ EventTimeWatermarkStrategy :: Nil)
/**
* Override to add extra planning strategies to the planner. These
strategies are tried after
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f76bc911bef8..b487e8fabf8c 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -421,13 +421,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
}
}
- /**
- * Used to plan streaming aggregation queries that are computed
incrementally as part of a
- * [[org.apache.spark.sql.streaming.StreamingQuery]]. Currently this rule is
injected into the
- * planner on-demand, only when planning in a
- * [[org.apache.spark.sql.execution.streaming.StreamExecution]]
- */
- object StatefulAggregationStrategy extends Strategy {
+ object EventTimeWatermarkStrategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case _ if !plan.isStreaming => Nil
@@ -445,6 +439,18 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
"Please report your query to Spark user mailing list.")
}
UpdateEventTimeColumnExec(columnName, delay.get, None,
planLater(child)) :: Nil
+ }
+ }
+
+ /**
+ * Used to plan streaming aggregation queries that are computed
incrementally as part of a
+ * [[org.apache.spark.sql.streaming.StreamingQuery]]. Currently this rule is
injected into the
+ * planner on-demand, only when planning in a
+ * [[org.apache.spark.sql.execution.streaming.StreamExecution]]
+ */
+ object StatefulAggregationStrategy extends Strategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case _ if !plan.isStreaming => Nil
case PhysicalAggregation(
namedGroupingExpressions, aggregateExpressions,
rewrittenResultExpressions, child) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index aa748d8de6dc..89c57e4d6b1a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -32,6 +32,7 @@ import
org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedC
import org.apache.spark.sql.execution.datasources.V1WriteCommand
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.sql.execution.exchange.Exchange
+import
org.apache.spark.sql.execution.streaming.operators.stateful.StatefulOperator
import org.apache.spark.sql.internal.SQLConf
/**
@@ -55,6 +56,15 @@ case class InsertAdaptiveSparkPlan(
case c: DataWritingCommandExec
if !c.cmd.isInstanceOf[V1WriteCommand] || !conf.plannedWriteEnabled =>
c.copy(child = apply(c.child))
+ // SPARK-53941: Do not apply AQE for stateful streaming workloads. From
recent change of shuffle
+ // origin for shuffle being added from stateful operator, we anticipate
stateful operator to
+ // work with AQE. But we want to make the adoption of AQE be gradual, to
have a risk under
+ // control. Note that we will disable the value of AQE config explicitly
in streaming engine,
+ // but also introduce this pattern here for defensive programming.
+ case _ if plan.exists {
+ case _: StatefulOperator => true
+ case _ => false
+ } => plan
case _ if shouldApplyAQE(plan, isSubquery) =>
if (supportAdaptive(plan)) {
try {
@@ -114,9 +124,7 @@ case class InsertAdaptiveSparkPlan(
}
private def supportAdaptive(plan: SparkPlan): Boolean = {
- sanityCheck(plan) &&
- !plan.logicalLink.exists(_.isStreaming) &&
- plan.children.forall(supportAdaptive)
+ sanityCheck(plan) && plan.children.forall(supportAdaptive)
}
private def sanityCheck(plan: SparkPlan): Boolean =
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryPlanTraverseHelper.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryPlanTraverseHelper.scala
new file mode 100644
index 000000000000..521290fec575
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryPlanTraverseHelper.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+
+/**
+ * This is an utility object placing methods to traverse the query plan for
streaming query.
+ * This is used for patterns of traversal which are repeated in multiple
places.
+ */
+object StreamingQueryPlanTraverseHelper extends Logging {
+ def collectFromUnfoldedPlan[B](
+ executedPlan: SparkPlan)(
+ pf: PartialFunction[SparkPlan, B]): Seq[B] = {
+ executedPlan.flatMap {
+ // InMemoryTableScanExec is a node to represent a cached plan. The node
has underlying
+ // actual executed plan, which we should traverse to collect the
required information.
+ case s: InMemoryTableScanExec =>
collectFromUnfoldedPlan(s.relation.cachedPlan)(pf)
+
+ // AQE physical node contains the executed plan, pick the plan.
+ // In most cases, AQE physical node is expected to contain the final
plan, which is
+ // appropriate for the caller.
+ // Even it does not contain the final plan (in whatever reason), we just
provide the
+ // plan as best effort, as there is no better way around.
+ case a: AdaptiveSparkPlanExec =>
+ if (!a.isFinalPlan) {
+ logWarning(log"AQE plan is captured, but the executed plan in AQE
plan is not" +
+ log"the final one. Providing incomplete executed plan. AQE plan:
${MDC(
+ LogKeys.AQE_PLAN, a)}")
+ }
+ collectFromUnfoldedPlan(a.executedPlan)(pf)
+
+ // There are several AQE-specific leaf nodes which covers shuffle. We
should pick the
+ // underlying plan of these nodes, since the underlying plan has the
actual executed
+ // nodes which we want to collect metrics.
+ case e: QueryStageExec => collectFromUnfoldedPlan(e.plan)(pf)
+
+ case p if pf.isDefinedAt(p) => Seq(pf(p))
+ case _ => Seq.empty[B]
+ }
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
index 0d4b0f0941d9..4f41e8a8be06 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala
@@ -38,6 +38,7 @@ import
org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
import
org.apache.spark.sql.execution.datasources.v2.state.metadata.StateMetadataPartitionReader
import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
import
org.apache.spark.sql.execution.python.streaming.{FlatMapGroupsInPandasWithStateExec,
TransformWithStateInPySparkExec}
+import
org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
import
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager,
OffsetSeqMetadata}
import
org.apache.spark.sql.execution.streaming.operators.stateful.{SessionWindowStateStoreRestoreExec,
SessionWindowStateStoreSaveExec, StatefulOperator, StatefulOperatorStateInfo,
StateStoreRestoreExec, StateStoreSaveExec, StateStoreWriter,
StreamingDeduplicateExec, StreamingDeduplicateWithinWatermarkExec,
StreamingGlobalLimitExec, StreamingLocalLimitExec, UpdateEventTimeColumnExec}
import
org.apache.spark.sql.execution.streaming.operators.stateful.flatmapgroupswithstate.FlatMapGroupsWithStateExec
@@ -638,10 +639,11 @@ class IncrementalExecution(
def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
val tentativeBatchId = currentBatchId + 1
watermarkPropagator.propagate(tentativeBatchId, executedPlan,
newMetadata.batchWatermarkMs)
- executedPlan.collect {
- case p: StateStoreWriter => p.shouldRunAnotherBatch(
- watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
- p.stateInfo.get.operatorId))
- }.exists(_ == true)
+ StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(executedPlan) {
+ case p: StateStoreWriter => p.shouldRunAnotherBatch(
+ watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
+ p.stateInfo.get.operatorId))
+ }.exists(_ == true)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
index 174421fcf835..f2760c8914bb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala
@@ -27,7 +27,7 @@ import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.LogKeys.BATCH_ID
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp,
FileSourceMetadataAttribute, LocalTimestamp}
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, GlobalLimit,
LeafNode, LocalRelation, LogicalPlan, Project, StreamSourceAwareLogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate,
DeduplicateWithinWatermark, Distinct, FlatMapGroupsInPandasWithState,
FlatMapGroupsWithState, GlobalLimit, Join, LeafNode, LocalRelation,
LogicalPlan, Project, StreamSourceAwareLogicalPlan, TransformWithState,
TransformWithStateInPySpark}
import org.apache.spark.sql.catalyst.streaming.{StreamingRelationV2,
WriteToStream}
import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
import org.apache.spark.sql.catalyst.util.truncatedString
@@ -344,9 +344,40 @@ class MicroBatchExecution(
setLatestExecutionContext(execCtx)
populateStartOffsets(execCtx, sparkSessionForStream)
+
+ // SPARK-53941: This code path is executed for the first batch, regardless
of whether it's a
+ // fresh new run or restart.
+ disableAQESupportInStatelessIfUnappropriated(sparkSessionForStream)
+
logInfo(log"Stream started from ${MDC(LogKeys.STREAMING_OFFSETS_START,
execCtx.startOffsets)}")
execCtx
}
+
+ private def disableAQESupportInStatelessIfUnappropriated(
+ sparkSessionToRunBatches: SparkSession): Unit = {
+ def containsStatefulOperator(p: LogicalPlan): Boolean = {
+ p.exists {
+ case node: Aggregate if node.isStreaming => true
+ case node: Deduplicate if node.isStreaming => true
+ case node: DeduplicateWithinWatermark if node.isStreaming => true
+ case node: Distinct if node.isStreaming => true
+ case node: Join if node.left.isStreaming && node.right.isStreaming =>
true
+ case node: FlatMapGroupsWithState if node.isStreaming => true
+ case node: FlatMapGroupsInPandasWithState if node.isStreaming => true
+ case node: TransformWithState if node.isStreaming => true
+ case node: TransformWithStateInPySpark if node.isStreaming => true
+ case node: GlobalLimit if node.isStreaming => true
+ case _ => false
+ }
+ }
+
+ if (containsStatefulOperator(analyzedPlan)) {
+ // SPARK-53941: We disable AQE for stateful workloads as of now.
+ logWarning(log"Disabling AQE since AQE is not supported in stateful
workloads.")
+
sparkSessionToRunBatches.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,
"false")
+ }
+ }
+
/**
* Repeatedly attempts to run batches as data arrives.
*/
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
index 19aa068869dc..d02e992fc190 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/ProgressReporter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream,
ReportsSinkMetrics, ReportsSourceMetrics, SparkDataStream}
import org.apache.spark.sql.execution.{QueryExecution,
StreamSourceAwareSparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec,
StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress}
+import
org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeqMetadata
import
org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeWatermarkExec,
StateStoreWriter}
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
@@ -443,8 +444,8 @@ abstract class ProgressContext(
val sources = newData.keys.toSet
- val sourceToInputRowsTuples = lastExecution.executedPlan
- .collect {
+ val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
case node: StreamSourceAwareSparkPlan if node.getStream.isDefined =>
val numRows =
node.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
node.getStream.get -> numRows
@@ -502,12 +503,13 @@ abstract class ProgressContext(
// It's possible that multiple DataSourceV2ScanExec instances may refer
to the same source
// (can happen with self-unions or self-joins). This means the source is
scanned multiple
// times in the query, we should count the numRows for each scan.
- val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
- case s: MicroBatchScanExec =>
- val numRows =
s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
- val source = s.stream
- source -> numRows
- }
+ val sourceToInputRowsTuples = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+ case s: MicroBatchScanExec =>
+ val numRows =
s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+ val source = s.stream
+ source -> numRows
+ }
logDebug("Source -> # input rows\n\t" +
sourceToInputRowsTuples.mkString("\n\t"))
sumRows(sourceToInputRowsTuples)
} else {
@@ -544,7 +546,10 @@ abstract class ProgressContext(
val finalLogicalPlan = unrollCTE(lastExecution.logical)
val allLogicalPlanLeaves = finalLogicalPlan.collectLeaves() // includes
non-streaming
- val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
+ val allExecPlanLeaves = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+ case p if p.children.isEmpty => p
+ }
if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
val execLeafToSource =
allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
case (_, ep: MicroBatchScanExec) =>
@@ -580,10 +585,11 @@ abstract class ProgressContext(
private def extractStateOperatorMetrics(
lastExecution: IncrementalExecution): Seq[StateOperatorProgress] = {
assert(lastExecution != null, "lastExecution is not available")
- lastExecution.executedPlan.collect {
- case p if p.isInstanceOf[StateStoreWriter] =>
- p.asInstanceOf[StateStoreWriter].getProgress()
- }
+ StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
+ case p if p.isInstanceOf[StateStoreWriter] =>
+ p.asInstanceOf[StateStoreWriter].getProgress()
+ }
}
/** Extracts statistics from the most recent query execution. */
@@ -609,8 +615,8 @@ abstract class ProgressContext(
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp,
sinkOutput)
}
- val eventTimeStats = lastExecution.executedPlan
- .collect {
+ val eventTimeStats = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(lastExecution.executedPlan) {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
Map(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
index 17095bcabf32..b5e51bc8b54d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset
=> OffsetV2, ReadLi
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl,
SupportsTruncate, Write}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
import
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager,
CommitLog, OffsetSeqLog, OffsetSeqMetadata}
import
org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperator,
StateStoreWriter}
import
org.apache.spark.sql.execution.streaming.sources.{ForeachBatchUserFuncException,
ForeachUserFuncException}
@@ -304,8 +305,6 @@ abstract class StreamExecution(
// While active, repeatedly attempt to run batches.
sparkSessionForStream.withActive {
- // Adaptive execution can change num shuffle partitions, disallow
- sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key,
"false")
// Disable cost-based join optimization as we do not want stateful
operations
// to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")
@@ -315,6 +314,12 @@ abstract class StreamExecution(
sparkSessionForStream.conf.set(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key,
"false")
+ if (trigger.isInstanceOf[ContinuousTrigger]) {
+ // SPARK-53941: AQE does not make sense for continuous processing,
disable it.
+ logWarning("Disabling AQE since the query runs with continuous
mode.")
+
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+ }
+
getLatestExecutionContext().updateStatusMessage("Initializing sources")
// force initialization of the logical plan so that the sources can be
created
logicalPlan
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkPropagator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkPropagator.scala
index b3d5baf0b5af..7c78069d0858 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkPropagator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkPropagator.scala
@@ -24,6 +24,8 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
QueryStageExec}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import
org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeWatermarkExec,
StateStoreWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
@@ -173,11 +175,21 @@ class PropagateWatermarkSimulator extends
WatermarkPropagator with Logging {
private def getInputWatermarks(
node: SparkPlan,
nodeToOutputWatermark: mutable.Map[Int, Option[Long]]): Seq[Long] = {
+ def watermarkForChild(child: SparkPlan): Option[Long] = {
+ child match {
+ case s: InMemoryTableScanExec =>
watermarkForChild(s.relation.cachedPlan)
+ case a: AdaptiveSparkPlanExec => watermarkForChild(a.executedPlan)
+ case e: QueryStageExec => watermarkForChild(e.plan)
+ case _ =>
+ nodeToOutputWatermark.getOrElse(child.id, {
+ throw new IllegalStateException(
+ s"watermark for the node ${child.id} should be registered")
+ })
+ }
+ }
+
node.children.flatMap { child =>
- nodeToOutputWatermark.getOrElse(child.id, {
- throw new IllegalStateException(
- s"watermark for the node ${child.id} should be registered")
- })
+ watermarkForChild(child)
// Since we use flatMap here, this will exclude children from watermark
calculation
// which don't have watermark information.
}
@@ -187,52 +199,55 @@ class PropagateWatermarkSimulator extends
WatermarkPropagator with Logging {
val nodeToOutputWatermark = mutable.HashMap[Int, Option[Long]]()
val nextStatefulOperatorToWatermark = mutable.HashMap[Long, Option[Long]]()
- // This calculation relies on post-order traversal of the query plan.
- plan.transformUp {
- case node: EventTimeWatermarkExec =>
- val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
- if (inputWatermarks.nonEmpty) {
- throw new AnalysisException(
- errorClass = "_LEGACY_ERROR_TEMP_3076",
- messageParameters = Map("config" ->
SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key))
- }
-
- nodeToOutputWatermark.put(node.id, Some(originWatermark))
- node
-
- case node: StateStoreWriter =>
- val stOpId = node.stateInfo.get.operatorId
-
- val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
-
- val finalInputWatermarkMs = if (inputWatermarks.nonEmpty) {
- Some(inputWatermarks.min)
- } else {
- // We can't throw exception here, as we allow stateful operator to
process without
- // watermark. E.g. streaming aggregation with update/complete mode.
- None
- }
-
- val outputWatermarkMs = finalInputWatermarkMs.flatMap { wm =>
- node.produceOutputWatermark(wm)
- }
- nodeToOutputWatermark.put(node.id, outputWatermarkMs)
- nextStatefulOperatorToWatermark.put(stOpId, finalInputWatermarkMs)
- node
-
- case node =>
- // pass-through, but also consider multiple children like the case of
union
- val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
- val finalInputWatermarkMs = if (inputWatermarks.nonEmpty) {
- Some(inputWatermarks.min)
- } else {
- None
- }
-
- nodeToOutputWatermark.put(node.id, finalInputWatermarkMs)
- node
+ def traverseAndSimulate(p: SparkPlan): Unit = {
+ p.foreachUp {
+ case s: InMemoryTableScanExec =>
traverseAndSimulate(s.relation.cachedPlan)
+ case a: AdaptiveSparkPlanExec => traverseAndSimulate(a.executedPlan)
+ case e: QueryStageExec => traverseAndSimulate(e.plan)
+ case node: EventTimeWatermarkExec =>
+ val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
+ if (inputWatermarks.nonEmpty) {
+ throw new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3076",
+ messageParameters = Map("config" ->
SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key))
+ }
+
+ nodeToOutputWatermark.put(node.id, Some(originWatermark))
+
+ case node: StateStoreWriter =>
+ val stOpId = node.stateInfo.get.operatorId
+
+ val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
+
+ val finalInputWatermarkMs = if (inputWatermarks.nonEmpty) {
+ Some(inputWatermarks.min)
+ } else {
+ // We can't throw exception here, as we allow stateful operator to
process without
+ // watermark. E.g. streaming aggregation with update/complete mode.
+ None
+ }
+
+ val outputWatermarkMs = finalInputWatermarkMs.flatMap { wm =>
+ node.produceOutputWatermark(wm)
+ }
+ nodeToOutputWatermark.put(node.id, outputWatermarkMs)
+ nextStatefulOperatorToWatermark.put(stOpId, finalInputWatermarkMs)
+
+ case node =>
+ // pass-through, but also consider multiple children like the case
of union
+ val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
+ val finalInputWatermarkMs = if (inputWatermarks.nonEmpty) {
+ Some(inputWatermarks.min)
+ } else {
+ None
+ }
+
+ nodeToOutputWatermark.put(node.id, finalInputWatermarkMs)
+ }
}
+ traverseAndSimulate(plan)
+
inputWatermarks.put(batchId, nextStatefulOperatorToWatermark.toMap)
batchIdToWatermark.put(batchId, originWatermark)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkTracker.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkTracker.scala
index 6d94630d8c3b..8248e00ef822 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkTracker.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/WatermarkTracker.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark,
LogicalPlan}
import org.apache.spark.sql.execution.SparkPlan
+import
org.apache.spark.sql.execution.streaming.StreamingQueryPlanTraverseHelper
import
org.apache.spark.sql.execution.streaming.operators.stateful.EventTimeWatermarkExec
import org.apache.spark.sql.internal.SQLConf
@@ -103,9 +104,11 @@ class WatermarkTracker(
}
def updateWatermark(executedPlan: SparkPlan): Unit = synchronized {
- val watermarkOperators = executedPlan.collect {
- case e: EventTimeWatermarkExec => e
- }
+ val watermarkOperators = StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(executedPlan) {
+ case e: EventTimeWatermarkExec =>
+ e
+ }
if (watermarkOperators.isEmpty) return
watermarkOperators.foreach {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
index ff6a215496be..7c4852c5e22d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/WriteDistributionAndOrderingSuite.scala
@@ -87,11 +87,25 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
}
test("ordered distribution and sort with same exprs: micro-batch append") {
- checkOrderedDistributionAndSortWithSameExprs(microBatchPrefix + "append")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkOrderedDistributionAndSortWithSameExprs(microBatchPrefix + "append")
+ }
}
test("ordered distribution and sort with same exprs: micro-batch update") {
- checkOrderedDistributionAndSortWithSameExprs(microBatchPrefix + "update")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkOrderedDistributionAndSortWithSameExprs(microBatchPrefix + "update")
+ }
}
test("ordered distribution and sort with same exprs: micro-batch complete") {
@@ -187,11 +201,25 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
}
test("clustered distribution and sort with same exprs: micro-batch append") {
- checkClusteredDistributionAndSortWithSameExprs(microBatchPrefix + "append")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkClusteredDistributionAndSortWithSameExprs(microBatchPrefix +
"append")
+ }
}
test("clustered distribution and sort with same exprs: micro-batch update") {
- checkClusteredDistributionAndSortWithSameExprs(microBatchPrefix + "update")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkClusteredDistributionAndSortWithSameExprs(microBatchPrefix +
"update")
+ }
}
test("clustered distribution and sort with same exprs: micro-batch
complete") {
@@ -293,11 +321,25 @@ class WriteDistributionAndOrderingSuite extends
DistributionAndOrderingSuiteBase
}
test("clustered distribution and sort with extended exprs: micro-batch
append") {
- checkClusteredDistributionAndSortWithExtendedExprs(microBatchPrefix +
"append")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkClusteredDistributionAndSortWithExtendedExprs(microBatchPrefix +
"append")
+ }
}
test("clustered distribution and sort with extended exprs: micro-batch
update") {
- checkClusteredDistributionAndSortWithExtendedExprs(microBatchPrefix +
"update")
+ // SPARK-53941: Once AQE is enabled, the optimization kicks in and the
write distribution
+ // can be adjusted by AQE. There is a logic for batch query to consider
the AQE optimization
+ // while verifying the write distribution, but that seems to be quite
complicated and we
+ // should not block the code change by updating the tests to deal with AQE
optimization.
+ // TODO: Update the test to reflect optimization from AQE.
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ checkClusteredDistributionAndSortWithExtendedExprs(microBatchPrefix +
"update")
+ }
}
test("clustered distribution and sort with extended exprs: micro-batch
complete") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index de0fde16e5d0..aa361722394b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -43,11 +43,14 @@ import
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ENSURE_REQUIREMENTS, Exchange, REPARTITION_BY_COL, REPARTITION_BY_NUM,
ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.joins.{BaseJoinExec,
BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec,
ShuffledJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream,
StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
import
org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate,
SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionStart}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
+import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor,
TimeMode, TimerValues, TTLConfig, ValueState}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData.TestData
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -1045,6 +1048,131 @@ class AdaptiveQueryExecSuite
}
}
+ test("aqe in stateless streaming query") {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true") {
+ withTempView("test") {
+ val input = MemoryStream[Int]
+ val stream = input.toDF().select(col("value"), (col("value") * 3) as
"three")
+
+ // join a table with a stream
+ val joined = spark.table("testData").join(stream,
Seq("value")).where("three > 40")
+ val query =
joined.writeStream.format("memory").queryName("test").start()
+ input.addData(1, 10, 20, 40, 50)
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+
+ // aqe should be enabled in a stateless streaming query
+ val plan = query.asInstanceOf[StreamingQueryWrapper]
+ .streamingQuery.lastExecution.executedPlan
+ val ret = plan.find {
+ case _: AdaptiveSparkPlanExec | _: QueryStageExec => true
+ case _ => false
+ }
+ assert(ret.nonEmpty,
+ s"expected AQE to take effect but can't find AQE node, plan: $plan")
+ // aqe config should still be enabled
+ assert(query.sparkSession.sessionState.conf.adaptiveExecutionEnabled)
+ }
+ }
+ }
+
+ test("no aqe in stateful streaming query - aggregation") {
+ testNoAqeInStatefulStreamingQuery(OutputMode.Update()) { input =>
+ val stream = input.toDF().select(col("value"), (col("value") * 3) as
"three")
+ stream.groupBy("value").agg(sum("three"))
+ }
+ }
+
+ test("no aqe in stateful streaming query - deduplication") {
+ testNoAqeInStatefulStreamingQuery(OutputMode.Append()) { input =>
+ val stream = input.toDF().select(col("value"), (col("value") * 3) as
"three")
+ stream.dropDuplicates("value", "three")
+ }
+ }
+
+ test("no aqe in stateful streaming query - stream-stream join") {
+ testNoAqeInStatefulStreamingQuery(OutputMode.Append()) { input =>
+ val inputDf = input.toDF()
+ val stream1 = inputDf.select(col("value") as "left", (col("value") * 2)
as "two")
+ val stream2 = inputDf.select(col("value") as "right", (col("value") * 3)
as "three")
+
+ stream1.join(stream2, expr("left = right"))
+ }
+ }
+
+ test("no aqe in stateful streaming query - transformWithState") {
+ class RunningCountStatefulProcessorInt
+ extends StatefulProcessor[Int, Int, (Int, Long)] {
+
+ import implicits._
+ @transient protected var _countState: ValueState[Long] = _
+
+ override def init(
+ outputMode: OutputMode,
+ timeMode: TimeMode): Unit = {
+ _countState = getHandle.getValueState[Long]("countState",
TTLConfig.NONE)
+ }
+
+ override def handleInputRows(
+ key: Int,
+ inputRows: Iterator[Int],
+ timerValues: TimerValues): Iterator[(Int, Long)] = {
+ val count = Option(_countState.get()).getOrElse(0L) + 1
+
+ if (count == 3) {
+ _countState.clear()
+ Iterator.empty
+ } else {
+ _countState.update(count)
+ Iterator((key, count))
+ }
+ }
+ }
+
+ testNoAqeInStatefulStreamingQuery(OutputMode.Append()) { input =>
+ input.toDS()
+ .groupByKey(x => x)
+ .transformWithState(new RunningCountStatefulProcessorInt,
+ TimeMode.None(),
+ OutputMode.Append()
+ ).toDF()
+ }
+ }
+
+ private def testNoAqeInStatefulStreamingQuery(
+ outputMode: OutputMode)(fn: MemoryStream[Int] => DataFrame): Unit = {
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key -> "true",
+ SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
classOf[RocksDBStateStoreProvider].getName) {
+
+ val input = MemoryStream[Int]
+
+ val df = fn(input)
+ val query = df.writeStream.format("noop").outputMode(outputMode).start()
+ input.addData(1, 2, 3, 4, 5)
+ try {
+ query.processAllAvailable()
+ } finally {
+ query.stop()
+ }
+
+ // aqe should not be enabled in a stateful streaming query
+ val plan = query.asInstanceOf[StreamingQueryWrapper]
+ .streamingQuery.lastExecution.executedPlan
+ val ret = plan.find {
+ case _: AdaptiveSparkPlanExec | _: QueryStageExec => true
+ case _ => false
+ }
+ assert(ret.isEmpty)
+ }
+ }
+
test("tree string output") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val df = sql("SELECT * FROM testData join testData2 ON key = a where
value = '1'")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 2ae0de640aaf..11b761f1e4a1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, SparkContext, TaskContext, TestUtils}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.{AnalysisException, Encoders, Row, SQLContext,
TestStrategy}
-import org.apache.spark.sql.catalyst.plans.logical.{Range,
RepartitionByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes,
StreamingRelationV2}
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -1116,7 +1116,7 @@ class StreamSuite extends StreamTest {
)
require(execPlan != null)
- val localLimits = execPlan.collect {
+ val localLimits =
StreamingQueryPlanTraverseHelper.collectFromUnfoldedPlan(execPlan) {
case l: LocalLimitExec => l
case l: StreamingLocalLimitExec => l
}
@@ -1299,37 +1299,6 @@ class StreamSuite extends StreamTest {
}
}
- test("SPARK-34482: correct active SparkSession for logicalPlan") {
- withSQLConf(
- SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
- SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") {
- val df =
spark.readStream.format(classOf[FakeDefaultSource].getName).load()
- var query: StreamExecution = null
- try {
- query =
- df.repartition($"a")
- .writeStream
- .format("memory")
- .queryName("memory")
- .start()
- .asInstanceOf[StreamingQueryWrapper]
- .streamingQuery
- query.awaitInitialization(streamingTimeout.toMillis)
- val plan = query.logicalPlan
- val numPartition = plan
- .find { _.isInstanceOf[RepartitionByExpression] }
- .map(_.asInstanceOf[RepartitionByExpression].numPartitions)
- // Before the fix of SPARK-34482, the numPartition is the value of
- // `COALESCE_PARTITIONS_INITIAL_PARTITION_NUM`.
- assert(numPartition.get ===
spark.sessionState.conf.getConf(SQLConf.SHUFFLE_PARTITIONS))
- } finally {
- if (query != null) {
- query.stop()
- }
- }
- }
- }
-
test("isInterruptionException should correctly unwrap classic py4j
InterruptedException") {
val e1 = new py4j.Py4JException(
"""
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 82c6f18955af..b0de21a6d9e8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -627,7 +627,10 @@ class StreamingQuerySuite extends StreamTest with
BeforeAndAfter with Logging wi
// The number of leaves in the trigger's logical plan should be same
as the executed plan.
require(
q.lastExecution.logical.collectLeaves().length ==
- q.lastExecution.executedPlan.collectLeaves().length)
+ StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(q.lastExecution.executedPlan) {
+ case n if n.children.isEmpty => n
+ }.length)
val lastProgress = getLastProgressWithData(q)
assert(lastProgress.nonEmpty)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]