This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75424734ea11 [SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated
75424734ea11 is described below

commit 75424734ea117776e7257f89046a82801d6730c6
Author: vinodkc <[email protected]>
AuthorDate: Fri Feb 13 16:42:22 2026 +0900

    [SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated
    
    ### What changes were proposed in this pull request?
    
    In `_SimpleStreamReaderWrapper.latestOffset()`, validate that a custom
    `SimpleDataSourceStreamReader.read()` implementation does not return a non-empty
    batch with end == start. If it does, raise a PySparkException with error class
    `SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE` before appending to the cache.
    Empty batches with end == start remain allowed.
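
    A minimal sketch of the contract the new check enforces, from the reader author's
    side (the in-memory `_ROWS` source and the `read` function shown here are
    illustrative only, not part of this patch):

    ```python
    from typing import Iterator, Tuple

    # Hypothetical in-memory source, used only to illustrate the offset contract.
    _ROWS = [(0,), (1,), (2,), (3,)]

    def read(start: dict) -> Tuple[Iterator[Tuple], dict]:
        start_idx = start["offset"]
        rows = _ROWS[start_idx:]
        if not rows:
            # No new data: returning end == start with an empty iterator stays valid.
            return iter([]), start
        # Non-empty batch: the end offset must advance past start,
        # here by the number of rows returned.
        return iter(rows), {"offset": start_idx + len(rows)}
    ```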
    
    ### Why are the changes needed?
    
    When a user implements read(start) incorrectly and returns:
    
    - Same offset for both: end = start (e.g. both {"offset": 0}).
    - Non-empty iterator: e.g. 2 rows.
    
    If a reader returns end == start with data (e.g. `return (it, {"offset": start_idx})`),
    the wrapper keeps appending to its prefetch cache on every trigger, while commit(end)
    never trims entries because the first matching index is 0. The cache grows without
    bound and driver (non-JVM) memory increases until OOM. Validating before appending
    stops this and fails fast with a clear error.
    
    Empty batches with end == start remain allowed; this lets the Python data source
    signal that there is no data to read.
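
    A standalone toy model of the leak (this is not the Spark implementation, only an
    illustration of the behavior described above: when every entry has end == start,
    the first matching index seen by commit is always 0, so nothing is ever trimmed):

    ```python
    import json

    cache = []  # stands in for the wrapper's prefetch cache

    def trigger(start, end, rows):
        # latestOffset(): the read() result is appended on every trigger.
        cache.append((start, end, rows))
        # commit(end): drop entries before the first one whose end offset matches.
        idx = next(i for i, (_, e, _) in enumerate(cache) if json.dumps(e) == json.dumps(end))
        del cache[:idx]  # idx == 0 when the offset never advances, so nothing is dropped

    for _ in range(5):
        trigger({"offset": 0}, {"offset": 0}, [(1,), (2,)])

    print(len(cache))  # 5 -- one more entry per trigger instead of a bounded cache
    ```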
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Implementations that return end == start with a non-empty iterator now get a
    PySparkException instead of unbounded memory growth. Empty batches with end == start
    behave as before.
    
    ### How was this patch tested?
    
    Added unit tests in `test_python_streaming_datasource.py`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #54237 from vinodkc/br_SPARK-55416.
    
    Authored-by: vinodkc <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../source/tutorial/sql/python_data_source.rst     |  8 +++-
 python/pyspark/errors/error-conditions.json        |  5 ++
 python/pyspark/sql/datasource_internal.py          | 26 +++++++++-
 .../sql/tests/test_python_streaming_datasource.py  | 55 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 2 deletions(-)

diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index b3267405ffdd..07f35722e73f 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -309,7 +309,13 @@ This is the same dummy streaming reader that generates 2 rows every batch implem
         def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
             """
             Takes start offset as an input, return an iterator of tuples and
-            the start offset of next read.
+            the end offset (start offset for the next read). The end offset must
+            advance past the start offset when returning data; otherwise Spark
+            raises a validation exception.
+            For example, returning 2 records from start_idx 0 means end should
+            be {"offset": 2} (i.e. start + 2).
+            When there is no data to read, you may return the same offset as end and
+            start, but you must provide an empty iterator.
             """
             start_idx = start["offset"]
             it = iter([(i,) for i in range(start_idx, start_idx + 2)])
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index ee35e237b898..bbc4d005b490 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1185,6 +1185,11 @@
       "SparkContext or SparkSession should be created first."
     ]
   },
+  "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+    "message": [
+      "SimpleDataSourceStreamReader.read() returned a non-empty batch but the 
end offset: <end_offset> did not advance past the start offset: <start_offset>. 
The end offset must represent the position after the last record returned."
+    ]
+  },
   "SLICE_WITH_STEP": {
     "message": [
       "Slice with step is not supported."
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 92a968cf0572..2ac6c280e822 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -93,6 +93,30 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
         # We do not consider providing different read limit on simple stream reader.
         return ReadAllAvailable()
 
+    def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+        """
+        Validates that read() did not return a non-empty batch with end equal to start,
+        which would cause the same batch to be processed repeatedly. When end != start,
+        appends the result to the cache; when end == start with empty iterator, does not
+        cache (avoids unbounded cache growth).
+        """
+        start_str = json.dumps(start)
+        end_str = json.dumps(end)
+        if end_str != start_str:
+            self.cache.append(PrefetchedCacheEntry(start, end, it))
+            return
+        try:
+            next(it)
+        except StopIteration:
+            return
+        raise PySparkException(
+            errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+            messageParameters={
+                "start_offset": start_str,
+                "end_offset": end_str,
+            },
+        )
+
     def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         assert start is not None, "start offset should not be None"
         assert isinstance(
@@ -100,7 +124,7 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
         ), "simple stream reader does not support read limit"
 
         (iter, end) = self.simple_reader.read(start)
-        self.cache.append(PrefetchedCacheEntry(start, end, iter))
+        self.add_result_to_cache(start, end, iter)
         return end
 
     def commit(self, end: dict) -> None:
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index bef85f7ba845..5f6aaf10fe01 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -41,6 +41,7 @@ from pyspark.testing.sqlutils import (
     have_pyarrow,
     pyarrow_requirement_message,
 )
+from pyspark.errors import PySparkException
 from pyspark.testing import assertDataFrameEqual
 from pyspark.testing.utils import eventually
 from pyspark.testing.sqlutils import ReusedSQLTestCase
@@ -509,6 +510,60 @@ class BasePythonStreamingDataSourceTestsMixin:
         q.awaitTermination(timeout=30)
         self.assertIsNone(q.exception(), "No exception has to be propagated.")
 
+    def test_simple_stream_reader_offset_did_not_advance_raises(self):
+        """Validate that returning end == start with non-empty data raises 
SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+        from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+        class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                # Bug: return same offset as end despite returning data
+                start_idx = start["offset"]
+                it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+                return (it, start)
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                return iter([])
+
+            def commit(self, end: dict):
+                pass
+
+        reader = BuggySimpleStreamReader()
+        wrapper = _SimpleStreamReaderWrapper(reader)
+        with self.assertRaises(PySparkException) as cm:
+            wrapper.latestOffset({"offset": 0}, ReadAllAvailable())
+        self.assertEqual(
+            cm.exception.getCondition(),
+            "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+        )
+
+    def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self):
+        """When read() returns end == start with an empty iterator, no 
exception and no cache entry."""
+        from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+        class EmptyBatchReader(SimpleDataSourceStreamReader):
+            def initialOffset(self):
+                return {"offset": 0}
+
+            def read(self, start: dict):
+                # Valid: same offset as end but empty iterator (no data)
+                return (iter([]), start)
+
+            def readBetweenOffsets(self, start: dict, end: dict):
+                return iter([])
+
+            def commit(self, end: dict):
+                pass
+
+        reader = EmptyBatchReader()
+        wrapper = _SimpleStreamReaderWrapper(reader)
+        start = {"offset": 0}
+        end = wrapper.latestOffset(start, ReadAllAvailable())
+        self.assertEqual(end, start)
+        self.assertEqual(len(wrapper.cache), 0)
+
     def test_stream_writer(self):
         input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
         output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
