This is an automated email from the ASF dual-hosted git repository.

HyukjinKwon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f1cdb6d9a6dd [SPARK-56854][PYTHON] Filter None values in DataFrame[Stream]Reader/Writer .option(s)
f1cdb6d9a6dd is described below

commit f1cdb6d9a6dd854305ff5eed4678d9ac8e39fc02
Author: Lavan Vivekanandasarma <[email protected]>
AuthorDate: Fri May 15 06:50:22 2026 +0900

    [SPARK-56854][PYTHON] Filter None values in DataFrame[Stream]Reader/Writer .option(s)
    
    ### What changes were proposed in this pull request?
    
    Filter None values in Classic PySpark's DataFrameReader, DataFrameWriter,
    DataFrameWriterV2, DataStreamReader, and DataStreamWriter
    `.option(key, value)` and `.options(**kwargs)` methods. After this change,
    `option(key, None)` is a no-op and `options(**{key: None, ...})` drops the
    None entries before forwarding to the JVM. The loop-style methods mirror
    the shape of `OptionUtils._set_opts` at
    `python/pyspark/sql/readwriter.py:41-53`:
    `for k, v in options.items(): if v is not None: ...`.
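
    The filtering described above can be sketched without a Spark session.
    This is a minimal illustration, not the code in the patch: the
    `RecordingReader` class and `_to_str` helper are hypothetical stand-ins
    that record options in a dict where the real classes call into the JVM.

```python
from typing import Dict, Optional, Union

OptionValue = Optional[Union[bool, int, float, str]]


def _to_str(value: Union[bool, int, float, str]) -> str:
    """Hypothetical stand-in for a string conversion helper.

    Assumption: booleans are lower-cased and everything else goes through str().
    """
    if isinstance(value, bool):
        return str(value).lower()
    return str(value)


class RecordingReader:
    """Hypothetical reader that records options instead of forwarding them."""

    def __init__(self) -> None:
        self.forwarded: Dict[str, str] = {}

    def option(self, key: str, value: OptionValue) -> "RecordingReader":
        # None means "unset": return early so nothing is forwarded.
        if value is None:
            return self
        self.forwarded[key] = _to_str(value)
        return self

    def options(self, **options: OptionValue) -> "RecordingReader":
        # Loop-style variant: skip None entries, forward the rest.
        for k, v in options.items():
            if v is not None:
                self.forwarded[k] = _to_str(v)
        return self


reader = (
    RecordingReader()
    .option("nullValue", None)                      # dropped
    .options(header=True, sep=None, emptyValue="")  # sep dropped, "" kept
)
print(reader.forwarded)
```

    In this sketch an empty string is still recorded while `None` is not,
    which is the distinction the change relies on.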
    
    ### Why are the changes needed?
    
    Classic and Spark Connect Python currently disagree on what
    `option(key, None)` means. Classic forwards Python None to the JVM as Java
    null, which several data sources interpret differently from "unset". For
    example, with
    `spark.read.options(nullValue=None).schema("a STRING, b STRING").csv(path)`
    and a row `"",val`, Classic produces `[Row(a='', b='val')]` while Connect
    produces `[Row(a=None, b='val')]` because Connect drops the None, the
    default `nullValue` of `""` stays in effect, an [...]
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. `option(k, None)` and `options(**{k: None})` were previously
    forwarded to the JVM as null; they are now no-ops. A migration-guide entry
    under "Upgrading from PySpark 4.1 to 4.2" documents the change. To leave
    an option at its default, omit it or pass None; to set it to an empty
    string, pass `""` explicitly.
    
    ### How was this patch tested?
    
    New parity test `test_option_none_is_filtered` in `ReadwriterTestsMixin`
    pins the CSV `nullValue=None` case to `[Row(a=None, b="val")]` for both
    `.option` and `.options`. Because `ReadwriterParityTests` inherits the
    mixin, the regression test runs on Classic and on Spark Connect, giving
    cross-backend coverage automatically.
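
    The mixin pattern behind that cross-backend coverage can be sketched with
    plain `unittest`. Everything below is illustrative: `FakeBackend`,
    `OptionNoneTestsMixin`, and the two concrete test classes are hypothetical
    names, and the backend only models "drop None options" rather than
    talking to Spark.

```python
import unittest
from typing import Dict, Optional


class FakeBackend:
    """Hypothetical backend that reports which options it would forward."""

    def __init__(self, name: str) -> None:
        self.name = name

    def effective_options(self, **options: Optional[str]) -> Dict[str, str]:
        # Both backends are expected to drop None values.
        return {k: v for k, v in options.items() if v is not None}


class OptionNoneTestsMixin:
    """Test body written once; concrete classes choose the backend."""

    backend_name = ""

    def test_option_none_is_filtered(self) -> None:
        backend = FakeBackend(self.backend_name)
        opts = backend.effective_options(nullValue=None, sep=",")
        self.assertEqual(opts, {"sep": ","})


class ClassicTests(OptionNoneTestsMixin, unittest.TestCase):
    backend_name = "classic"


class ConnectParityTests(OptionNoneTestsMixin, unittest.TestCase):
    backend_name = "connect"


def run_suite() -> unittest.TestResult:
    # Load the inherited test once per concrete class.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case in (ClassicTests, ConnectParityTests):
        suite.addTests(loader.loadTestsFromTestCase(case))
    return unittest.TextTestRunner(verbosity=0).run(suite)


result = run_suite()
```

    Each concrete class inherits the same test method, so a single test
    definition runs once per backend.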
    
    Additional defensive smoke tests guard the writer / V2 writer / streaming
    reader / streaming writer API contracts:
    
    - `test_writer_option_none_chains_safely`
    - `test_v2_writer_option_none_chains_safely`
    - `test_stream_reader_option_none_chains_safely`
    - `test_stream_writer_option_none_chains_safely`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Partially Generated-by: Claude Opus 4.7
    
    Closes #55867 from lavanv11/pyspark_reader_inconsistencies.
    
    Authored-by: Lavan Vivekanandasarma <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../source/migration_guide/pyspark_upgrade.rst     |  1 +
 python/pyspark/sql/connect/readwriter.py           |  8 +++++--
 python/pyspark/sql/connect/streaming/readwriter.py |  4 ++++
 python/pyspark/sql/readwriter.py                   | 18 ++++++++++-----
 python/pyspark/sql/streaming/readwriter.py         | 14 ++++++++----
 .../pyspark/sql/tests/streaming/test_streaming.py  | 20 +++++++++++++++++
 python/pyspark/sql/tests/test_readwriter.py        | 26 ++++++++++++++++++++++
 7 files changed, 80 insertions(+), 11 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst b/python/docs/source/migration_guide/pyspark_upgrade.rst
index dba2a6266939..041640b2eb7f 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -23,6 +23,7 @@ Upgrading from PySpark 4.1 to 4.2
 ---------------------------------
 * In Spark 4.2, the minimum supported version for PyArrow has been raised from 15.0.0 to 18.0.0 in PySpark.
 * In Spark 4.2, ``DataFrame.__getattr__`` on Spark Connect Python Client no longer eagerly validate the column name. To restore the legacy behavior, set ``PYSPARK_VALIDATE_COLUMN_NAME_LEGACY`` environment variable to ``1``.
+* In Spark 4.2, ``DataFrame[Stream]Reader/Writer.option`` and ``.options`` now filter out ``None`` values (treating them as "unset") instead of forwarding ``None`` to the JVM as Java ``null``, matching the Spark Connect Python client (SPARK-49263) and ``OptionUtils._set_opts``. To set an option to its default, omit it or pass ``None``; to set it to an empty string, pass ``""`` explicitly.
 * In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache Arrow by default. The configuration ``spark.sql.execution.arrow.pyspark.enabled`` now defaults to true. To restore the legacy (non-Arrow) row-based data exchange, set ``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
 * In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to true. To restore the legacy behavior for Python UDF execution, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
 * In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to true. To restore the legacy behavior for Python UDTF execution, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
diff --git a/python/pyspark/sql/connect/readwriter.py b/python/pyspark/sql/connect/readwriter.py
index b4a5a06993e5..5c2c0c80ccdf 100644
--- a/python/pyspark/sql/connect/readwriter.py
+++ b/python/pyspark/sql/connect/readwriter.py
@@ -602,6 +602,8 @@ class DataFrameWriter(OptionUtils):
     format.__doc__ = PySparkDataFrameWriter.format.__doc__
 
     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter":
+        if value is None:
+            return self
         self._write.options[key] = to_str(value)
         return self
 
@@ -609,7 +611,7 @@ class DataFrameWriter(OptionUtils):
 
     def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriter":
         for k in options:
-            self._write.options[k] = to_str(options[k])
+            self.option(k, options[k])
         return self
 
     options.__doc__ = PySparkDataFrameWriter.options.__doc__
@@ -978,6 +980,8 @@ class DataFrameWriterV2(OptionUtils):
     using.__doc__ = PySparkDataFrameWriterV2.using.__doc__
 
     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriterV2":
+        if value is None:
+            return self
         self._write.options[key] = to_str(value)
         return self
 
@@ -985,7 +989,7 @@ class DataFrameWriterV2(OptionUtils):
 
     def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriterV2":
         for k in options:
-            self._write.options[k] = to_str(options[k])
+            self.option(k, options[k])
         return self
 
     options.__doc__ = PySparkDataFrameWriterV2.options.__doc__
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index f4e12144bf40..969fdc322a0b 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -90,6 +90,8 @@ class DataStreamReader(OptionUtils):
     schema.__doc__ = PySparkDataStreamReader.schema.__doc__
 
     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamReader":
+        if value is None:
+            return self
         self._options[key] = str(value)
         return self
 
@@ -488,6 +490,8 @@ class DataStreamWriter:
     format.__doc__ = PySparkDataStreamWriter.format.__doc__
 
     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamWriter":
+        if value is None:
+            return self
         self._write_proto.options[key] = cast(str, to_str(value))
         return self
 
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 3e91a25771f4..e289faf89997 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -201,6 +201,8 @@ class DataFrameReader(OptionUtils):
         |100|NULL|
         +---+----+
         """
+        if value is None:
+            return self
         self._jreader = self._jreader.option(key, to_str(value))
         return self
 
@@ -248,8 +250,9 @@ class DataFrameReader(OptionUtils):
         |100|NULL|
         +---+----+
         """
-        for k in options:
-            self._jreader = self._jreader.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jreader = self._jreader.option(k, to_str(v))
         return self
 
     def load(
@@ -1433,6 +1436,8 @@ class DataFrameWriter(OptionUtils):
         +---+------------+
         """
 
+        if value is None:
+            return self
         self._jwrite = self._jwrite.option(key, to_str(value))
         return self
 
@@ -1483,8 +1488,9 @@ class DataFrameWriter(OptionUtils):
         |100|Hyukjin Kwon|
         +---+------------+
         """
-        for k in options:
-            self._jwrite = self._jwrite.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jwrite = self._jwrite.option(k, to_str(v))
         return self
 
     @overload
@@ -2469,6 +2475,8 @@ class DataFrameWriterV2:
 
         .. versionadded: 3.1.0
         """
+        if value is None:
+            return self
         self._jwriter.option(key, to_str(value))
         return self
 
@@ -2478,7 +2486,7 @@ class DataFrameWriterV2:
 
         .. versionadded: 3.1.0
         """
-        options = {k: to_str(v) for k, v in options.items()}
+        options = {k: to_str(v) for k, v in options.items() if v is not None}
         self._jwriter.options(options)
         return self
 
diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py
index 585dc05424c9..5a99f8fce297 100644
--- a/python/pyspark/sql/streaming/readwriter.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -207,6 +207,8 @@ class DataStreamReader(OptionUtils):
         >>> time.sleep(3)
         >>> q.stop()
         """
+        if value is None:
+            return self
         self._jreader = self._jreader.option(key, to_str(value))
         return self
 
@@ -242,8 +244,9 @@ class DataStreamReader(OptionUtils):
         >>> time.sleep(3)
         >>> q.stop()
         """
-        for k in options:
-            self._jreader = self._jreader.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jreader = self._jreader.option(k, to_str(v))
         return self
 
     def name(self, source_name: str) -> "DataStreamReader":
@@ -1143,6 +1146,8 @@ class DataStreamWriter:
         >>> time.sleep(3)
         >>> q.stop()
         """
+        if value is None:
+            return self
         self._jwrite = self._jwrite.option(key, to_str(value))
         return self
 
@@ -1179,8 +1184,9 @@ class DataStreamWriter:
         >>> time.sleep(3)
         >>> q.stop()
         """
-        for k in options:
-            self._jwrite = self._jwrite.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jwrite = self._jwrite.option(k, to_str(v))
         return self
 
     @overload
diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py
index ba39ca513610..c90943896809 100644
--- a/python/pyspark/sql/tests/streaming/test_streaming.py
+++ b/python/pyspark/sql/tests/streaming/test_streaming.py
@@ -667,6 +667,26 @@ class StreamingTestsMixin:
         result = self.spark.sql("SELECT * FROM test_streaming_drop_duplicates_within_wm").collect()
         self.assertTrue(len(result) >= 6 and len(result) <= 9)
 
+    def test_stream_reader_option_none_chains_safely(self):
+        df = (
+            self.spark.readStream.format("rate")
+            .option("rowsPerSecond", None)
+            .options(numPartitions=None)
+            .option("rowsPerSecond", "5")
+            .load()
+        )
+        self.assertIsNotNone(df.schema)
+
+    def test_stream_writer_option_none_chains_safely(self):
+        df = self.spark.readStream.format("rate").option("rowsPerSecond", "5").load()
+        writer = (
+            df.writeStream.format("memory")
+            .queryName("opt_none_test")
+            .option("checkpointLocation", None)
+            .options(checkpointLocation=None)
+        )
+        self.assertIsNotNone(writer)
+
 
 class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase):
     def _assert_exception_tree_contains_msg(self, exception, msg):
diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py
index 059535be166d..5dd5b1ebff63 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -313,6 +313,27 @@ class ReadwriterTestsMixin:
             ).changes("nonexistent_table")
         self.assertIn("changes", str(ctx.exception))
 
+    def test_option_none_is_filtered(self):
+        with tempfile.TemporaryDirectory() as d:
+            path = os.path.join(d, "data.csv")
+            with open(path, "w") as f:
+                f.write('"",val\n')
+            schema = "a STRING, b STRING"
+            expected = [Row(a=None, b="val")]
+            self.assertEqual(
+                self.spark.read.schema(schema).option("nullValue", None).csv(path).collect(),
+                expected,
+            )
+            self.assertEqual(
+                self.spark.read.schema(schema).options(nullValue=None).csv(path).collect(),
+                expected,
+            )
+
+    def test_writer_option_none_chains_safely(self):
+        df = self.spark.createDataFrame([(1,)], "x INT")
+        self.assertIsNotNone(df.write.option("foo", None).option("bar", "baz"))
+        self.assertIsNotNone(df.write.options(foo=None, bar="baz"))
+
 
 class ReadwriterV2TestsMixin:
     def test_api(self):
@@ -419,6 +440,11 @@ class ReadwriterV2TestsMixin:
             self.assertEqual(get_cluster_by_cols(), ["x"])
             self.assertSetEqual(set(data), set(self.spark.table(table_name).collect()))
 
+    def test_v2_writer_option_none_chains_safely(self):
+        df = self.spark.createDataFrame([(1,)], "x INT")
+        self.assertIsNotNone(df.writeTo("notexist").option("foo", None).option("bar", "baz"))
+        self.assertIsNotNone(df.writeTo("notexist").options(foo=None, bar="baz"))
+
 
 class ReadwriterTests(ReadwriterTestsMixin, ReusedSQLTestCase):
     pass


