This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ac1835484320 [SPARK-52433][PYTHON] Unify the string coercion in createDataFrame
ac1835484320 is described below

commit ac18354843203b63e4974ba47ad081a34afac7eb
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jun 16 10:32:16 2025 +0800

    [SPARK-52433][PYTHON] Unify the string coercion in createDataFrame
    
    ### What changes were proposed in this pull request?
    Unify the string coercion in createDataFrame
    
    ### Why are the changes needed?
    Currently there is a behavior difference between PySpark Classic and PySpark Connect:
    
    ```python
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    items = [{'id': '5558382', 'broker': {'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}}]
    
    schema = StructType([
        StructField("id", StringType()),
        StructField("broker", StringType())
    ])
    
    df = spark.createDataFrame(items, schema=schema)
    
    df.show(truncate=False)
    ```
    
    PySpark Classic:
    ```
    +-------+------------------------------------------+
    |id     |broker                                    |
    +-------+------------------------------------------+
    |5558382|{contactEmail=abc.xyz123.ca, teamId=3398} |
    +-------+------------------------------------------+
    ```
    
    PySpark Connect:
    ```
    +-------+--------------------------------------------------+
    |id     |broker                                            |
    +-------+--------------------------------------------------+
    |5558382|{'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'} |
    +-------+--------------------------------------------------+
    ```
    
    The latter seems more reasonable: the dicts should be converted into strings on the Python side.
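    
    For reference, the Connect-side rendering above is exactly Python's own `str()` of the dict; that is the behavior this patch standardizes on. A minimal illustration (not the patch code itself):
    
    ```python
    broker = {'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}
    print(str(broker))  # {'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}
    ```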
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, in PySpark Classic, as described above.
    
    ### How was this patch tested?
    New unit test (`test_create_str_from_dict`).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51140 from zhengruifeng/py_create_df.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/sql/conversion.py          |  2 +-
 python/pyspark/sql/session.py             | 10 ++++++++--
 python/pyspark/sql/tests/test_creation.py | 18 ++++++++++++++++++
 python/pyspark/sql/types.py               | 26 ++++++++++++++++++++++++--
 4 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index a0cd89da3822..1ce961554940 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -255,7 +255,7 @@ class LocalDataToArrowConversion:
                     return None
                 else:
                     if isinstance(value, bool):
-                        # To match the PySpark which convert bool to string in
+                        # To match the PySpark Classic which convert bool to string in
                         # the JVM side (python.EvaluatePython.makeFromJava)
                         return str(value).lower()
                     else:
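
For context on the hunk above: Python's `str(True)` yields `'True'`, while the JVM-side coercion (`python.EvaluatePython.makeFromJava`, referenced in the comment) produces `'true'`; the lower-casing keeps the two paths consistent. A minimal sketch of the assumed semantics:

```python
value = True
print(str(value))          # True  -- plain Python rendering
print(str(value).lower())  # true  -- matches the JVM-side string coercion
```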
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 241424c954b2..2cb0f2f59b1b 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -54,6 +54,7 @@ from pyspark.sql.types import (
     DataType,
     StructField,
     StructType,
+    VariantVal,
     _make_type_verifier,
     _infer_schema,
     _has_nulltype,
@@ -1194,10 +1195,14 @@ class SparkSession(SparkConversionMixin):
         if not isinstance(data, list):
             data = list(data)
 
+        if any(isinstance(d, VariantVal) for d in data):
+            raise PySparkValueError("Rows cannot be of type VariantVal")
+
+        tupled_data: Iterable[Tuple]
         if schema is None or isinstance(schema, (list, tuple)):
             struct = self._inferSchemaFromList(data, names=schema)
             converter = _create_converter(struct)
-            tupled_data: Iterable[Tuple] = map(converter, data)
+            tupled_data = map(converter, data)
             if isinstance(schema, (list, tuple)):
                 for i, name in enumerate(schema):
                     struct.fields[i].name = name
@@ -1205,7 +1210,8 @@ class SparkSession(SparkConversionMixin):
 
         elif isinstance(schema, StructType):
             struct = schema
-            tupled_data = data
+            converter = _create_converter(struct)
+            tupled_data = map(converter, data)
 
         else:
             raise PySparkTypeError(
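
This hunk is the core of the fix: with an explicit StructType schema, PySpark Classic previously passed rows through as-is (`tupled_data = data`) and left string coercion to the JVM, whereas it now applies the same `_create_converter` path used for inferred schemas, so coercion happens on the Python side. Using `items` and `schema` from the repro in the description, the expected result after this patch (matching the new unit test below) is:

```python
df = spark.createDataFrame(items, schema=schema)
df.first().broker  # "{'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}"
```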
diff --git a/python/pyspark/sql/tests/test_creation.py b/python/pyspark/sql/tests/test_creation.py
index c6917aa234b4..0045deb76389 100644
--- a/python/pyspark/sql/tests/test_creation.py
+++ b/python/pyspark/sql/tests/test_creation.py
@@ -24,6 +24,9 @@ from typing import cast
 from pyspark.sql import Row
 import pyspark.sql.functions as F
 from pyspark.sql.types import (
+    StructType,
+    StructField,
+    StringType,
     DateType,
     TimestampType,
     TimestampNTZType,
@@ -42,6 +45,21 @@ from pyspark.testing.sqlutils import (
 
 
 class DataFrameCreationTestsMixin:
+    def test_create_str_from_dict(self):
+        data = [
+            {"broker": {"teamId": 3398, "contactEmail": "[email protected]"}},
+        ]
+
+        for schema in [
+            StructType([StructField("broker", StringType())]),
+            "broker: string",
+        ]:
+            df = self.spark.createDataFrame(data, schema=schema)
+            self.assertEqual(
+                df.first().broker,
+                """{'teamId': 3398, 'contactEmail': '[email protected]'}""",
+            )
+
     def test_create_dataframe_from_array_of_long(self):
         import array
 
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 26658bb8d604..6ff6d78a94c5 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -2510,6 +2510,9 @@ def _need_converter(dataType: DataType) -> bool:
         return _need_converter(dataType.keyType) or _need_converter(dataType.valueType)
     elif isinstance(dataType, NullType):
         return True
+    elif isinstance(dataType, StringType):
+        # Coercion to StringType is allowed, e.g. dict -> str
+        return True
     else:
         return False
 
@@ -2521,16 +2524,35 @@ def _create_converter(dataType: DataType) -> Callable:
 
     if isinstance(dataType, ArrayType):
         conv = _create_converter(dataType.elementType)
-        return lambda row: [conv(v) for v in row]
+        return lambda row: [conv(v) for v in row] if row is not None else None
 
     elif isinstance(dataType, MapType):
         kconv = _create_converter(dataType.keyType)
         vconv = _create_converter(dataType.valueType)
-        return lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
+        return (
+            lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
+            if row is not None
+            else None
+        )
 
     elif isinstance(dataType, NullType):
         return lambda x: None
 
+    elif isinstance(dataType, StringType):
+
+        def convert_string(value: Any) -> Any:
+            if value is None:
+                return None
+            else:
+                if isinstance(value, bool):
+                    # To match the Classic behavior
+                    return str(value).lower()
+                else:
+                    # Coercion to StringType is allowed, e.g. dict -> str
+                    return str(value)
+
+        return convert_string
+
     elif not isinstance(dataType, StructType):
         return lambda x: x
 

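For completeness, a sketch of the updated converter semantics. `_create_converter` is an internal helper in `pyspark.sql.types`, not public API, so this illustrates the new behavior rather than a supported interface:

```python
from pyspark.sql.types import StringType, _create_converter  # internal helper

conv = _create_converter(StringType())
conv({'teamId': 3398})  # -> "{'teamId': 3398}" (dict coerced via str())
conv(True)              # -> "true" (bool lower-cased to match the JVM-side coercion)
conv(None)              # -> None (nulls pass through)
```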
