This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ac1835484320 [SPARK-52433][PYTHON] Unify the string coercion in createDataFrame
ac1835484320 is described below
commit ac18354843203b63e4974ba47ad081a34afac7eb
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Mon Jun 16 10:32:16 2025 +0800
[SPARK-52433][PYTHON] Unify the string coercion in createDataFrame
### What changes were proposed in this pull request?
Unify the string coercion in createDataFrame
### Why are the changes needed?
Currently there is a behavior difference between PySpark Classic and PySpark Connect:
```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

items = [{'id': '5558382', 'broker': {'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}}]

schema = StructType([
    StructField("id", StringType()),
    StructField("broker", StringType())
])
df = spark.createDataFrame(items, schema=schema)
df.show(truncate=False)
```
PySpark Classic:
```
+-------+-----------------------------------------+
|id     |broker                                   |
+-------+-----------------------------------------+
|5558382|{contactEmail=abc.xyz123.ca, teamId=3398}|
+-------+-----------------------------------------+
```
PySpark Connect:
```
+-------+-------------------------------------------------+
|id     |broker                                           |
+-------+-------------------------------------------------+
|5558382|{'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}|
+-------+-------------------------------------------------+
```
The latter seems more reasonable: the dicts should be converted into strings on the Python side.
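For reference, Python's built-in `str()` already renders a nested dict exactly the way Connect displays it, so the coercion can happen entirely on the Python side:
```python
>>> broker = {'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}
>>> str(broker)
"{'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}"
```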
### Does this PR introduce _any_ user-facing change?
Yes, in PySpark Classic: values supplied for StringType fields (e.g. dicts) are now stringified on the Python side, matching PySpark Connect.
### How was this patch tested?
New unit test (test_create_str_from_dict).
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #51140 from zhengruifeng/py_create_df.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/conversion.py | 2 +-
python/pyspark/sql/session.py | 10 ++++++++--
python/pyspark/sql/tests/test_creation.py | 18 ++++++++++++++++++
python/pyspark/sql/types.py | 26 ++++++++++++++++++++++++--
4 files changed, 51 insertions(+), 5 deletions(-)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index a0cd89da3822..1ce961554940 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -255,7 +255,7 @@ class LocalDataToArrowConversion:
                 return None
             else:
                 if isinstance(value, bool):
-                    # To match the PySpark which convert bool to string in
+                    # To match the PySpark Classic which converts bool to string in
                     # the JVM side (python.EvaluatePython.makeFromJava)
                     return str(value).lower()
                 else:
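A minimal sketch of why the boolean special case exists (the helper name below is illustrative, not part of the patch): plain `str()` yields Python-style `True`/`False`, while the JVM path renders booleans lowercase, so the converter lowercases them to keep Classic output stable:
```python
def coerce_to_string(value):  # illustrative stand-in for the converter body
    if isinstance(value, bool):
        # match the JVM rendering used by python.EvaluatePython.makeFromJava
        return str(value).lower()
    return str(value)

assert coerce_to_string(True) == "true"   # plain str() would give "True"
assert coerce_to_string(3398) == "3398"
```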
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 241424c954b2..2cb0f2f59b1b 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -54,6 +54,7 @@ from pyspark.sql.types import (
     DataType,
     StructField,
     StructType,
+    VariantVal,
     _make_type_verifier,
     _infer_schema,
     _has_nulltype,
@@ -1194,10 +1195,14 @@ class SparkSession(SparkConversionMixin):
         if not isinstance(data, list):
             data = list(data)
 
+        if any(isinstance(d, VariantVal) for d in data):
+            raise PySparkValueError("Rows cannot be of type VariantVal")
+
+        tupled_data: Iterable[Tuple]
         if schema is None or isinstance(schema, (list, tuple)):
             struct = self._inferSchemaFromList(data, names=schema)
             converter = _create_converter(struct)
-            tupled_data: Iterable[Tuple] = map(converter, data)
+            tupled_data = map(converter, data)
             if isinstance(schema, (list, tuple)):
                 for i, name in enumerate(schema):
                     struct.fields[i].name = name
@@ -1205,7 +1210,8 @@ class SparkSession(SparkConversionMixin):
 
         elif isinstance(schema, StructType):
             struct = schema
-            tupled_data = data
+            converter = _create_converter(struct)
+            tupled_data = map(converter, data)
 
         else:
             raise PySparkTypeError(
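With this change, an explicit StructType schema takes the same `_create_converter` path as an inferred schema, so, assuming the `items`/`schema` repro from the description above, PySpark Classic now matches Connect:
```python
# Sketch, reusing the repro from the description above.
df = spark.createDataFrame(items, schema=schema)
df.first().broker
# "{'teamId': 3398, 'contactEmail': 'abc.xyz123.ca'}"
```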
diff --git a/python/pyspark/sql/tests/test_creation.py b/python/pyspark/sql/tests/test_creation.py
index c6917aa234b4..0045deb76389 100644
--- a/python/pyspark/sql/tests/test_creation.py
+++ b/python/pyspark/sql/tests/test_creation.py
@@ -24,6 +24,9 @@ from typing import cast
 from pyspark.sql import Row
 import pyspark.sql.functions as F
 from pyspark.sql.types import (
+    StructType,
+    StructField,
+    StringType,
     DateType,
     TimestampType,
     TimestampNTZType,
@@ -42,6 +45,21 @@ from pyspark.testing.sqlutils import (
 )
 
 
 class DataFrameCreationTestsMixin:
+    def test_create_str_from_dict(self):
+        data = [
+            {"broker": {"teamId": 3398, "contactEmail": "[email protected]"}},
+        ]
+
+        for schema in [
+            StructType([StructField("broker", StringType())]),
+            "broker: string",
+        ]:
+            df = self.spark.createDataFrame(data, schema=schema)
+            self.assertEqual(
+                df.first().broker,
+                """{'teamId': 3398, 'contactEmail': '[email protected]'}""",
+            )
+
     def test_create_dataframe_from_array_of_long(self):
         import array
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 26658bb8d604..6ff6d78a94c5 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -2510,6 +2510,9 @@ def _need_converter(dataType: DataType) -> bool:
         return _need_converter(dataType.keyType) or _need_converter(dataType.valueType)
     elif isinstance(dataType, NullType):
         return True
+    elif isinstance(dataType, StringType):
+        # Coercion to StringType is allowed, e.g. dict -> str
+        return True
     else:
         return False
@@ -2521,16 +2524,35 @@ def _create_converter(dataType: DataType) -> Callable:
 
     if isinstance(dataType, ArrayType):
         conv = _create_converter(dataType.elementType)
-        return lambda row: [conv(v) for v in row]
+        return lambda row: [conv(v) for v in row] if row is not None else None
     elif isinstance(dataType, MapType):
         kconv = _create_converter(dataType.keyType)
         vconv = _create_converter(dataType.valueType)
-        return lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
+        return (
+            lambda row: dict((kconv(k), vconv(v)) for k, v in row.items())
+            if row is not None
+            else None
+        )
     elif isinstance(dataType, NullType):
         return lambda x: None
+    elif isinstance(dataType, StringType):
+
+        def convert_string(value: Any) -> Any:
+            if value is None:
+                return None
+            else:
+                if isinstance(value, bool):
+                    # To match the Classic behavior
+                    return str(value).lower()
+                else:
+                    # Coercion to StringType is allowed, e.g. dict -> str
+                    return str(value)
+
+        return convert_string
+
     elif not isinstance(dataType, StructType):
         return lambda x: x
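Taken together, the new converters behave as sketched below; the function names are illustrative stand-ins, only the `str()` coercion and the `None` guards come from the patch:
```python
# Standalone restatement of the patched converter semantics (not the real
# internals, which _create_converter builds per field type).
def str_conv(v):
    if v is None:
        return None
    return str(v).lower() if isinstance(v, bool) else str(v)

def arr_conv(row):
    # the ArrayType converter is now None-safe
    return [str_conv(v) for v in row] if row is not None else None

assert str_conv({'teamId': 3398}) == "{'teamId': 3398}"
assert str_conv(True) == "true"
assert arr_conv(None) is None
assert arr_conv([True, None, 1]) == ["true", None, "1"]
```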