This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f9c5eb808ca1 [SPARK-51237][SS] Add API details for new transformWithState helper APIs as needed
f9c5eb808ca1 is described below

commit f9c5eb808ca1419f9eea3fc07642ce0d3d15dc54
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Feb 17 21:56:16 2025 +0900

    [SPARK-51237][SS] Add API details for new transformWithState helper APIs as needed
    
    ### What changes were proposed in this pull request?
    Add API details for new transformWithState helper APIs as needed
    
    ### Why are the changes needed?
    Improve API docs for user reference
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    Comment-only change; covered by existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49978 from anishshri-db/task/SPARK-51237.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
    (cherry picked from commit 17b943106b713d39767dc63110c9e2e878e6dd1c)
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../spark/sql/streaming/ExpiredTimerInfo.scala     |  4 +-
 .../org/apache/spark/sql/streaming/ListState.scala | 46 ++++++++++++--
 .../org/apache/spark/sql/streaming/MapState.scala  | 73 +++++++++++++++++++---
 .../org/apache/spark/sql/streaming/QueryInfo.scala | 24 +++++--
 .../spark/sql/streaming/StatefulProcessor.scala    | 16 +++++
 .../sql/streaming/StatefulProcessorHandle.scala    | 16 ++++-
 .../apache/spark/sql/streaming/TimerValues.scala   |  9 ++-
 .../apache/spark/sql/streaming/ValueState.scala    | 25 ++++++--
 8 files changed, 184 insertions(+), 29 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
index 31075f00e56f..3772e274c441 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ExpiredTimerInfo.scala
@@ -28,7 +28,9 @@ import org.apache.spark.annotation.Evolving
 trait ExpiredTimerInfo extends Serializable {
 
   /**
-   * Get the expired timer's expiry time as milliseconds in epoch time.
+   * Function to return the expired timer's expiry time as milliseconds in 
epoch time.
+   *
+   * @return - the expired timer's expiry time in milliseconds
    */
   def getExpiryTimeInMs(): Long
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
index 79b0d10072e8..16f3625d2a51 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ListState.scala
@@ -24,21 +24,55 @@ import org.apache.spark.annotation.Evolving
  */
 trait ListState[S] extends Serializable {
 
-  /** Whether state exists or not. */
+  /**
+   * Function to check whether state exists for current grouping key or not.
+   *
+   * @return - true if state exists, false otherwise.
+   */
   def exists(): Boolean
 
-  /** Get the state value. An empty iterator is returned if no value exists. */
+  /**
+   * Function to get the list of elements in the state as an iterator. If the 
state does not exist,
+   * an empty iterator is returned.
+   *
+   * Note that it's always recommended to check whether the state exists or 
not by calling exists()
+   * before calling get().
+   *
+   * @return - an iterator of elements in the state if it exists, an empty 
iterator otherwise.
+   */
   def get(): Iterator[S]
 
-  /** Update the value of the list. */
+  /**
+   * Function to update the value of the state with a new list.
+   *
+   * Note that this will replace the existing value with the new value.
+   *
+   * @param newState - new list of elements
+   */
   def put(newState: Array[S]): Unit
 
-  /** Append an entry to the list */
+  /**
+   * Function to append a single entry to the existing state list.
+   *
+   * Note that if this is the first time the state is being appended to, the 
state will be
+   * initialized to an empty list before appending the new entry.
+   *
+   * @param newState - single list element to be appended
+   */
   def appendValue(newState: S): Unit
 
-  /** Append an entire list to the existing value */
+  /**
+   * Function to append a list of entries to the existing state list.
+   *
+   * Note that if this is the first time the state is being appended to, the 
state will be
+   * initialized to an empty list before appending the new entries.
+   *
+   * @param newState - list of elements to be appended
+   */
   def appendList(newState: Array[S]): Unit
 
-  /** Removes this state for the given grouping key. */
+  /**
+   * Function to remove the state for the current grouping key.
+   */
   def clear(): Unit
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
index c514b4e375f8..8459bbcc257d 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala
@@ -24,30 +24,85 @@ import org.apache.spark.annotation.Evolving
  */
 trait MapState[K, V] extends Serializable {
 
-  /** Whether state exists or not. */
+  /**
+   * Function to check whether any user map entry exists for current grouping 
key or not.
+   *
+   * @return - true if state exists, false otherwise.
+   */
   def exists(): Boolean
 
-  /** Get the state value if it exists */
+  /**
+   * Function to get the state value for current grouping key and user map key.
+   * If the state exists, the value is returned. If the state does not exist,
+   * the default value for the type is returned for AnyVal types and null for 
AnyRef types.
+   *
+   * Note that it's always recommended to check whether the state exists or not by calling exists()
+   * before calling getValue().
+   *
+   * @return - the value of the state if it exists. If the state does not 
exist, the default value
+   *           for the type is returned for AnyVal types and null for AnyRef 
types.
+   */
   def getValue(key: K): V
 
-  /** Check if the user key is contained in the map */
+  /**
+   * Function to check if the user map key is contained in the map for the 
current grouping key.
+   *
+   * @param key - user map key
+   *
+   * @return - true if the user key is present in the map, false otherwise.
+   */
   def containsKey(key: K): Boolean
 
-  /** Update value for given user key */
+  /**
+   * Function to add or update the map entry for the current grouping key.
+   *
+   * Note that this function will add the user map key and value if the user 
map key is not
+   * present in the map associated with the current grouping key.
+   * If the user map key is already present in the associated map, the value 
for the user key
+   * will be updated to the new user map value.
+   *
+   * @param key - user map key
+   * @param value - user map value
+   */
   def updateValue(key: K, value: V): Unit
 
-  /** Get the map associated with grouping key */
+  /**
+   * Function to return the iterator of user map key-value pairs present in 
the map for the
+   * current grouping key.
+   *
+   * @return - iterator of user map key-value pairs if the map is not empty
+   *           and empty iterator otherwise.
+   */
   def iterator(): Iterator[(K, V)]
 
-  /** Get the list of keys present in map associated with grouping key */
+  /**
+   * Function to return the user map keys present in the map for the current 
grouping key.
+   *
+   * @return - iterator of user map keys if the map is not empty, empty 
iterator otherwise.
+   */
   def keys(): Iterator[K]
 
-  /** Get the list of values present in map associated with grouping key */
+  /**
+   * Function to return the user map values present in the map for the current 
grouping key.
+   *
+   * @return - iterator of user map values if the map is not empty, empty 
iterator otherwise.
+   */
   def values(): Iterator[V]
 
-  /** Remove user key from map state */
+  /**
+   * Function to remove the user map key from the map for the current grouping 
key.
+   *
+   * Note that this function will remove the user map key and its associated 
value from the map
+   * associated with the current grouping key. If the user map key is not 
present in the map,
+   * this function will not do anything.
+   *
+   * @param key - user map key
+   */
   def removeKey(key: K): Unit
 
-  /** Remove this state. */
+  /**
+   * Function to remove the state for the current grouping key. Note that this 
removes the entire
+   * map state associated with the current grouping key.
+   */
   def clear(): Unit
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
index 2b56c92f8549..c1d47f4c28bd 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/QueryInfo.scala
@@ -28,15 +28,31 @@ import org.apache.spark.annotation.Evolving
 @Evolving
 trait QueryInfo extends Serializable {
 
-  /** Returns the streaming query id associated with stateful operator */
+  /**
+   * Function to return unique streaming query id associated with stateful 
operator.
+   *
+   * @return - the unique query id.
+   */
   def getQueryId: UUID
 
-  /** Returns the streaming query runId associated with stateful operator */
+  /**
+   * Function to return unique streaming query run id associated with stateful 
operator.
+   *
+   * @return - the unique query run id.
+   */
   def getRunId: UUID
 
-  /** Returns the batch id associated with stateful operator */
+  /**
+   * Function to return unique batch id associated with stateful operator.
+   *
+   * @return - the unique batch id.
+   */
   def getBatchId: Long
 
-  /** Returns the string representation of QueryInfo object */
+  /**
+   * Function to return string representation of the query info.
+   *
+   * @return - query info as string.
+   */
   def toString: String
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
index f0ea1dcd6871..c2cb32cccab4 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala
@@ -57,6 +57,11 @@ abstract class StatefulProcessor[K, I, O] extends 
Serializable {
   /**
    * Function that will allow users to interact with input data rows along 
with the grouping key
    * and current timer values and optionally provide output rows.
+   *
+   * Note that in microbatch mode, input rows for a given grouping key will be 
provided in a
+   * single function invocation. If the grouping key is not seen in the 
current microbatch, this
+   * function will not be invoked for that key.
+   *
    * @param key
    *   \- grouping key
    * @param inputRows
@@ -64,6 +69,7 @@ abstract class StatefulProcessor[K, I, O] extends 
Serializable {
    * @param timerValues
    *   \- instance of TimerValues that provides access to current 
processing/event time if
    *   available
+   *
    * @return
    *   \- Zero or more output rows
    */
@@ -72,12 +78,18 @@ abstract class StatefulProcessor[K, I, O] extends 
Serializable {
   /**
    * Function that will be invoked when a timer is fired for a given key. 
Users can choose to
    * evict state, register new timers and optionally provide output rows.
+   *
+   * Note that in microbatch mode, this function will be called once for each 
unique timer expiry
+   * for a given key. If no timer expires for a given key, this function will 
not be invoked for
+   * that key.
+   *
    * @param key
    *   \- grouping key
    * @param timerValues
    *   \- instance of TimerValues that provides access to current 
processing/event
    * @param expiredTimerInfo
    *   \- instance of ExpiredTimerInfo that provides access to expired timer
+   *
    * @return
    *   Zero or more output rows
    */
@@ -131,6 +143,10 @@ abstract class StatefulProcessorWithInitialState[K, I, O, 
S] extends StatefulPro
    * the input rows, e.g. dataframe from data source reader of existing 
streaming query
    * checkpoint.
    *
+   * Note that in microbatch mode, this function can be called one or more times per grouping
+   * key. If the grouping key is not seen within the initial state dataframe rows, then the
+   * function will not be invoked for that key.
+   *
    * @param key
    *   \- grouping key
    * @param initialState
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
index 5a6d9f6c76ea..eae04db06b26 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala
@@ -46,6 +46,7 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- the ttl configuration (time to live duration etc.)
    * @tparam T
    *   \- type of state variable
+   *
    * @return
    *   \- instance of ValueState of type T that can be used to store state 
persistently
    */
@@ -71,6 +72,7 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- the ttl configuration (time to live duration etc.)
    * @tparam T
    *   \- type of state variable
+   *
    * @return
    *   \- instance of ValueState of type T that can be used to store state 
persistently
    */
@@ -95,6 +97,7 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- the ttl configuration (time to live duration etc.)
    * @tparam T
    *   \- type of state variable
+   *
    * @return
    *   \- instance of ListState of type T that can be used to store state 
persistently
    */
@@ -120,6 +123,7 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- the ttl configuration (time to live duration etc.)
    * @tparam T
    *   \- type of state variable
+   *
    * @return
    *   \- instance of ListState of type T that can be used to store state 
persistently
    */
@@ -148,6 +152,7 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- type of key for map state variable
    * @tparam V
    *   \- type of value for map state variable
+   *
    * @return
    *   \- instance of MapState of type [K,V] that can be used to store state 
persistently
    */
@@ -176,17 +181,23 @@ trait StatefulProcessorHandle extends Serializable {
    *   \- type of key for map state variable
    * @tparam V
    *   \- type of value for map state variable
+   *
    * @return
    *   \- instance of MapState of type [K,V] that can be used to store state 
persistently
    */
   def getMapState[K: Encoder, V: Encoder](stateName: String, ttlConfig: 
TTLConfig): MapState[K, V]
 
-  /** Function to return queryInfo for currently running task */
+  /**
+   * Function to return query info for the current query
+   *
+   * @return - QueryInfo object with access to streaming query metadata
+   */
   def getQueryInfo(): QueryInfo
 
   /**
    * Function to register a processing/event time based timer for given 
implicit grouping key and
    * provided timestamp
+   *
    * @param expiryTimestampMs
    *   \- timer expiry timestamp in milliseconds
    */
@@ -195,6 +206,7 @@ trait StatefulProcessorHandle extends Serializable {
   /**
    * Function to delete a processing/event time based timer for given implicit 
grouping key and
    * provided timestamp
+   *
    * @param expiryTimestampMs
    *   \- timer expiry timestamp in milliseconds
    */
@@ -205,6 +217,7 @@ trait StatefulProcessorHandle extends Serializable {
    * listTimers() within the `handleInputRows` method of the StatefulProcessor 
will return all the
    * unprocessed registered timers, including the one being fired within the 
invocation of
    * `handleInputRows`.
+   *
    * @return
    *   \- list of all the registered timers for given implicit grouping key
    */
@@ -212,6 +225,7 @@ trait StatefulProcessorHandle extends Serializable {
 
   /**
    * Function to delete and purge state variable if defined previously
+   *
    * @param stateName
    *   \- name of the state variable
    */
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
index a3480065965e..f277a20dac2a 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/TimerValues.scala
@@ -29,21 +29,26 @@ import org.apache.spark.annotation.Evolving
 trait TimerValues extends Serializable {
 
   /**
-   * Get the current processing time as milliseconds in epoch time.
+   * Function to get the current processing time as milliseconds in epoch time.
+   *
    * @note
    *   This will return a constant value throughout the duration of a 
streaming query trigger,
    *   even if the trigger is re-executed.
+   *
+   * @return - the current processing time in milliseconds
    */
   def getCurrentProcessingTimeInMs(): Long
 
   /**
-   * Get the current event time watermark as milliseconds in epoch time.
+   * Function to get the current event time watermark as milliseconds in epoch 
time.
    *
    * @note
    *   This can be called only when watermark is set before calling 
`transformWithState`.
    * @note
    *   The watermark gets propagated at the end of each query. As a result, 
this method will
    *   return 0 (1970-01-01T00:00:00) for the first micro-batch.
+   *
+   * @return - the current event time watermark in milliseconds
    */
   def getCurrentWatermarkInMs(): Long
 }
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
index 2910da573157..94b5f71173eb 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/streaming/ValueState.scala
@@ -27,22 +27,35 @@ import org.apache.spark.annotation.Evolving
  */
 trait ValueState[S] extends Serializable {
 
-  /** Whether state exists or not. */
+  /**
+   * Function to check whether state exists for current grouping key or not.
+   *
+   * @return - true if state exists, false otherwise.
+   */
   def exists(): Boolean
 
   /**
-   * Get the state value if it exists or return null otherwise.
+   * Function to get the state value for the current grouping key.
+   * If the state exists, the value is returned. If the state does not exist,
+   * the default value for the type is returned for AnyVal types and null for 
AnyRef types.
+   *
+   * Note that it's always recommended to check whether the state exists or 
not by calling exists()
+   * before calling get().
+   *
+   * @return - the value of the state if it exists. If the state does not 
exist, the default value
+   *           for the type is returned for AnyVal types and null for AnyRef 
types.
    */
   def get(): S
 
   /**
-   * Update the value of the state.
+   * Function to update the value of the state for the current grouping key to 
the new value.
    *
-   * @param newState
-   *   the new value
+   * @param newState - the new value
    */
   def update(newState: S): Unit
 
-  /** Remove this state. */
+  /**
+   * Function to remove the state for the current grouping key.
+   */
   def clear(): Unit
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to