This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9ffdbc65029a [SPARK-47784][SS] Merge TTLMode and TimeoutMode into a
single TimeMode
9ffdbc65029a is described below
commit 9ffdbc65029a08ce621ca50037683db05dc55761
Author: Bhuwan Sahni <[email protected]>
AuthorDate: Fri Apr 12 13:47:07 2024 +0900
[SPARK-47784][SS] Merge TTLMode and TimeoutMode into a single TimeMode
### What changes were proposed in this pull request?
This PR merges the `TimeoutMode` and `TTLMode` parameter for
`transformWithState` into a single `TimeMode`. Currently, users need to specify
the notion of time (ProcessingTime/EventTime) for timers and ttl separately.
This allows users to use a single parameter.
We do not expect users to mix/match EventTime/ProcessingTime for timers
and ttl in a single query because it makes it hard to reason about the time
semantics (when will the timer be fired? when will the state be evicted? etc.).
It's simpler to stick to one notion of time throughout timers and ttl.
### Why are the changes needed?
Changes are needed to simplify Arbitrary State API `transformWithState`
interface by merging TTLMode/TimeoutMode into a single TimeMode.
### Does this PR introduce _any_ user-facing change?
Yes, this PR changes the API parameters for `transformWithState`.
### How was this patch tested?
All existing test cases for the `transformWithState` API pass.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45960 from sahnib/introduce-timeMode.
Authored-by: Bhuwan Sahni <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-classes.json | 16 +++---
.../apache/spark/sql/KeyValueGroupedDataset.scala | 38 +++++--------
dev/checkstyle-suppressions.xml | 4 +-
docs/sql-error-conditions.md | 16 +++---
.../sql/streaming/{TTLMode.java => TimeMode.java} | 34 ++++++-----
.../apache/spark/sql/streaming/TimeoutMode.java | 51 -----------------
.../logical/{TTLMode.scala => TimeMode.scala} | 10 ++--
.../logical/TransformWithStateTimeoutModes.scala | 24 --------
.../spark/sql/streaming/StatefulProcessor.scala | 5 +-
.../spark/sql/catalyst/plans/logical/object.scala | 17 ++----
.../apache/spark/sql/KeyValueGroupedDataset.scala | 36 +++++-------
.../spark/sql/execution/SparkStrategies.scala | 9 ++-
.../execution/streaming/ExpiredTimerInfoImpl.scala | 4 +-
.../streaming/StatefulProcessorHandleImpl.scala | 17 +++---
.../sql/execution/streaming/TimerStateImpl.scala | 14 ++---
.../streaming/TransformWithStateExec.scala | 65 +++++++++-------------
.../streaming/state/StateStoreErrors.scala | 37 +++++-------
.../org/apache/spark/sql/JavaDatasetSuite.java | 6 +-
.../apache/spark/sql/TestStatefulProcessor.java | 3 +-
.../sql/TestStatefulProcessorWithInitialState.java | 3 +-
.../execution/streaming/state/ListStateSuite.scala | 14 ++---
.../execution/streaming/state/MapStateSuite.scala | 11 ++--
.../state/StatefulProcessorHandleSuite.scala | 64 ++++++++++-----------
.../sql/execution/streaming/state/TimerSuite.scala | 42 +++++++-------
.../streaming/state/ValueStateSuite.scala | 35 ++++--------
.../streaming/TransformWithListStateSuite.scala | 30 ++++------
.../sql/streaming/TransformWithMapStateSuite.scala | 18 ++----
.../TransformWithStateInitialStateSuite.scala | 25 +++------
.../sql/streaming/TransformWithStateSuite.scala | 61 +++++++-------------
.../TransformWithValueStateTTLSuite.scala | 21 +++----
30 files changed, 267 insertions(+), 463 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index 62581116000b..7b13fa4278e4 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3591,21 +3591,15 @@
],
"sqlState" : "0A000"
},
- "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE" : {
- "message" : [
- "Cannot use TTL for state=<stateName> in NoTTL() mode."
- ],
- "sqlState" : "42802"
- },
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE" : {
"message" : [
"Failed to perform stateful processor operation=<operationType> with
invalid handle state=<handleState>."
],
"sqlState" : "42802"
},
- "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE" : {
+ "STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE" : {
"message" : [
- "Failed to perform stateful processor operation=<operationType> with
invalid timeoutMode=<timeoutMode>"
+ "Failed to perform stateful processor operation=<operationType> with
invalid timeMode=<timeMode>"
],
"sqlState" : "42802"
},
@@ -3615,6 +3609,12 @@
],
"sqlState" : "42802"
},
+ "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL" : {
+ "message" : [
+ "Cannot use TTL for state=<stateName> in timeMode=<timeMode>, use
TimeMode.ProcessingTime() instead."
+ ],
+ "sqlState" : "42802"
+ },
"STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE" : {
"message" : [
"TTL duration must be greater than zero for State store
operation=<operationType> on state=<stateName>."
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 39e0c429046d..e38adb9b0b27 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -29,7 +29,7 @@ import
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.ProductEncoder
import org.apache.spark.sql.connect.common.UdfUtils
import org.apache.spark.sql.expressions.ScalarUserDefinedFunction
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout,
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode,
TTLMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout,
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode}
/**
* A [[Dataset]] has been logically grouped by a user specified grouping key.
Users should not
@@ -827,17 +827,14 @@ class KeyValueGroupedDataset[K, V] private[sql] ()
extends Serializable {
* The type of the output objects. Must be encodable to Spark SQL types.
* @param statefulProcessor
* Instance of statefulProcessor whose functions will be invoked by the
operator.
- * @param timeoutMode
- * The timeout mode of the stateful processor.
- * @param ttlMode
- * The ttlMode to evict user state on ttl expiration.
+ * @param timeMode
+ * The time mode semantics of the stateful processor for timers and TTL.
* @param outputMode
* The output mode of the stateful processor.
*/
def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode): Dataset[U] = {
throw new UnsupportedOperationException
}
@@ -854,10 +851,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
* The type of the output objects. Must be encodable to Spark SQL types.
* @param statefulProcessor
* Instance of statefulProcessor whose functions will be invoked by the
operator.
- * @param timeoutMode
- * The timeout mode of the stateful processor.
- * @param ttlMode
- * The ttlMode to evict user state on ttl expiration.
+ * @param timeMode
+ * The time mode semantics of the stateful processor for timers and TTL.
* @param outputMode
* The output mode of the stateful processor.
* @param outputEncoder
@@ -865,8 +860,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
*/
def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
outputEncoder: Encoder[U]): Dataset[U] = {
throw new UnsupportedOperationException
@@ -883,10 +877,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
* The type of initial state objects. Must be encodable to Spark SQL types.
* @param statefulProcessor
* Instance of statefulProcessor whose functions will be invoked by the
operator.
- * @param timeoutMode
- * The timeout mode of the stateful processor.
- * @param ttlMode
- * The ttlMode to evict user state on ttl expiration.
+ * @param timeMode
+ * The time mode semantics of the stateful processor for timers and TTL.
* @param outputMode
* The output mode of the stateful processor.
* @param initialState
@@ -897,8 +889,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
*/
def transformWithState[U: Encoder, S: Encoder](
statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
throw new UnsupportedOperationException
@@ -915,10 +906,8 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
* The type of initial state objects. Must be encodable to Spark SQL types.
* @param statefulProcessor
* Instance of statefulProcessor whose functions will be invoked by the
operator.
- * @param timeoutMode
- * The timeout mode of the stateful processor.
- * @param ttlMode
- * The ttlMode to evict user state on ttl expiration
+ * @param timeMode
+ * The time mode semantics of the stateful processor for timers and TTL.
* @param outputMode
* The output mode of the stateful processor.
* @param initialState
@@ -933,8 +922,7 @@ class KeyValueGroupedDataset[K, V] private[sql] () extends
Serializable {
*/
private[sql] def transformWithState[U: Encoder, S: Encoder](
statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
initialState: KeyValueGroupedDataset[K, S],
outputEncoder: Encoder[U],
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index 94dfe20af56e..834265f48aa8 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -57,11 +57,9 @@
<suppress checks="MethodName"
files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
<suppress checks="MethodName"
-
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java"/>
+
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java"/>
<suppress checks="MethodName"
files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
- <suppress checks="MethodName"
-
files="sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java"/>
<suppress checks="LineLength"
files="src/main/java/org/apache/spark/sql/api/java/*"/>
<suppress checks="IllegalImport"
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 827ee04b7606..3f9ae72a50a6 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2185,23 +2185,17 @@ The SQL config `<sqlConf>` cannot be found. Please
verify that the config exists
Star (*) is not allowed in a select list when GROUP BY an ordinal position is
used.
-### STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE
-
-[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
-
-Cannot use TTL for state=`<stateName>` in NoTTL() mode.
-
### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_HANDLE_STATE
[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
Failed to perform stateful processor operation=`<operationType>` with invalid
handle state=`<handleState>`.
-### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE
+### STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE
[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
-Failed to perform stateful processor operation=`<operationType>` with invalid
timeoutMode=`<timeoutMode>`
+Failed to perform stateful processor operation=`<operationType>` with invalid
timeMode=`<timeMode>`
### STATEFUL_PROCESSOR_CANNOT_REINITIALIZE_STATE_ON_KEY
@@ -2209,6 +2203,12 @@ Failed to perform stateful processor
operation=`<operationType>` with invalid ti
Cannot re-initialize state on the same grouping key during initial state
handling for stateful processor. Invalid grouping key=`<groupingKey>`.
+### STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL
+
+[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot use TTL for state=`<stateName>` in timeMode=`<timeMode>`, use
TimeMode.ProcessingTime() instead.
+
### STATEFUL_PROCESSOR_TTL_DURATION_MUST_BE_POSITIVE
[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
similarity index 51%
rename from sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
rename to sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
index 30594770b3e1..a45a31bd1a05 100644
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TTLMode.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeMode.java
@@ -19,24 +19,32 @@ package org.apache.spark.sql.streaming;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.catalyst.plans.logical.*;
+import org.apache.spark.sql.catalyst.plans.logical.EventTime$;
+import org.apache.spark.sql.catalyst.plans.logical.NoTime$;
+import org.apache.spark.sql.catalyst.plans.logical.ProcessingTime$;
/**
- * Represents the type of ttl modes possible for the Dataset operations
- * {@code transformWithState}.
+ * Represents the time modes (used for specifying timers and ttl) possible for
+ * the Dataset operations {@code transformWithState}.
*/
@Experimental
@Evolving
-public class TTLMode {
+public class TimeMode {
- /**
- * Specifies that there is no TTL for the user state. User state would not
- * be cleaned up by Spark automatically.
- */
- public static final TTLMode NoTTL() { return NoTTL$.MODULE$; }
+ /**
+ * Neither timers nor ttl is supported in this mode.
+ */
+ public static final TimeMode None() { return NoTime$.MODULE$; }
- /**
- * Specifies that all ttl durations for user state are in processing time.
- */
- public static final TTLMode ProcessingTimeTTL() { return
ProcessingTimeTTL$.MODULE$; }
+ /**
+ * Stateful processor that uses query processing time to register timers
and
+ * calculate ttl expiration.
+ */
+ public static final TimeMode ProcessingTime() { return
ProcessingTime$.MODULE$; }
+
+ /**
+ * Stateful processor that uses event time to register timers. Note that
ttl is not
+ * supported in this TimeMode.
+ */
+ public static final TimeMode EventTime() { return EventTime$.MODULE$; }
}
diff --git
a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java
b/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java
deleted file mode 100644
index 68b8134cda6c..000000000000
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/TimeoutMode.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.annotation.Experimental;
-import org.apache.spark.sql.catalyst.plans.logical.*;
-
-/**
- * Represents the type of timeouts possible for the Dataset operations
- * {@code transformWithState}.
- */
-@Experimental
-@Evolving
-public class TimeoutMode {
- /**
- * Stateful processor that does not register timers
- */
- public static final TimeoutMode NoTimeouts() {
- return NoTimeouts$.MODULE$;
- }
-
- /**
- * Stateful processor that only registers processing time based timers
- */
- public static final TimeoutMode ProcessingTime() {
- return ProcessingTime$.MODULE$;
- }
-
- /**
- * Stateful processor that only registers event time based timers
- */
- public static final TimeoutMode EventTime() {
- return EventTime$.MODULE$;
- }
-}
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
similarity index 79%
rename from
sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
rename to
sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
index be4794a5f40b..b6248e97aa3d 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TTLMode.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TimeMode.scala
@@ -16,9 +16,11 @@
*/
package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.streaming.TTLMode
+import org.apache.spark.sql.streaming.TimeMode
-/** TTL types used in tranformWithState operator */
-case object NoTTL extends TTLMode
+/** TimeMode types used in transformWithState operator */
+case object NoTime extends TimeMode
-case object ProcessingTimeTTL extends TTLMode
+case object ProcessingTime extends TimeMode
+
+case object EventTime extends TimeMode
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
deleted file mode 100644
index e420f7821b50..000000000000
---
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.spark.sql.streaming.TimeoutMode
-
-/** Types of timeouts used in transformWithState operator */
-case object NoTimeouts extends TimeoutMode
-case object ProcessingTime extends TimeoutMode
-case object EventTime extends TimeoutMode
diff --git
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
index 70f9cdfa399a..54e6a9a4ab67 100644
---
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
+++
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
@@ -40,12 +40,11 @@ private[sql] abstract class StatefulProcessor[K, I, O]
extends Serializable {
* Function that will be invoked as the first method that allows for users to
* initialize all their state variables and perform other init actions
before handling data.
* @param outputMode - output mode for the stateful processor
- * @param timeoutMode - timeout mode for the stateful processor
+ * @param timeMode - time mode for the stateful processor.
*/
def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit
+ timeMode: TimeMode): Unit
/**
* Function that will allow users to interact with input data rows along
with the grouping key
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index ff7c8fb3df4b..28d52d39093b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
StatefulProcessor, TimeoutMode, TTLMode}
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode,
StatefulProcessor, TimeMode}
import org.apache.spark.sql.types._
object CatalystSerde {
@@ -574,8 +574,7 @@ object TransformWithState {
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[K, V, U],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
child: LogicalPlan): LogicalPlan = {
val keyEncoder = encoderFor[K]
@@ -585,8 +584,7 @@ object TransformWithState {
groupingAttributes,
dataAttributes,
statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
CatalystSerde.generateObjAttr[U],
@@ -607,8 +605,7 @@ object TransformWithState {
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[K, V, U],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
child: LogicalPlan,
initialStateGroupingAttrs: Seq[Attribute],
@@ -621,8 +618,7 @@ object TransformWithState {
groupingAttributes,
dataAttributes,
statefulProcessor.asInstanceOf[StatefulProcessor[Any, Any, Any]],
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
keyEncoder.asInstanceOf[ExpressionEncoder[Any]],
CatalystSerde.generateObjAttr[U],
@@ -643,8 +639,7 @@ case class TransformWithState(
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
keyEncoder: ExpressionEncoder[Any],
outputObjAttr: Attribute,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index f3713edd0ec0..862268eba666 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.expressions.ReduceAggregator
import org.apache.spark.sql.internal.TypedAggUtils
-import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout,
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeoutMode,
TTLMode}
+import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout,
OutputMode, StatefulProcessor, StatefulProcessorWithInitialState, TimeMode}
/**
* A [[Dataset]] has been logically grouped by a user specified grouping key.
Users should not
@@ -654,16 +654,14 @@ class KeyValueGroupedDataset[K, V] private[sql](
* @tparam U The type of the output objects. Must be encodable to Spark SQL
types.
* @param statefulProcessor Instance of statefulProcessor whose functions
will be invoked
* by the operator.
- * @param timeoutMode The timeout mode of the stateful processor.
- * @param ttlMode The ttlMode to evict user state on ttl expiration
+ * @param timeMode The time mode semantics of the stateful
processor for timers and TTL.
* @param outputMode The output mode of the stateful processor.
*
* See [[Encoder]] for more details on what types are encodable to Spark SQL.
*/
private[sql] def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode): Dataset[U] = {
Dataset[U](
sparkSession,
@@ -671,8 +669,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
groupingAttributes,
dataAttributes,
statefulProcessor,
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
child = logicalPlan
)
@@ -691,8 +688,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* @tparam U The type of the output objects. Must be encodable to Spark SQL
types.
* @param statefulProcessor Instance of statefulProcessor whose functions
will be invoked by the
* operator.
- * @param timeoutMode The timeout mode of the stateful processor.
- * @param ttlMode The ttlMode to evict user state on ttl expiration
+ * @param timeMode The time mode semantics of the stateful processor for
timers and TTL.
* @param outputMode The output mode of the stateful processor.
* @param outputEncoder Encoder for the output type.
*
@@ -700,11 +696,10 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
private[sql] def transformWithState[U: Encoder](
statefulProcessor: StatefulProcessor[K, V, U],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
outputEncoder: Encoder[U]): Dataset[U] = {
- transformWithState(statefulProcessor, timeoutMode, ttlMode,
outputMode)(outputEncoder)
+ transformWithState(statefulProcessor, timeMode, outputMode)(outputEncoder)
}
/**
@@ -716,8 +711,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* @tparam S The type of initial state objects. Must be encodable to Spark
SQL types.
* @param statefulProcessor Instance of statefulProcessor whose functions
will
* be invoked by the operator.
- * @param timeoutMode The timeout mode of the stateful processor.
- * @param ttlMode The ttlMode to evict user state on ttl expiration
+ * @param timeMode The time mode semantics of the stateful
processor for timers and TTL.
* @param outputMode The output mode of the stateful processor.
* @param initialState User provided initial state that will be used to
initiate state for
* the query in the first batch.
@@ -726,8 +720,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
private[sql] def transformWithState[U: Encoder, S: Encoder](
statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
Dataset[U](
@@ -736,8 +729,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
groupingAttributes,
dataAttributes,
statefulProcessor,
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
child = logicalPlan,
initialState.groupingAttributes,
@@ -756,8 +748,7 @@ class KeyValueGroupedDataset[K, V] private[sql](
* @tparam S The type of initial state objects. Must be encodable to Spark
SQL types.
* @param statefulProcessor Instance of statefulProcessor whose functions
will
* be invoked by the operator.
- * @param timeoutMode The timeout mode of the stateful processor.
- * @param ttlMode The ttlMode to evict user state on ttl expiration
+ * @param timeMode The time mode semantics of the stateful
processor for timers and TTL.
* @param outputMode The output mode of the stateful processor.
* @param initialState User provided initial state that will be used to
initiate state for
* the query in the first batch.
@@ -768,13 +759,12 @@ class KeyValueGroupedDataset[K, V] private[sql](
*/
private[sql] def transformWithState[U: Encoder, S: Encoder](
statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
initialState: KeyValueGroupedDataset[K, S],
outputEncoder: Encoder[U],
initialStateEncoder: Encoder[S]): Dataset[U] = {
- transformWithState(statefulProcessor, timeoutMode, ttlMode,
+ transformWithState(statefulProcessor, timeMode,
outputMode, initialState)(outputEncoder, initialStateEncoder)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2c534eb36f9d..d7ebf786168b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -751,7 +751,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case TransformWithState(
keyDeserializer, valueDeserializer, groupingAttributes,
- dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode,
+ dataAttributes, statefulProcessor, timeMode, outputMode,
keyEncoder, outputAttr, child, hasInitialState,
initialStateGroupingAttrs, initialStateDataAttrs,
initialStateDeserializer, initialState) =>
@@ -761,8 +761,7 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
groupingAttributes,
dataAttributes,
statefulProcessor,
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
keyEncoder,
outputAttr,
@@ -926,12 +925,12 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
hasInitialState, planLater(initialState), planLater(child)
) :: Nil
case logical.TransformWithState(keyDeserializer, valueDeserializer,
groupingAttributes,
- dataAttributes, statefulProcessor, ttlMode, timeoutMode, outputMode,
keyEncoder,
+ dataAttributes, statefulProcessor, timeMode, outputMode, keyEncoder,
outputObjAttr, child, hasInitialState,
initialStateGroupingAttrs, initialStateDataAttrs,
initialStateDeserializer, initialState) =>
TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer,
valueDeserializer,
- groupingAttributes, dataAttributes, statefulProcessor, ttlMode,
timeoutMode, outputMode,
+ groupingAttributes, dataAttributes, statefulProcessor, timeMode,
outputMode,
keyEncoder, outputObjAttr, planLater(child), hasInitialState,
initialStateGroupingAttrs, initialStateDataAttrs,
initialStateDeserializer, planLater(initialState)) :: Nil
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
index 8ab05ef852b8..e0bfc684585d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ExpiredTimerInfoImpl.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.streaming
-import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeoutMode}
+import org.apache.spark.sql.streaming.{ExpiredTimerInfo, TimeMode}
/**
* Class that provides a concrete implementation that can be used to provide
access to expired
@@ -28,7 +28,7 @@ import org.apache.spark.sql.streaming.{ExpiredTimerInfo,
TimeoutMode}
class ExpiredTimerInfoImpl(
isValid: Boolean,
expiryTimeInMsOpt: Option[Long] = None,
- timeoutMode: TimeoutMode = TimeoutMode.NoTimeouts()) extends
ExpiredTimerInfo {
+ timeMode: TimeMode = TimeMode.None()) extends ExpiredTimerInfo {
override def isValid(): Boolean = isValid
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
index 7bef62b7fcce..2b51361f651d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo,
StatefulProcessorHandle, TimeoutMode, TTLConfig, TTLMode, ValueState}
+import org.apache.spark.sql.streaming.{ListState, MapState, QueryInfo,
StatefulProcessorHandle, TimeMode, TTLConfig, ValueState}
import org.apache.spark.util.Utils
/**
@@ -78,8 +78,7 @@ class StatefulProcessorHandleImpl(
store: StateStore,
runId: UUID,
keyEncoder: ExpressionEncoder[Any],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
isStreaming: Boolean = true,
batchTimestampMs: Option[Long] = None)
extends StatefulProcessorHandle with Logging {
@@ -143,7 +142,7 @@ class StatefulProcessorHandleImpl(
override def getQueryInfo(): QueryInfo = currQueryInfo
- private lazy val timerState = new TimerStateImpl(store, timeoutMode,
keyEncoder)
+ private lazy val timerState = new TimerStateImpl(store, timeMode, keyEncoder)
private def verifyStateVarOperations(operationType: String): Unit = {
if (currState != CREATED) {
@@ -153,9 +152,9 @@ class StatefulProcessorHandleImpl(
}
private def verifyTimerOperations(operationType: String): Unit = {
- if (timeoutMode == NoTimeouts) {
- throw
StateStoreErrors.cannotPerformOperationWithInvalidTimeoutMode(operationType,
- timeoutMode.toString)
+ if (timeMode == NoTime) {
+ throw
StateStoreErrors.cannotPerformOperationWithInvalidTimeMode(operationType,
+ timeMode.toString)
}
if (currState < INITIALIZED || currState >= TIMER_PROCESSED) {
@@ -242,8 +241,8 @@ class StatefulProcessorHandleImpl(
private def validateTTLConfig(ttlConfig: TTLConfig, stateName: String): Unit
= {
val ttlDuration = ttlConfig.ttlDuration
- if (ttlMode != TTLMode.ProcessingTimeTTL()) {
- throw StateStoreErrors.cannotProvideTTLConfigForNoTTLMode(stateName)
+ if (timeMode != TimeMode.ProcessingTime()) {
+ throw StateStoreErrors.cannotProvideTTLConfigForTimeMode(stateName,
timeMode.toString)
} else if (ttlDuration == null || ttlDuration.isNegative ||
ttlDuration.isZero) {
throw StateStoreErrors.ttlMustBePositive("update", stateName)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
index 55acc4953c50..e83c83df5322 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala
@@ -16,14 +16,12 @@
*/
package org.apache.spark.sql.execution.streaming
-import java.io.Serializable
-
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.streaming.state._
-import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.streaming.TimeMode
import org.apache.spark.sql.types._
import org.apache.spark.util.NextIterator
@@ -31,10 +29,6 @@ import org.apache.spark.util.NextIterator
* Singleton utils class used primarily while interacting with TimerState
*/
object TimerStateUtils {
- case class TimestampWithKey(
- key: Any,
- expiryTimestampMs: Long) extends Serializable
-
val PROC_TIMERS_STATE_NAME = "_procTimers"
val EVENT_TIMERS_STATE_NAME = "_eventTimers"
val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
@@ -45,12 +39,12 @@ object TimerStateUtils {
* Class that provides the implementation for storing timers
* used within the `transformWithState` operator.
* @param store - state store to be used for storing timer data
- * @param timeoutMode - mode of timeout (event time or processing time)
+ * @param timeMode - mode of timeout (event time or processing time)
* @param keyExprEnc - encoder for key expression
*/
class TimerStateImpl(
store: StateStore,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
keyExprEnc: ExpressionEncoder[Any]) extends Logging {
private val EMPTY_ROW =
@@ -78,7 +72,7 @@ class TimerStateImpl(
private val secIndexKeyEncoder =
UnsafeProjection.create(keySchemaForSecIndex)
- private val timerCFName = if (timeoutMode == TimeoutMode.ProcessingTime) {
+ private val timerCFName = if (timeMode == TimeMode.ProcessingTime) {
TimerStateUtils.PROC_TIMERS_STATE_NAME
} else {
TimerStateUtils.EVENT_TIMERS_STATE_NAME
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index eaf51614d7cb..9f4cad1e348d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -42,8 +42,7 @@ import org.apache.spark.util.{CompletionIterator,
SerializableConfiguration, Uti
* @param groupingAttributes used to group the data
* @param dataAttributes used to read the data
* @param statefulProcessor processor methods called on underlying data
- * @param ttlMode defines the ttl Mode for user state
- * @param timeoutMode defines the timeout mode
+ * @param timeMode The time mode semantics of the stateful processor for
timers and TTL.
* @param outputMode defines the output mode for the statefulProcessor
* @param keyEncoder expression encoder for the key type
* @param outputObjAttr Defines the output object
@@ -59,8 +58,7 @@ case class TransformWithStateExec(
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
keyEncoder: ExpressionEncoder[Any],
outputObjAttr: Attribute,
@@ -80,14 +78,15 @@ case class TransformWithStateExec(
override def shortName: String = "transformWithStateExec"
override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
- if (ttlMode == TTLMode.ProcessingTimeTTL() || timeoutMode ==
TimeoutMode.ProcessingTime()) {
- // TODO: check if we can return true only if actual timers are registered
- true
- } else if (timeoutMode == TimeoutMode.EventTime()) {
- eventTimeWatermarkForEviction.isDefined &&
- newInputWatermark > eventTimeWatermarkForEviction.get
- } else {
- false
+ timeMode match {
+ case ProcessingTime =>
+ // TODO: check if we can return true only if actual timers are
registered, or there is
+ // expired state
+ true
+ case EventTime =>
+ eventTimeWatermarkForEviction.isDefined &&
+ newInputWatermark > eventTimeWatermarkForEviction.get
+ case _ => false
}
}
@@ -200,9 +199,9 @@ case class TransformWithStateExec(
}
private def processTimers(
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
processorHandle: StatefulProcessorHandleImpl): Iterator[InternalRow] = {
- timeoutMode match {
+ timeMode match {
case ProcessingTime =>
assert(batchTimestampMs.isDefined)
val batchTimestamp = batchTimestampMs.get
@@ -262,7 +261,7 @@ case class TransformWithStateExec(
override def next() = itr.next()
private def getIterator(): Iterator[InternalRow] =
CompletionIterator[InternalRow, Iterator[InternalRow]](
- processTimers(timeoutMode, processorHandle), {
+ processTimers(timeMode, processorHandle), {
// Note: `timeoutLatencyMs` also includes the time the parent
operator took for
// processing output returned through iterator.
timeoutLatencyMs += NANOSECONDS.toMillis(System.nanoTime -
timeoutProcessingStartTimeNs)
@@ -297,8 +296,7 @@ case class TransformWithStateExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver
- validateTTLMode()
- validateTimeoutMode()
+ validateTimeMode()
if (hasInitialState) {
val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
@@ -413,11 +411,11 @@ case class TransformWithStateExec(
private def processData(store: StateStore, singleIterator:
Iterator[InternalRow]):
CompletionIterator[InternalRow, Iterator[InternalRow]] = {
val processorHandle = new StatefulProcessorHandleImpl(
- store, getStateInfo.queryRunId, keyEncoder, ttlMode, timeoutMode,
+ store, getStateInfo.queryRunId, keyEncoder, timeMode,
isStreaming, batchTimestampMs)
assert(processorHandle.getHandleState ==
StatefulProcessorHandleState.CREATED)
statefulProcessor.setHandle(processorHandle)
- statefulProcessor.init(outputMode, timeoutMode, ttlMode)
+ statefulProcessor.init(outputMode, timeMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
processDataWithPartition(singleIterator, store, processorHandle)
}
@@ -428,10 +426,10 @@ case class TransformWithStateExec(
initStateIterator: Iterator[InternalRow]):
CompletionIterator[InternalRow, Iterator[InternalRow]] = {
val processorHandle = new StatefulProcessorHandleImpl(store,
getStateInfo.queryRunId,
- keyEncoder, ttlMode, timeoutMode, isStreaming)
+ keyEncoder, timeMode, isStreaming)
assert(processorHandle.getHandleState ==
StatefulProcessorHandleState.CREATED)
statefulProcessor.setHandle(processorHandle)
- statefulProcessor.init(outputMode, timeoutMode, ttlMode)
+ statefulProcessor.init(outputMode, timeMode)
processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
// Check if is first batch
@@ -450,27 +448,16 @@ case class TransformWithStateExec(
processDataWithPartition(childDataIterator, store, processorHandle)
}
- private def validateTimeoutMode(): Unit = {
- timeoutMode match {
+ private def validateTimeMode(): Unit = {
+ timeMode match {
case ProcessingTime =>
if (batchTimestampMs.isEmpty) {
- StateStoreErrors.missingTimeoutValues(timeoutMode.toString)
+ StateStoreErrors.missingTimeValues(timeMode.toString)
}
case EventTime =>
if (eventTimeWatermarkForEviction.isEmpty) {
- StateStoreErrors.missingTimeoutValues(timeoutMode.toString)
- }
-
- case _ =>
- }
- }
-
- private def validateTTLMode(): Unit = {
- ttlMode match {
- case ProcessingTimeTTL =>
- if (batchTimestampMs.isEmpty) {
- StateStoreErrors.missingTTLValues(timeoutMode.toString)
+ StateStoreErrors.missingTimeValues(timeMode.toString)
}
case _ =>
@@ -488,8 +475,7 @@ object TransformWithStateExec {
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
- ttlMode: TTLMode,
- timeoutMode: TimeoutMode,
+ timeMode: TimeMode,
outputMode: OutputMode,
keyEncoder: ExpressionEncoder[Any],
outputObjAttr: Attribute,
@@ -514,8 +500,7 @@ object TransformWithStateExec {
groupingAttributes,
dataAttributes,
statefulProcessor,
- ttlMode,
- timeoutMode,
+ timeMode,
outputMode,
keyEncoder,
outputObjAttr,
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 6c63aa94e75b..b8ab32a00851 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -32,16 +32,9 @@ object StateStoreErrors {
)
}
- def missingTimeoutValues(timeoutMode: String): SparkException = {
+ def missingTimeValues(timeMode: String): SparkException = {
SparkException.internalError(
- msg = s"Failed to find timeout values for timeoutMode=$timeoutMode",
- category = "TWS"
- )
- }
-
- def missingTTLValues(ttlMode: String): SparkException = {
- SparkException.internalError(
- msg = s"Failed to find timeout values for ttlMode=$ttlMode",
+ msg = s"Failed to find time values for timeMode=$timeMode",
category = "TWS"
)
}
@@ -108,10 +101,10 @@ object StateStoreErrors {
new StateStoreCannotCreateColumnFamilyWithReservedChars(colFamilyName)
}
- def cannotPerformOperationWithInvalidTimeoutMode(
+ def cannotPerformOperationWithInvalidTimeMode(
operationType: String,
- timeoutMode: String):
StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode = {
- new
StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode(operationType,
timeoutMode)
+ timeMode: String):
StatefulProcessorCannotPerformOperationWithInvalidTimeMode = {
+ new
StatefulProcessorCannotPerformOperationWithInvalidTimeMode(operationType,
timeMode)
}
def cannotPerformOperationWithInvalidHandleState(
@@ -125,9 +118,9 @@ object StateStoreErrors {
new StatefulProcessorCannotReInitializeState(groupingKey)
}
- def cannotProvideTTLConfigForNoTTLMode(stateName: String):
- StatefulProcessorCannotAssignTTLInNoTTLMode = {
- new StatefulProcessorCannotAssignTTLInNoTTLMode(stateName)
+ def cannotProvideTTLConfigForTimeMode(stateName: String, timeMode: String):
+ StatefulProcessorCannotAssignTTLInTimeMode = {
+ new StatefulProcessorCannotAssignTTLInTimeMode(stateName, timeMode)
}
def ttlMustBePositive(operationType: String,
@@ -163,12 +156,12 @@ class
StateStoreUnsupportedOperationException(operationType: String, entity: Str
messageParameters = Map("operationType" -> operationType, "entity" ->
entity)
)
-class StatefulProcessorCannotPerformOperationWithInvalidTimeoutMode(
+class StatefulProcessorCannotPerformOperationWithInvalidTimeMode(
operationType: String,
- timeoutMode: String)
+ timeMode: String)
extends SparkUnsupportedOperationException(
- errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
- messageParameters = Map("operationType" -> operationType, "timeoutMode" ->
timeoutMode)
+ errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
+ messageParameters = Map("operationType" -> operationType, "timeMode" ->
timeMode)
)
class StatefulProcessorCannotPerformOperationWithInvalidHandleState(
@@ -210,10 +203,10 @@ class
StateStoreNullTypeOrderingColsNotSupported(fieldName: String, index: Strin
errorClass = "STATE_STORE_NULL_TYPE_ORDERING_COLS_NOT_SUPPORTED",
messageParameters = Map("fieldName" -> fieldName, "index" -> index))
-class StatefulProcessorCannotAssignTTLInNoTTLMode(stateName: String)
+class StatefulProcessorCannotAssignTTLInTimeMode(stateName: String, timeMode:
String)
extends SparkUnsupportedOperationException(
- errorClass = "STATEFUL_PROCESSOR_CANNOT_ASSIGN_TTL_IN_NO_TTL_MODE",
- messageParameters = Map("stateName" -> stateName))
+ errorClass = "STATEFUL_PROCESSOR_INCORRECT_TIME_MODE_TO_ASSIGN_TTL",
+ messageParameters = Map("stateName" -> stateName, "timeMode" -> timeMode))
class StatefulProcessorTTLMustBePositive(
operationType: String,
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index f9f075f4468d..5d7ae477e089 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -206,8 +206,7 @@ public class JavaDatasetSuite implements Serializable {
Dataset<String> transformWithStateMapped = grouped.transformWithState(
new TestStatefulProcessorWithInitialState(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append(),
kvInitStateMappedDS,
Encoders.STRING(),
@@ -362,8 +361,7 @@ public class JavaDatasetSuite implements Serializable {
StatefulProcessor<Integer, String, String> testStatefulProcessor = new
TestStatefulProcessor();
Dataset<String> transformWithStateMapped = grouped.transformWithState(
testStatefulProcessor,
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append(),
Encoders.STRING());
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
index c6d705af5f2d..e53e977da149 100644
---
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
+++
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessor.java
@@ -38,8 +38,7 @@ public class TestStatefulProcessor extends
StatefulProcessor<Integer, String, St
@Override
public void init(
OutputMode outputMode,
- TimeoutMode timeoutMode,
- TTLMode ttlMode) {
+ TimeMode timeMode) {
countState = this.getHandle().getValueState("countState",
Encoders.LONG());
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
index db0b222145c4..bfa542e81e35 100644
---
a/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
+++
b/sql/core/src/test/java/test/org/apache/spark/sql/TestStatefulProcessorWithInitialState.java
@@ -37,8 +37,7 @@ public class TestStatefulProcessorWithInitialState
@Override
public void init(
OutputMode outputMode,
- TimeoutMode timeoutMode,
- TTLMode ttlMode) {
+ TimeMode timeMode) {
testState = this.getHandle().getValueState("testState",
Encoders.STRING());
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
index 51cfc1548b39..5eb48a86e342 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ListStateSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkIllegalArgumentException
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker,
StatefulProcessorHandleImpl}
-import org.apache.spark.sql.streaming.{ListState, TimeoutMode, TTLMode,
ValueState}
+import org.apache.spark.sql.streaming.{ListState, TimeMode, ValueState}
/**
* Class that adds unit tests for ListState types used in arbitrary stateful
@@ -37,8 +37,7 @@ class ListStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val listState: ListState[Long] = handle.getListState[Long]("listState",
Encoders.scalaLong)
@@ -71,8 +70,7 @@ class ListStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ListState[Long] = handle.getListState[Long]("testState",
Encoders.scalaLong)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
@@ -100,8 +98,7 @@ class ListStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState1: ListState[Long] =
handle.getListState[Long]("testState1", Encoders.scalaLong)
val testState2: ListState[Long] =
handle.getListState[Long]("testState2", Encoders.scalaLong)
@@ -139,8 +136,7 @@ class ListStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val listState1: ListState[Long] =
handle.getListState[Long]("listState1", Encoders.scalaLong)
val listState2: ListState[Long] =
handle.getListState[Long]("listState2", Encoders.scalaLong)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
index 7fa41b12795e..572fc2429273 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala
@@ -22,7 +22,7 @@ import java.util.UUID
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker,
StatefulProcessorHandleImpl}
-import org.apache.spark.sql.streaming.{ListState, MapState, TimeoutMode,
TTLMode, ValueState}
+import org.apache.spark.sql.streaming.{ListState, MapState, TimeMode,
ValueState}
import org.apache.spark.sql.types.{BinaryType, StructType}
/**
@@ -39,8 +39,7 @@ class MapStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: MapState[String, Double] =
handle.getMapState[String, Double]("testState", Encoders.STRING,
Encoders.scalaDouble)
@@ -74,8 +73,7 @@ class MapStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState1: MapState[Long, Double] =
handle.getMapState[Long, Double]("testState1", Encoders.scalaLong,
Encoders.scalaDouble)
@@ -114,8 +112,7 @@ class MapStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val mapTestState1: MapState[String, Int] =
handle.getMapState[String, Int]("mapTestState1", Encoders.STRING,
Encoders.scalaInt)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
index a32b4111eae8..e9ffe4ca9269 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker,
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
-import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode}
+import org.apache.spark.sql.streaming.{TimeMode, TTLConfig}
/**
@@ -36,21 +36,21 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
private def keyExprEncoder: ExpressionEncoder[Any] =
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]]
- private def getTimeoutMode(timeoutMode: String): TimeoutMode = {
- timeoutMode match {
- case "NoTimeouts" => TimeoutMode.NoTimeouts()
- case "ProcessingTime" => TimeoutMode.ProcessingTime()
- case "EventTime" => TimeoutMode.EventTime()
- case _ => throw new IllegalArgumentException(s"Invalid
timeoutMode=$timeoutMode")
+ private def getTimeMode(timeMode: String): TimeMode = {
+ timeMode match {
+ case "None" => TimeMode.None()
+ case "ProcessingTime" => TimeMode.ProcessingTime()
+ case "EventTime" => TimeMode.EventTime()
+ case _ => throw new IllegalArgumentException(s"Invalid
timeMode=$timeMode")
}
}
- Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
- test(s"value state creation with timeoutMode=$timeoutMode should succeed")
{
+ Seq("None", "ProcessingTime", "EventTime").foreach { timeMode =>
+ test(s"value state creation with timeMode=$timeMode should succeed") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
getTimeoutMode(timeoutMode))
+ UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
assert(handle.getHandleState === StatefulProcessorHandleState.CREATED)
handle.getValueState[Long]("testState", Encoders.scalaLong)
}
@@ -85,13 +85,13 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
handle.registerTimer(1000L)
}
- Seq("NoTimeouts", "ProcessingTime", "EventTime").foreach { timeoutMode =>
- test(s"value state creation with timeoutMode=$timeoutMode " +
+ Seq("None", "ProcessingTime", "EventTime").foreach { timeMode =>
+ test(s"value state creation with timeMode=$timeMode " +
"and invalid state should fail") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
getTimeoutMode(timeoutMode))
+ UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
Seq(StatefulProcessorHandleState.INITIALIZED,
StatefulProcessorHandleState.DATA_PROCESSED,
@@ -105,21 +105,21 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
}
}
- test("registering processing/event time timeouts with NoTimeout mode should
fail") {
+ test("registering processing/event time timeouts with None timeMode should
fail") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
TimeoutMode.NoTimeouts())
+ UUID.randomUUID(), keyExprEncoder, TimeMode.None())
val ex = intercept[SparkUnsupportedOperationException] {
handle.registerTimer(10000L)
}
checkError(
ex,
- errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
+ errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
parameters = Map(
"operationType" -> "register_timer",
- "timeoutMode" -> TimeoutMode.NoTimeouts().toString
+ "timeMode" -> TimeMode.None().toString
),
matchPVals = true
)
@@ -130,22 +130,22 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
checkError(
ex2,
- errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIMEOUT_MODE",
+ errorClass =
"STATEFUL_PROCESSOR_CANNOT_PERFORM_OPERATION_WITH_INVALID_TIME_MODE",
parameters = Map(
"operationType" -> "delete_timer",
- "timeoutMode" -> TimeoutMode.NoTimeouts().toString
+ "timeMode" -> TimeMode.None().toString
),
matchPVals = true
)
}
}
- Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
- test(s"registering timeouts with timeoutMode=$timeoutMode should succeed")
{
+ Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+ test(s"registering timeouts with timeMode=$timeMode should succeed") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
getTimeoutMode(timeoutMode))
+ UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
handle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
assert(handle.getHandleState ===
StatefulProcessorHandleState.INITIALIZED)
@@ -161,12 +161,12 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
}
}
- Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
- test(s"verify listing of registered timers with timeoutMode=$timeoutMode")
{
+ Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+ test(s"verify listing of registered timers with timeMode=$timeMode") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
getTimeoutMode(timeoutMode))
+ UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
handle.setHandleState(StatefulProcessorHandleState.DATA_PROCESSED)
assert(handle.getHandleState ===
StatefulProcessorHandleState.DATA_PROCESSED)
@@ -201,12 +201,12 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
}
}
- Seq("ProcessingTime", "EventTime").foreach { timeoutMode =>
- test(s"registering timeouts with timeoutMode=$timeoutMode and invalid
state should fail") {
+ Seq("ProcessingTime", "EventTime").foreach { timeMode =>
+ test(s"registering timeouts with timeMode=$timeMode and invalid state
should fail") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
getTimeoutMode(timeoutMode))
+ UUID.randomUUID(), keyExprEncoder, getTimeMode(timeMode))
Seq(StatefulProcessorHandleState.CREATED,
StatefulProcessorHandleState.TIMER_PROCESSED,
@@ -219,11 +219,11 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
}
}
- test(s"ttl States are populated for ttlMode=ProcessingTime") {
+ test(s"ttl States are populated for timeMode=ProcessingTime") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.ProcessingTimeTTL(),
TimeoutMode.NoTimeouts(),
+ UUID.randomUUID(), keyExprEncoder, TimeMode.ProcessingTime(),
batchTimestampMs = Some(10))
val valueStateWithTTL = handle.getValueState("testState",
@@ -237,11 +237,11 @@ class StatefulProcessorHandleSuite extends
StateVariableSuiteBase {
}
}
- test(s"ttl States are not populated for ttlMode=NoTTL") {
+ test(s"ttl States are not populated for timeMode=None") {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(), keyExprEncoder, TTLMode.NoTTL(),
TimeoutMode.NoTimeouts())
+ UUID.randomUUID(), keyExprEncoder, TimeMode.None())
handle.getValueState("testState", Encoders.STRING)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
index 1af33aa7b5ad..0bf160d8b321 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/TimerSuite.scala
@@ -20,31 +20,31 @@ package org.apache.spark.sql.execution.streaming.state
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker,
TimerStateImpl}
-import org.apache.spark.sql.streaming.TimeoutMode
+import org.apache.spark.sql.streaming.TimeMode
/**
* Class that adds unit tests for Timer State used in arbitrary stateful
* operators such as transformWithState
*/
class TimerSuite extends StateVariableSuiteBase {
- private def testWithTimeOutMode(testName: String)
- (testFunc: TimeoutMode => Unit): Unit = {
+ private def testWithTimeMode(testName: String)
+ (testFunc: TimeMode => Unit): Unit = {
Seq("Processing", "Event").foreach { timeoutMode =>
test(s"$timeoutMode timer - " + testName) {
timeoutMode match {
- case "Processing" => testFunc(TimeoutMode.ProcessingTime())
- case "Event" => testFunc(TimeoutMode.EventTime())
+ case "Processing" => testFunc(TimeMode.ProcessingTime())
+ case "Event" => testFunc(TimeMode.EventTime())
}
}
}
}
- testWithTimeOutMode("single instance with single key") { timeoutMode =>
+ testWithTimeMode("single instance with single key") { timeMode =>
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
- val timerState = new TimerStateImpl(store, timeoutMode,
+ val timerState = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
timerState.registerTimer(1L * 1000)
assert(timerState.listTimers().toSet === Set(1000L))
@@ -58,14 +58,14 @@ class TimerSuite extends StateVariableSuiteBase {
}
}
- testWithTimeOutMode("multiple instances with single key") { timeoutMode =>
+ testWithTimeMode("multiple instances with single key") { timeMode =>
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
- val timerState1 = new TimerStateImpl(store, timeoutMode,
+ val timerState1 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
- val timerState2 = new TimerStateImpl(store, timeoutMode,
+ val timerState2 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
timerState1.registerTimer(1L * 1000)
timerState2.registerTimer(15L * 1000)
@@ -83,12 +83,12 @@ class TimerSuite extends StateVariableSuiteBase {
}
}
- testWithTimeOutMode("multiple instances with multiple keys") { timeoutMode =>
+ testWithTimeMode("multiple instances with multiple keys") { timeMode =>
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
- val timerState1 = new TimerStateImpl(store, timeoutMode,
+ val timerState1 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
timerState1.registerTimer(1L * 1000)
timerState1.registerTimer(2L * 1000)
@@ -96,7 +96,7 @@ class TimerSuite extends StateVariableSuiteBase {
ImplicitGroupingKeyTracker.removeImplicitKey()
ImplicitGroupingKeyTracker.setImplicitKey("test_key2")
- val timerState2 = new TimerStateImpl(store, timeoutMode,
+ val timerState2 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
timerState2.registerTimer(15L * 1000)
ImplicitGroupingKeyTracker.removeImplicitKey()
@@ -115,13 +115,13 @@ class TimerSuite extends StateVariableSuiteBase {
}
}
- testWithTimeOutMode("Range scan on second index timer key - " +
- "verify timestamp is sorted for single instance") { timeoutMode =>
+ testWithTimeMode("Range scan on second index timer key - " +
+ "verify timestamp is sorted for single instance") { timeMode =>
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
- val timerState = new TimerStateImpl(store, timeoutMode,
+ val timerState = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
val timerTimerstamps = Seq(931L, 8000L, 452300L, 4200L, 90L, 1L, 2L, 8L,
3L, 35L, 6L, 9L, 5L)
// register/put unordered timestamp into rocksDB
@@ -134,25 +134,25 @@ class TimerSuite extends StateVariableSuiteBase {
}
}
- testWithTimeOutMode("test range scan on second index timer key - " +
- "verify timestamp is sorted for multiple instances") { timeoutMode =>
+ testWithTimeMode("test range scan on second index timer key - " +
+ "verify timestamp is sorted for multiple instances") { timeMode =>
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
ImplicitGroupingKeyTracker.setImplicitKey("test_key1")
- val timerState1 = new TimerStateImpl(store, timeoutMode,
+ val timerState1 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
val timerTimestamps1 = Seq(64L, 32L, 1024L, 4096L, 0L, 1L)
timerTimestamps1.foreach(timerState1.registerTimer)
- val timerState2 = new TimerStateImpl(store, timeoutMode,
+ val timerState2 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
val timerTimestamps2 = Seq(931L, 8000L, 452300L, 4200L)
timerTimestamps2.foreach(timerState2.registerTimer)
ImplicitGroupingKeyTracker.removeImplicitKey()
ImplicitGroupingKeyTracker.setImplicitKey("test_key3")
- val timerState3 = new TimerStateImpl(store, timeoutMode,
+ val timerState3 = new TimerStateImpl(store, timeMode,
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]])
val timerTimerStamps3 = Seq(1L, 2L, 8L, 3L)
timerTimerStamps3.foreach(timerState3.registerTimer)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
index 102164d9c15f..d2747e2976f4 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker,
StatefulProcessorHandleImpl, ValueStateImplWithTTL}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.streaming.{TimeoutMode, TTLConfig, TTLMode,
ValueState}
+import org.apache.spark.sql.streaming.{TimeMode, TTLConfig, ValueState}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -49,8 +49,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val stateName = "testState"
val testState: ValueState[Long] =
handle.getValueState[Long]("testState", Encoders.scalaLong)
@@ -94,8 +93,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ValueState[Long] =
handle.getValueState[Long]("testState", Encoders.scalaLong)
ImplicitGroupingKeyTracker.setImplicitKey("test_key")
@@ -121,8 +119,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState1: ValueState[Long] = handle.getValueState[Long](
"testState1", Encoders.scalaLong)
@@ -167,8 +164,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store,
- UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val cfName = "_testState"
val ex = intercept[SparkUnsupportedOperationException] {
@@ -208,8 +204,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ValueState[Double] =
handle.getValueState[Double]("testState",
Encoders.scalaDouble)
@@ -235,8 +230,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ValueState[Long] = handle.getValueState[Long]("testState",
Encoders.scalaLong)
@@ -262,8 +256,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ValueState[TestClass] =
handle.getValueState[TestClass]("testState",
Encoders.product[TestClass])
@@ -289,8 +282,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
tryWithProviderResource(newStoreProviderWithStateVariable(true)) {
provider =>
val store = provider.getStore(0)
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.NoTTL(), TimeoutMode.NoTimeouts())
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]], TimeMode.None())
val testState: ValueState[POJOTestClass] =
handle.getValueState[POJOTestClass]("testState",
Encoders.bean(classOf[POJOTestClass]))
@@ -318,8 +310,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
val store = provider.getStore(0)
val timestampMs = 10
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
- Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
+ Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
TimeMode.ProcessingTime(),
batchTimestampMs = Some(timestampMs))
val ttlConfig = TTLConfig(ttlDuration = Duration.ofMinutes(1))
@@ -340,8 +331,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
// increment batchProcessingTime, or watermark and ensure expired value
is not returned
val nextBatchHandle = new StatefulProcessorHandleImpl(store,
UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
- batchTimestampMs = Some(ttlExpirationMs))
+ TimeMode.ProcessingTime(), batchTimestampMs = Some(ttlExpirationMs))
val nextBatchTestState: ValueStateImplWithTTL[String] =
nextBatchHandle.getValueState[String]("testState", Encoders.STRING,
ttlConfig)
@@ -377,8 +367,7 @@ class ValueStateSuite extends StateVariableSuiteBase {
val batchTimestampMs = 10
val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
- TTLMode.ProcessingTimeTTL(), TimeoutMode.NoTimeouts(),
- batchTimestampMs = Some(batchTimestampMs))
+ TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs))
Seq(null, Duration.ZERO, Duration.ofMinutes(-1)).foreach { ttlDuration =>
val ttlConfig = TTLConfig(ttlDuration)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
index 5ccc14ab8a77..705226d51332 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala
@@ -32,8 +32,7 @@ class TestListStateProcessor
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_listState = getHandle.getListState("testListState", Encoders.STRING)
}
@@ -90,8 +89,7 @@ class ToggleSaveAndEmitProcessor
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_listState = getHandle.getListState("testListState", Encoders.STRING)
_valueState = getHandle.getValueState("testValueState",
Encoders.scalaBoolean)
}
@@ -141,8 +139,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update()) (
@@ -162,8 +159,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -183,8 +179,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -204,8 +199,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -225,8 +219,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -246,8 +239,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -267,8 +259,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestListStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update()) (
@@ -320,8 +311,7 @@ class TransformWithListStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new ToggleSaveAndEmitProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
index d32b9687d95f..0eafb16a5350 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala
@@ -32,8 +32,7 @@ class TestMapStateProcessor
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_mapState = getHandle.getMapState("sessionState", Encoders.STRING,
Encoders.STRING)
}
@@ -95,8 +94,7 @@ class TransformWithMapStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestMapStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
@@ -122,8 +120,7 @@ class TransformWithMapStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestMapStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -147,8 +144,7 @@ class TransformWithMapStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestMapStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -171,8 +167,7 @@ class TransformWithMapStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestMapStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append())
testStream(result, OutputMode.Append())(
// Test exists()
@@ -226,8 +221,7 @@ class TransformWithMapStateSuite extends StreamTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new TestMapStateProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append())
val df = result.toDF()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
index 106f228ba78b..54cff6fc44c0 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala
@@ -38,8 +38,7 @@ abstract class StatefulProcessorWithInitialStateTestClass[V]
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_valState = getHandle.getValueState[Double]("testValueInit",
Encoders.scalaDouble)
_listState = getHandle.getListState[Double]("testListInit",
Encoders.scalaDouble)
_mapState = getHandle.getMapState[Double, Int](
@@ -171,8 +170,7 @@ class StatefulProcessorWithInitialStateProcTimerClass
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode) : Unit = {
+ timeMode: TimeMode) : Unit = {
_countState = getHandle.getValueState[Long]("countState",
Encoders.scalaLong)
_timerState = getHandle.getValueState[Long]("timerState",
Encoders.scalaLong)
}
@@ -215,8 +213,7 @@ class StatefulProcessorWithInitialStateEventTimerClass
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState",
Encoders.scalaLong)
_timerState = getHandle.getValueState[Long]("timerState",
Encoders.scalaLong)
@@ -293,7 +290,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
InputRowForInitialState("init_2", 100.0, List(100.0), Map(100.0 ->
1)))
.toDS().groupByKey(x => x.key).mapValues(x => x)
val query = kvDataSet.transformWithState(new
InitialStateInMemoryTestClass(),
- TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(),
initStateDf)
+ TimeMode.None(), OutputMode.Append(), initStateDf)
testStream(query, OutputMode.Update())(
// non-exist key test
@@ -371,7 +368,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
val query = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new AccumulateStatefulProcessorWithInitState(),
- TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(),
initStateDf
+ TimeMode.None(), OutputMode.Append(), initStateDf
)
testStream(query, OutputMode.Update())(
AddData(inputData, InitInputRow("init_1", "add", 50.0)),
@@ -391,8 +388,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new AccumulateStatefulProcessorWithInitState(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append(),
createInitialDfForTest)
@@ -410,8 +406,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
val query = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new AccumulateStatefulProcessorWithInitState(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append(),
initDf)
@@ -443,8 +438,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
val result = inputData.toDS().groupByKey(x => x)
.transformWithState(
new StatefulProcessorWithInitialStateProcTimerClass(),
- TimeoutMode.ProcessingTime(),
- TTLMode.NoTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Update(),
initDf)
@@ -488,8 +482,7 @@ class TransformWithStateInitialStateSuite extends
StateStoreMetricsTest
val result = eventTimeDf(inputData.toDS())
.transformWithState(
new StatefulProcessorWithInitialStateEventTimerClass(),
- TimeoutMode.EventTime(),
- TTLMode.NoTTL(),
+ TimeMode.EventTime(),
OutputMode.Update(),
initDf)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 735c53bf3c91..7dec14d3e435 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -40,8 +40,7 @@ class RunningCountStatefulProcessor extends
StatefulProcessor[String, String, (S
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_countState = getHandle.getValueState[Long]("countState",
Encoders.scalaLong)
}
@@ -104,9 +103,8 @@ class RunningCountStatefulProcessorWithProcTimeTimerUpdates
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode) : Unit = {
- super.init(outputMode, timeoutMode, ttlMode)
+ timeMode: TimeMode) : Unit = {
+ super.init(outputMode, timeMode)
_timerState = getHandle.getValueState[Long]("timerState",
Encoders.scalaLong)
}
@@ -196,8 +194,7 @@ class MaxEventTimeStatefulProcessor
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_maxEventTimeState = getHandle.getValueState[Long]("maxEventTimeState",
Encoders.scalaLong)
_timerState = getHandle.getValueState[Long]("timerState",
Encoders.scalaLong)
@@ -242,8 +239,7 @@ class RunningCountMostRecentStatefulProcessor
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_countState = getHandle.getValueState[Long]("countState",
Encoders.scalaLong)
_mostRecent = getHandle.getValueState[String]("mostRecent",
Encoders.STRING)
}
@@ -273,8 +269,7 @@ class MostRecentStatefulProcessorWithDeletion
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
getHandle.deleteIfExists("countState")
_mostRecent = getHandle.getValueState[String]("mostRecent",
Encoders.STRING)
}
@@ -327,8 +322,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessorWithError(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -349,8 +343,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -380,8 +373,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new
RunningCountStatefulProcessorWithProcTimeTimer(),
- TimeoutMode.ProcessingTime(),
- TTLMode.NoTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -424,8 +416,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.groupByKey(x => x)
.transformWithState(
new RunningCountStatefulProcessorWithProcTimeTimerUpdates(),
- TimeoutMode.ProcessingTime(),
- TTLMode.NoTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -461,8 +452,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.groupByKey(x => x)
.transformWithState(
new RunningCountStatefulProcessorWithMultipleTimers(),
- TimeoutMode.ProcessingTime(),
- TTLMode.NoTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -497,8 +487,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.groupByKey(_._1)
.transformWithState(
new MaxEventTimeStatefulProcessor(),
- TimeoutMode.EventTime(),
- TTLMode.NoTTL(),
+ TimeMode.EventTime(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -539,8 +528,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Append())
val df = result.toDF()
@@ -558,15 +546,13 @@ class TransformWithStateSuite extends
StateStoreMetricsTest
val stream1 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new RunningCountMostRecentStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
val stream2 = inputData.toDS()
.groupByKey(x => x._1)
.transformWithState(new MostRecentStatefulProcessorWithDeletion(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(stream1, OutputMode.Update())(
@@ -598,8 +584,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.union(inputData2.toDS())
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -632,8 +617,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.union(inputData3.toDS())
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -666,8 +650,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.union(inputData2.toDS().map(_.toString))
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -697,8 +680,7 @@ class TransformWithStateSuite extends StateStoreMetricsTest
.select("value").as[String]
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
}
@@ -790,8 +772,7 @@ class TransformWithStateValidationSuite extends
StateStoreMetricsTest {
val result = inputData.toDS()
.groupByKey(x => x)
.transformWithState(new RunningCountStatefulProcessor(),
- TimeoutMode.NoTimeouts(),
- TTLMode.NoTTL(),
+ TimeMode.None(),
OutputMode.Update())
testStream(result, OutputMode.Update())(
@@ -810,7 +791,7 @@ class TransformWithStateValidationSuite extends
StateStoreMetricsTest {
val result = inputData.toDS()
.groupByKey(x => x.key)
.transformWithState(new AccumulateStatefulProcessorWithInitState(),
- TimeoutMode.NoTimeouts(), TTLMode.NoTTL(), OutputMode.Append(), initDf
+ TimeMode.None(), OutputMode.Append(), initDf
)
testStream(result, OutputMode.Update())(
AddData(inputData, InitInputRow("a", "add", -1.0)),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
index 759d535c18a3..e6dd0ace766a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithValueStateTTLSuite.scala
@@ -98,8 +98,7 @@ class ValueStateTTLProcessor(ttlConfig: TTLConfig)
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_valueState = getHandle
.getValueState("valueState", Encoders.scalaInt, ttlConfig)
.asInstanceOf[ValueStateImplWithTTL[Int]]
@@ -135,8 +134,7 @@ case class MultipleValueStatesTTLProcessor(
override def init(
outputMode: OutputMode,
- timeoutMode: TimeoutMode,
- ttlMode: TTLMode): Unit = {
+ timeMode: TimeMode): Unit = {
_valueStateWithTTL = getHandle
.getValueState("valueState", Encoders.scalaInt, ttlConfig)
.asInstanceOf[ValueStateImplWithTTL[Int]]
@@ -191,8 +189,7 @@ class TransformWithValueStateTTLSuite
.groupByKey(x => x.key)
.transformWithState(
new ValueStateTTLProcessor(ttlConfig),
- TimeoutMode.NoTimeouts(),
- TTLMode.ProcessingTimeTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Append())
val clock = new StreamManualClock
@@ -258,8 +255,7 @@ class TransformWithValueStateTTLSuite
.groupByKey(x => x.key)
.transformWithState(
new ValueStateTTLProcessor(ttlConfig),
- TimeoutMode.NoTimeouts(),
- TTLMode.ProcessingTimeTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Append())
val clock = new StreamManualClock
@@ -321,8 +317,7 @@ class TransformWithValueStateTTLSuite
.groupByKey(x => x.key)
.transformWithState(
new ValueStateTTLProcessor(ttlConfig),
- TimeoutMode.NoTimeouts(),
- TTLMode.ProcessingTimeTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Append())
val clock = new StreamManualClock
@@ -375,8 +370,7 @@ class TransformWithValueStateTTLSuite
.groupByKey(x => x.key)
.transformWithState(
MultipleValueStatesTTLProcessor(ttlKey, noTtlKey, ttlConfig),
- TimeoutMode.NoTimeouts(),
- TTLMode.ProcessingTimeTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Append())
val clock = new StreamManualClock
@@ -430,8 +424,7 @@ class TransformWithValueStateTTLSuite
.groupByKey(x => x.key)
.transformWithState(
new ValueStateTTLProcessor(ttlConfig),
- TimeoutMode.NoTimeouts(),
- TTLMode.ProcessingTimeTTL(),
+ TimeMode.ProcessingTime(),
OutputMode.Append())
val clock = new StreamManualClock
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]