This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 75424734ea11 [SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated
75424734ea11 is described below
commit 75424734ea117776e7257f89046a82801d6730c6
Author: vinodkc <[email protected]>
AuthorDate: Fri Feb 13 16:42:22 2026 +0900
[SPARK-55416][SS][PYTHON] Streaming Python Data Source memory leak when end-offset is not updated
### What changes were proposed in this pull request?
In `_SimpleStreamReaderWrapper.latestOffset()`, validate that a custom data source's `SimpleDataSourceStreamReader.read()` implementation does not return a non-empty batch with end == start. If it does, raise a PySparkException with error class `SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE` before appending to the prefetch cache. Empty batches with end == start remain allowed.
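For illustration only, here is a minimal sketch of a reader that now trips the check (the class name and row values are made up; the actual validation lives in `_SimpleStreamReaderWrapper.add_result_to_cache` in the diff below):

```python
from pyspark.sql.datasource import SimpleDataSourceStreamReader


class BuggyReader(SimpleDataSourceStreamReader):
    def initialOffset(self) -> dict:
        return {"offset": 0}

    def read(self, start: dict):
        start_idx = start["offset"]
        # Returns two rows but reuses the start offset as the end offset.
        # The wrapper's latestOffset() now raises a PySparkException with
        # error class SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE.
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, start)
```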
### Why are the changes needed?
When a user implements read(start) incorrectly and returns:
- the same offset for start and end, e.g. return (it, {"offset": start_idx}) so that both are {"offset": 0}, and
- a non-empty iterator, e.g. 2 rows,
the wrapper keeps appending to its prefetch cache on every trigger while commit(end) never trims entries (the first matching index is always 0). The cache grows without bound and driver (non-JVM) memory increases until OOM. Validating and raising the error before appending stops this and fails fast with a clear message.
Empty batches with end == start remain allowed, so a Python data source can still signal that there is no data to read.
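For reference, a correct read() advances the end offset past the records it returns. A rough sketch, following the updated documentation in this patch (the method body is illustrative only):

```python
def read(self, start: dict):
    start_idx = start["offset"]
    rows = [(i,) for i in range(start_idx, start_idx + 2)]
    # The end offset points past the last record returned, so commit(end)
    # can later trim the prefetched batch from the cache.
    return (iter(rows), {"offset": start_idx + len(rows)})
```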
### Does this PR introduce _any_ user-facing change?
Yes. Implementations that return end == start with a non-empty iterator now
get PySparkException instead of unbounded memory growth. Empty batches with end
== start are unchanged.
### How was this patch tested?
Added unit tests in `test_python_streaming_datasource.py`
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54237 from vinodkc/br_SPARK-55416.
Authored-by: vinodkc <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../source/tutorial/sql/python_data_source.rst | 8 +++-
python/pyspark/errors/error-conditions.json | 5 ++
python/pyspark/sql/datasource_internal.py | 26 +++++++++-
.../sql/tests/test_python_streaming_datasource.py | 55 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 2 deletions(-)
diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index b3267405ffdd..07f35722e73f 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -309,7 +309,13 @@ This is the same dummy streaming reader that generates 2 rows every batch implem
def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
"""
Takes start offset as an input, return an iterator of tuples and
- the start offset of next read.
+ the end offset (start offset for the next read). The end offset must
+ advance past the start offset when returning data; otherwise Spark
+ raises a validation exception.
+ For example, returning 2 records from start_idx 0 means end should
+ be {"offset": 2} (i.e. start + 2).
+ When there is no data to read, you may return the same offset as end and
+ start, but you must provide an empty iterator.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
diff --git a/python/pyspark/errors/error-conditions.json b/python/pyspark/errors/error-conditions.json
index ee35e237b898..bbc4d005b490 100644
--- a/python/pyspark/errors/error-conditions.json
+++ b/python/pyspark/errors/error-conditions.json
@@ -1185,6 +1185,11 @@
"SparkContext or SparkSession should be created first."
]
},
+ "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+ "message": [
+ "SimpleDataSourceStreamReader.read() returned a non-empty batch but the end offset: <end_offset> did not advance past the start offset: <start_offset>. The end offset must represent the position after the last record returned."
+ ]
+ },
"SLICE_WITH_STEP": {
"message": [
"Slice with step is not supported."
diff --git a/python/pyspark/sql/datasource_internal.py b/python/pyspark/sql/datasource_internal.py
index 92a968cf0572..2ac6c280e822 100644
--- a/python/pyspark/sql/datasource_internal.py
+++ b/python/pyspark/sql/datasource_internal.py
@@ -93,6 +93,30 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
# We do not consider providing different read limit on simple stream reader.
return ReadAllAvailable()
+ def add_result_to_cache(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+ """
+ Validates that read() did not return a non-empty batch with end equal to start,
+ which would cause the same batch to be processed repeatedly. When end != start,
+ appends the result to the cache; when end == start with empty iterator, does not
+ cache (avoids unbounded cache growth).
+ """
+ start_str = json.dumps(start)
+ end_str = json.dumps(end)
+ if end_str != start_str:
+ self.cache.append(PrefetchedCacheEntry(start, end, it))
+ return
+ try:
+ next(it)
+ except StopIteration:
+ return
+ raise PySparkException(
+ errorClass="SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ messageParameters={
+ "start_offset": start_str,
+ "end_offset": end_str,
+ },
+ )
+
def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
assert start is not None, "start offset should not be None"
assert isinstance(
@@ -100,7 +124,7 @@ class _SimpleStreamReaderWrapper(DataSourceStreamReader):
), "simple stream reader does not support read limit"
(iter, end) = self.simple_reader.read(start)
- self.cache.append(PrefetchedCacheEntry(start, end, iter))
+ self.add_result_to_cache(start, end, iter)
return end
def commit(self, end: dict) -> None:
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index bef85f7ba845..5f6aaf10fe01 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -41,6 +41,7 @@ from pyspark.testing.sqlutils import (
have_pyarrow,
pyarrow_requirement_message,
)
+from pyspark.errors import PySparkException
from pyspark.testing import assertDataFrameEqual
from pyspark.testing.utils import eventually
from pyspark.testing.sqlutils import ReusedSQLTestCase
@@ -509,6 +510,60 @@ class BasePythonStreamingDataSourceTestsMixin:
q.awaitTermination(timeout=30)
self.assertIsNone(q.exception(), "No exception has to be propagated.")
+ def test_simple_stream_reader_offset_did_not_advance_raises(self):
+ """Validate that returning end == start with non-empty data raises SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Bug: return same offset as end despite returning data
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+ return (it, start)
+
+ def readBetweenOffsets(self, start: dict, end: dict):
+ return iter([])
+
+ def commit(self, end: dict):
+ pass
+
+ reader = BuggySimpleStreamReader()
+ wrapper = _SimpleStreamReaderWrapper(reader)
+ with self.assertRaises(PySparkException) as cm:
+ wrapper.latestOffset({"offset": 0}, ReadAllAvailable())
+ self.assertEqual(
+ cm.exception.getCondition(),
+ "SIMPLE_STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ )
+
+ def test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self):
+ """When read() returns end == start with an empty iterator, no exception and no cache entry."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class EmptyBatchReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Valid: same offset as end but empty iterator (no data)
+ return (iter([]), start)
+
+ def readBetweenOffsets(self, start: dict, end: dict):
+ return iter([])
+
+ def commit(self, end: dict):
+ pass
+
+ reader = EmptyBatchReader()
+ wrapper = _SimpleStreamReaderWrapper(reader)
+ start = {"offset": 0}
+ end = wrapper.latestOffset(start, ReadAllAvailable())
+ self.assertEqual(end, start)
+ self.assertEqual(len(wrapper.cache), 0)
+
def test_stream_writer(self):
input_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_input")
output_dir = tempfile.TemporaryDirectory(prefix="test_data_stream_write_output")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]