This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f944603 [SPARK-32126][SS] Scope Session.active in IncrementalExecution
f944603 is described below
commit f944603872284c03c557474bb9e816f20094a630
Author: Yuanjian Li <[email protected]>
AuthorDate: Sun Jun 28 21:35:59 2020 -0700
[SPARK-32126][SS] Scope Session.active in IncrementalExecution
### What changes were proposed in this pull request?
The `optimizedPlan` in IncrementalExecution should also be scoped in
`withActive`.
### Why are the changes needed?
Follow-up of SPARK-30798 for the Streaming side.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UT.
Closes #28936 from xuanyuanking/SPARK-30798-follow.
Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 2 +-
.../org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index bf60427..791e432 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -131,7 +131,7 @@ class QueryExecution(
Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession,
this))))
}
- private def executePhase[T](phase: String)(block: => T): T =
sparkSession.withActive {
+ protected def executePhase[T](phase: String)(block: => T): T =
sparkSession.withActive {
tracker.measurePhase(phase)(block)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 09ae769..7773ac7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -76,7 +76,7 @@ class IncrementalExecution(
* with the desired literal
*/
override
- lazy val optimizedPlan: LogicalPlan =
tracker.measurePhase(QueryPlanningTracker.OPTIMIZATION) {
+ lazy val optimizedPlan: LogicalPlan =
executePhase(QueryPlanningTracker.OPTIMIZATION) {
sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
tracker) transformAllExpressions {
case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]