This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 48322c7ac541 [SPARK-51933][SS][DOCS] Document the new API 
`transformWithState` in PySpark
48322c7ac541 is described below

commit 48322c7ac5417e821e05e7e54d06f6614f809241
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Apr 29 06:54:49 2025 +0900

    [SPARK-51933][SS][DOCS] Document the new API `transformWithState` in PySpark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to document the new API named `transformWithState` in 
PySpark.
    
    The new API is quite the same with the existing API 
`transformWithStateInPandas`, except the user facing type of StatefulProcessor. 
We could reuse most content and simply produce the code example along with 
existing example.
    
    ### Why are the changes needed?
    
    Doc needs to reflect the new API.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, document is updated to guide the new API.
    
    ### How was this patch tested?
    
    Confirmed with jekyll - `SKIP_API=1 bundle exec jekyll serve --watch`. Here 
are screenshots for changed parts.
    
    <img width="874" alt="스크린샷 2025-04-28 오전 11 16 28" 
src="https://github.com/user-attachments/assets/c5c9c8f2-2c00-491e-90e5-f209f6a7077e";
 />
    
    <img width="709" alt="스크린샷 2025-04-28 오전 11 16 51" 
src="https://github.com/user-attachments/assets/906dd64d-deac-4b3a-9790-d12d8647b0b1";
 />
    
    <img width="700" alt="스크린샷 2025-04-28 오전 11 16 59" 
src="https://github.com/user-attachments/assets/d738c3c6-5887-4ad4-97fc-5a474263ddcc";
 />
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50738 from HeartSaVioR/SPARK-51933.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../structured-streaming-transform-with-state.md   | 80 +++++++++++++++++++++-
 1 file changed, 77 insertions(+), 3 deletions(-)

diff --git a/docs/streaming/structured-streaming-transform-with-state.md 
b/docs/streaming/structured-streaming-transform-with-state.md
index 2ab6f371817b..050337f2dd77 100644
--- a/docs/streaming/structured-streaming-transform-with-state.md
+++ b/docs/streaming/structured-streaming-transform-with-state.md
@@ -27,7 +27,11 @@ This operator has support for an umbrella of features such 
as object-oriented st
 
 # Language Support
 
-`TransformWithState` is available in Scala, Java and Python. Note that in 
Python, the operator name is called `transformWithStateInPandas` similar to 
other operators interacting with the Pandas interface in Apache Spark.
+`TransformWithState` is available in Scala, Java and Python.
+
+Note that in Python, there are two operators named 
`transformWithStateInPandas` which works with Pandas interface, and 
`transformWithState` which works with Row interface.
+
+Based on popularity of Pandas and its rich set of API with vectorization, 
`transformWithStateInPandas` may be the preferred API for most users. The 
`transformWithState` API is more suitable to handle high key cardinality use 
case, since the cost of conversion is considerably high for Pandas API. If 
users aren't familiar with Pandas, Row type API might be easier to learn.
 
 # Components of a TransformWithState Query
 
@@ -118,9 +122,11 @@ Here is an example of a StatefulProcessor that implements 
a downtime detector. E
 
 When a timer expires, the application emits the elapsed time since the last 
observed event for the key. It then sets a new timer to emit an update 10 
seconds later.
 
+NOTE: `python_Pandas` tab guides the implementation of StatefulProcessor for 
`transformWithStateInPandas`, and `python_Row` tab guides the implementation of 
StatefulProcessor for `transformWithState`.
+
 <div class="codetabs">
 
-<div data-lang="python"  markdown="1">
+<div data-lang="python_Pandas"  markdown="1">
 
 {% highlight python %}
 
@@ -178,6 +184,58 @@ class DownTimeDetector(StatefulProcessor):
 
 </div>
 
+<div data-lang="python_Row"  markdown="1">
+
+{% highlight python %}
+
+class DownTimeDetector(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        # Define schema for the state value (timestamp)
+        state_schema = StructType([StructField("value", TimestampType(), 
True)])
+        self.handle = handle
+        # Initialize state to store the last seen timestamp for each key
+        self.last_seen = handle.getValueState("last_seen", state_schema)
+
+    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> 
Iterator[Row]:
+        latest_from_existing = self.last_seen.get()
+        # Calculate downtime duration
+        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - 
int(latest_from_existing.timestamp() * 1000)
+        # Register a new timer for 10 seconds in the future
+        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 
10000)
+        # Yield a DataFrame with the key and downtime duration
+        yield Row(id=key[0], timeValues=str(downtime_duration))
+
+    def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
+        # Find the maximum timestamp
+        max_timestamp = max(map(lambda x: x.timestamp, rows))
+
+        # Get the latest timestamp from existing state or use epoch start if 
not exists
+        if self.last_seen.exists():
+            latest_from_existing = self.last_seen.get()
+        else:
+            latest_from_existing = datetime.fromtimestamp(0)
+
+        # If new data is more recent than existing state
+        if latest_from_existing < max_timestamp:
+            # Delete all existing timers
+            for timer in self.handle.listTimers():
+                self.handle.deleteTimer(timer)
+            # Update the last seen timestamp
+            self.last_seen.update((max_timestamp,))
+
+        # Register a new timer for 5 seconds in the future
+        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 
5000)
+
+        return iter([])
+
+    def close(self) -> None:
+        # No cleanup needed
+        pass
+
+{% endhighlight %}
+
+</div>
+
 <div data-lang="scala"  markdown="1">
 
 {% highlight scala %}
@@ -252,7 +310,7 @@ class DowntimeDetector(duration: Duration) extends
 Now that we have defined the `StatefulProcessor`, we can use it in a streaming 
query. The following code snippets show how to use the `StatefulProcessor` in a 
streaming query in Python and Scala.
 
 <div class="codetabs">
-<div data-lang="python"  markdown="1">
+<div data-lang="python_Pandas"  markdown="1">
 
 {% highlight python %}
 
@@ -264,6 +322,22 @@ q = (df.groupBy("key")
     timeMode="None",
   )
   .writeStream...
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python_Row"  markdown="1">
+
+{% highlight python %}
+
+q = (df.groupBy("key")
+  .transformWithState(
+    statefulProcessor=DownTimeDetector(),
+    outputStructType=output_schema,
+    outputMode="Update",
+    timeMode="None",
+  )
+  .writeStream...
   
 {% endhighlight %}
 </div>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to