This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7b62f5fc6360 [SPARK-54903][PYTHON] Make to_arrow_schema/to_arrow_type able to set timezone
7b62f5fc6360 is described below
commit 7b62f5fc63600bfa51f99e36ebaaa20e4fe8cea2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 6 09:55:57 2026 +0800
[SPARK-54903][PYTHON] Make to_arrow_schema/to_arrow_type able to set timezone
### What changes were proposed in this pull request?
This PR makes `to_arrow_schema`/`to_arrow_type` able to set the timezone, and keeps the existing behavior by setting `UTC` as the default timezone. This lets us check later whether a given timezone is reasonable.
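For illustration, a minimal sketch of the new keyword (a hedged example, not taken from the PR itself; it assumes `pyarrow` is installed, and the non-UTC zone is arbitrary):

```python
import pyarrow as pa

from pyspark.sql.pandas.types import to_arrow_type
from pyspark.sql.types import TimestampType, TimestampNTZType

# TimestampType now takes an explicit timezone; callers updated in this PR
# pass "UTC" to preserve the previous behavior.
assert to_arrow_type(TimestampType(), timezone="UTC") == pa.timestamp("us", tz="UTC")

# Any other zone can be supplied instead, e.g. a session-local zone.
assert to_arrow_type(TimestampType(), timezone="America/New_York") == pa.timestamp(
    "us", tz="America/New_York"
)

# TimestampNTZType stays timezone-naive regardless of the new keyword.
assert to_arrow_type(TimestampNTZType(), timezone="UTC") == pa.timestamp("us", tz=None)
```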
### Why are the changes needed?
Timezone handling in PySpark is somewhat problematic. One example is that `to_arrow_schema`/`to_arrow_type` cannot set the timezone, while the timezone is supported on the JVM side:
https://github.com/apache/spark/blob/d6f1e224df8fc620ab44817e4c1034f990f5d8ff/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala#L40-L68
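For comparison, a hedged before/after sketch (the two helper functions are hypothetical condensations of the `TimestampType` branches of `to_arrow_type`, shown in the diff below):

```python
import pyarrow as pa

def old_timestamp_type(timestamp_utc: bool = True) -> pa.DataType:
    # Pre-PR: only a boolean flag, so the result was either UTC-aware or naive;
    # no other zone could be expressed on the Python side.
    return pa.timestamp("us", tz="UTC" if timestamp_utc else None)

def new_timestamp_type(timezone: str) -> pa.DataType:
    # Post-PR: the caller supplies the zone explicitly (asserted non-None).
    assert timezone is not None
    return pa.timestamp("us", tz=timezone)

print(old_timestamp_type())                       # timestamp[us, tz=UTC]
print(new_timestamp_type("America/Los_Angeles"))  # timestamp[us, tz=America/Los_Angeles]
```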
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53678 from zhengruifeng/revisit_timezone.
Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/pandas/typedef/typehints.py | 4 +-
python/pyspark/sql/connect/dataframe.py | 6 +-
python/pyspark/sql/connect/session.py | 7 ++-
python/pyspark/sql/conversion.py | 1 +
python/pyspark/sql/pandas/conversion.py | 12 +++-
python/pyspark/sql/pandas/serializers.py | 4 +-
python/pyspark/sql/pandas/types.py | 46 +++++---------
python/pyspark/sql/tests/arrow/test_arrow.py | 30 +++------
python/pyspark/sql/udf.py | 20 +++---
python/pyspark/sql/worker/plan_data_source_read.py | 2 +-
python/pyspark/worker.py | 73 +++++++++++-----------
11 files changed, 97 insertions(+), 108 deletions(-)
diff --git a/python/pyspark/pandas/typedef/typehints.py b/python/pyspark/pandas/typedef/typehints.py
index dd40330fd7cb..8d2fd4a3c0af 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -305,7 +305,9 @@ def spark_type_to_pandas_dtype(
== "true"
)
return np.dtype(
- to_arrow_type(spark_type, prefers_large_types=prefers_large_var_types).to_pandas_dtype()
+ to_arrow_type(
+ spark_type, timezone="UTC", prefers_large_types=prefers_large_var_types
+ ).to_pandas_dtype()
)
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index e60b1c21c556..4552fbe18f40 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1840,7 +1840,11 @@ class DataFrame(ParentDataFrame):
return (table, schema)
def toArrow(self) -> "pa.Table":
- schema = to_arrow_schema(self.schema, error_on_duplicated_field_names_in_struct=True)
+ schema = to_arrow_schema(
+ self.schema,
+ error_on_duplicated_field_names_in_struct=True,
+ timezone="UTC",
+ )
table, _ = self._to_table()
return table.cast(schema)
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index ac1d1f5681e3..1c7cde825ef0 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -600,7 +600,9 @@ class SparkSession:
deduped_schema = cast(StructType, _deduplicate_field_names(schema))
spark_types = [field.dataType for field in deduped_schema.fields]
arrow_schema = to_arrow_schema(
- deduped_schema, prefers_large_types=prefers_large_types
+ deduped_schema,
+ timezone="UTC",
+ prefers_large_types=prefers_large_types,
)
arrow_types = [field.type for field in arrow_schema]
_cols = [str(x) if not isinstance(x, str) else x for x in schema.fieldNames()]
@@ -620,7 +622,7 @@ class SparkSession:
for t in data.dtypes
]
arrow_types = [
- to_arrow_type(dt, prefers_large_types=prefers_large_types)
+                to_arrow_type(dt, timezone="UTC", prefers_large_types=prefers_large_types)
if dt is not None
else None
for dt in spark_types
@@ -667,6 +669,7 @@ class SparkSession:
to_arrow_schema(
schema,
error_on_duplicated_field_names_in_struct=True,
+ timezone="UTC",
prefers_large_types=prefers_large_types,
)
)
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index e035f259c836..a657169ca796 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -510,6 +510,7 @@ class LocalDataToArrowConversion:
for field in schema.fields
]
),
+ timezone="UTC",
prefers_large_types=use_large_var_types,
)
diff --git a/python/pyspark/sql/pandas/conversion.py b/python/pyspark/sql/pandas/conversion.py
index 0e63bcef7d88..5e6e181777f2 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -218,7 +218,9 @@ class PandasConversionMixin:
require_minimum_pyarrow_version()
arrow_schema = to_arrow_schema(
- self.schema, prefers_large_types=prefers_large_var_types
+ self.schema,
+ timezone="UTC",
+ prefers_large_types=prefers_large_var_types,
)
except Exception as e:
if arrowPySparkFallbackEnabled == "true":
@@ -345,6 +347,7 @@ class PandasConversionMixin:
schema = to_arrow_schema(
self.schema,
error_on_duplicated_field_names_in_struct=True,
+ timezone="UTC",
prefers_large_types=prefers_large_var_types,
)
@@ -435,7 +438,9 @@ class PandasConversionMixin:
from pyspark.sql.pandas.types import to_arrow_schema
import pyarrow as pa
- schema = to_arrow_schema(self.schema, prefers_large_types=prefers_large_var_types)
+ schema = to_arrow_schema(
+ self.schema, timezone="UTC", prefers_large_types=prefers_large_var_types
+ )
empty_arrays = [pa.array([], type=field.type) for field in schema]
return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)]
@@ -882,7 +887,7 @@ class SparkConversionMixin:
[
(
c,
-                        to_arrow_type(t, prefers_large_types=prefers_large_var_types)
+                        to_arrow_type(t, timezone="UTC", prefers_large_types=prefers_large_var_types)
if t is not None
else None,
t,
@@ -961,6 +966,7 @@ class SparkConversionMixin:
to_arrow_schema(
schema,
error_on_duplicated_field_names_in_struct=True,
+ timezone="UTC",
prefers_large_types=prefers_large_var_types,
)
)
diff --git a/python/pyspark/sql/pandas/serializers.py b/python/pyspark/sql/pandas/serializers.py
index fc86986e0fc0..7d3fbd78754c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -1380,7 +1380,7 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
)
self.result_count_pdf_arrow_type = to_arrow_type(
-            self.result_count_df_type, prefers_large_types=prefers_large_var_types
+            self.result_count_df_type, timezone="UTC", prefers_large_types=prefers_large_var_types
)
self.result_state_df_type = StructType(
@@ -1393,7 +1393,7 @@ class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
)
self.result_state_pdf_arrow_type = to_arrow_type(
-            self.result_state_df_type, prefers_large_types=prefers_large_var_types
+            self.result_state_df_type, timezone="UTC", prefers_large_types=prefers_large_var_types
)
self.arrow_max_records_per_batch = (
arrow_max_records_per_batch if arrow_max_records_per_batch > 0 else 2**31 - 1
diff --git a/python/pyspark/sql/pandas/types.py b/python/pyspark/sql/pandas/types.py
index 555dbe0a7c0d..dcecdae2b0d0 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -73,7 +73,7 @@ def to_arrow_type(
dt: DataType,
*,
error_on_duplicated_field_names_in_struct: bool = False,
- timestamp_utc: bool = True,
+ timezone: Optional[str] = None,
prefers_large_types: bool = False,
) -> "pa.DataType":
"""
@@ -86,12 +86,8 @@ def to_arrow_type(
error_on_duplicated_field_names_in_struct: bool, default False
Whether to raise an exception when there are duplicated field names in a
:class:`pyspark.sql.types.StructType`. (default ``False``)
- timestamp_utc : bool, default True
-        If ``True`` (the default), :class:`TimestampType` is converted to a timezone-aware
- :class:`pyarrow.TimestampType` with UTC as the timezone. If ``False``,
-        :class:`TimestampType` is converted to a timezone-naive :class:`pyarrow.TimestampType`.
-        The JVM expects timezone-aware timestamps to be in UTC. Always keep this set to ``True``
- except in special cases, such as when this function is used in a test.
+ timezone : str, default None
+ timeZone required for TimestampType
Returns
-------
@@ -115,21 +111,15 @@ def to_arrow_type(
arrow_type = pa.float64()
elif type(dt) == DecimalType:
arrow_type = pa.decimal128(dt.precision, dt.scale)
- elif type(dt) == StringType and prefers_large_types:
- arrow_type = pa.large_string()
elif type(dt) == StringType:
- arrow_type = pa.string()
- elif type(dt) == BinaryType and prefers_large_types:
- arrow_type = pa.large_binary()
+ arrow_type = pa.large_string() if prefers_large_types else pa.string()
elif type(dt) == BinaryType:
- arrow_type = pa.binary()
+ arrow_type = pa.large_binary() if prefers_large_types else pa.binary()
elif type(dt) == DateType:
arrow_type = pa.date32()
- elif type(dt) == TimestampType and timestamp_utc:
-        # Timestamps should be in UTC, JVM Arrow timestamps require a timezone to be read
- arrow_type = pa.timestamp("us", tz="UTC")
elif type(dt) == TimestampType:
- arrow_type = pa.timestamp("us", tz=None)
+ assert timezone is not None
+ arrow_type = pa.timestamp("us", tz=timezone)
elif type(dt) == TimestampNTZType:
arrow_type = pa.timestamp("us", tz=None)
elif type(dt) == DayTimeIntervalType:
@@ -142,7 +132,7 @@ def to_arrow_type(
to_arrow_type(
dt.elementType,
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
),
nullable=dt.containsNull,
@@ -154,7 +144,7 @@ def to_arrow_type(
to_arrow_type(
dt.keyType,
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
),
nullable=False,
@@ -164,7 +154,7 @@ def to_arrow_type(
to_arrow_type(
dt.valueType,
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
),
nullable=dt.valueContainsNull,
@@ -183,7 +173,7 @@ def to_arrow_type(
to_arrow_type(
field.dataType,
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
),
nullable=field.nullable,
@@ -197,7 +187,7 @@ def to_arrow_type(
arrow_type = to_arrow_type(
dt.sqlType(),
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
)
elif type(dt) == VariantType:
@@ -242,7 +232,7 @@ def to_arrow_schema(
schema: StructType,
*,
error_on_duplicated_field_names_in_struct: bool = False,
- timestamp_utc: bool = True,
+ timezone: Optional[str] = None,
prefers_large_types: bool = False,
) -> "pa.Schema":
"""
@@ -255,12 +245,8 @@ def to_arrow_schema(
error_on_duplicated_field_names_in_struct: bool, default False
Whether to raise an exception when there are duplicated field names in an inner
:class:`pyspark.sql.types.StructType`. (default ``False``)
- timestamp_utc : bool, default True
-        If ``True`` (the default), :class:`TimestampType` is converted to a timezone-aware
- :class:`pyarrow.TimestampType` with UTC as the timezone. If ``False``,
-        :class:`TimestampType` is converted to a timezone-naive :class:`pyarrow.TimestampType`.
-        The JVM expects timezone-aware timestamps to be in UTC. Always keep this set to ``True``
- except in special cases, such as when this function is used in a test
+ timezone : str, default None
+ timeZone required for TimestampType
Returns
-------
@@ -274,7 +260,7 @@ def to_arrow_schema(
to_arrow_type(
field.dataType,
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
- timestamp_utc=timestamp_utc,
+ timezone=timezone,
prefers_large_types=prefers_large_types,
),
nullable=field.nullable,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py b/python/pyspark/sql/tests/arrow/test_arrow.py
index db5c7d18a0fc..2ac4836e2c96 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -757,11 +757,11 @@ class ArrowTestsMixin:
def test_schema_conversion_roundtrip(self):
from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema
- arrow_schema = to_arrow_schema(self.schema, prefers_large_types=False)
+        arrow_schema = to_arrow_schema(self.schema, timezone="UTC", prefers_large_types=False)
schema_rt = from_arrow_schema(arrow_schema, prefer_timestamp_ntz=True)
self.assertEqual(self.schema, schema_rt)
- arrow_schema = to_arrow_schema(self.schema, prefers_large_types=True)
+        arrow_schema = to_arrow_schema(self.schema, timezone="UTC", prefers_large_types=True)
schema_rt = from_arrow_schema(arrow_schema, prefer_timestamp_ntz=True)
self.assertEqual(self.schema, schema_rt)
@@ -823,11 +823,11 @@ class ArrowTestsMixin:
),
]:
with self.subTest(data_type=t):
- at = to_arrow_type(t)
+ at = to_arrow_type(t, timezone="UTC")
t2 = from_arrow_type(at)
self.assertEqual(t, t2)
- at2 = to_arrow_type(t, prefers_large_types=True)
+                at2 = to_arrow_type(t, timezone="UTC", prefers_large_types=True)
t3 = from_arrow_type(at2)
self.assertEqual(t, t3)
@@ -1538,22 +1538,6 @@ class ArrowTestsMixin:
self.assertEqual(df.first(), expected)
- def test_createDataFrame_arrow_nested_timestamp(self):
- from pyspark.sql.pandas.types import to_arrow_schema
-
- schema = self.schema_nested_timestamp
- data = self.data_nested_timestamp
- pdf = pd.DataFrame.from_records(data, columns=schema.names)
- arrow_schema = to_arrow_schema(schema, timestamp_utc=False)
- t = pa.Table.from_pandas(pdf, arrow_schema)
-
- with self.sql_conf({"spark.sql.session.timeZone": "America/New_York"}):
- df = self.spark.createDataFrame(t, schema)
-
- expected = self.data_nested_timestamp_expected_ny
-
- self.assertEqual(df.first(), expected)
-
def test_toPandas_timestmap_tzinfo(self):
for arrow_enabled in [True, False]:
with self.subTest(arrow_enabled=arrow_enabled):
@@ -1620,15 +1604,15 @@ class ArrowTestsMixin:
assert_frame_equal(pdf, expected)
def test_toArrow_nested_timestamp(self):
+ from pyspark.sql.pandas.types import to_arrow_schema
+
schema = self.schema_nested_timestamp
data = self.data_nested_timestamp
df = self.spark.createDataFrame(data, schema)
t = df.toArrow()
- from pyspark.sql.pandas.types import to_arrow_schema
-
- arrow_schema = to_arrow_schema(schema)
+ arrow_schema = to_arrow_schema(schema, timezone="UTC")
expected = pa.Table.from_pydict(
{
"ts": [datetime.datetime(2023, 1, 1, 8, 0, 0)],
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 61f65d5243b7..e145c6dfa79c 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -201,7 +201,7 @@ class UserDefinedFunction:
def _check_return_type(returnType: DataType, evalType: int) -> None:
if evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF:
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -215,7 +215,7 @@ class UserDefinedFunction:
or evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -228,7 +228,7 @@ class UserDefinedFunction:
or evalType == PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF
):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -243,7 +243,7 @@ class UserDefinedFunction:
):
if isinstance(returnType, StructType):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -268,7 +268,7 @@ class UserDefinedFunction:
):
if isinstance(returnType, StructType):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -290,7 +290,7 @@ class UserDefinedFunction:
):
if isinstance(returnType, StructType):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -310,7 +310,7 @@ class UserDefinedFunction:
elif evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
if isinstance(returnType, StructType):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -330,7 +330,7 @@ class UserDefinedFunction:
elif evalType == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
if isinstance(returnType, StructType):
try:
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -358,7 +358,7 @@ class UserDefinedFunction:
f"{returnType}"
},
)
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
@@ -370,7 +370,7 @@ class UserDefinedFunction:
elif evalType == PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF:
try:
# Different from SQL_GROUPED_AGG_PANDAS_UDF, StructType is allowed here
- to_arrow_type(returnType)
+ to_arrow_type(returnType, timezone="UTC")
except TypeError:
raise PySparkNotImplementedError(
errorClass="NOT_IMPLEMENTED",
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py b/python/pyspark/sql/worker/plan_data_source_read.py
index 51036f17586f..f4a1231955b4 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -77,7 +77,7 @@ def records_to_arrow_batches(
and append it to the records batch.
"""
- pa_schema = to_arrow_schema(return_type)
+ pa_schema = to_arrow_schema(return_type, timezone="UTC")
column_names = return_type.fieldNames()
column_converters = [
LocalDataToArrowConversion._create_converter(field.dataType) for field in return_type.fields
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c7b0d0bf50c4..386efadce58e 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -224,7 +224,7 @@ def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets, return_type, runner_
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def verify_result_type(result):
@@ -264,7 +264,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets, return_type, runner_c
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def verify_result_type(result):
@@ -320,7 +320,7 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets, kwargs_offsets, return_type, run
zero_arg_exec = True
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
if zero_arg_exec:
@@ -383,7 +383,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, kwargs_offsets, return_type, ru
zero_arg_exec = True
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
# "result_func" ensures the result of a Python UDF to be consistent with/without Arrow
@@ -443,7 +443,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, kwargs_offsets, return_type, ru
def wrap_pandas_batch_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
iter_type_label = "pandas.DataFrame" if type(return_type) == StructType else "pandas.Series"
@@ -543,7 +543,7 @@ def verify_pandas_result(result, return_type, assign_cols_by_name, truncate_retu
def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def verify_result(result):
@@ -578,7 +578,7 @@ def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def verify_result(result):
@@ -614,11 +614,11 @@ def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
}
else:
expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in return_type.fields
]
def wrapped(left_key_table, left_value_table, right_key_table, right_value_table):
@@ -633,7 +633,10 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
return result.to_batches()
- return lambda kl, vl, kr, vr: (wrapped(kl, vl, kr, vr), to_arrow_type(return_type))
+ return lambda kl, vl, kr, vr: (
+ wrapped(kl, vl, kr, vr),
+ to_arrow_type(return_type, timezone="UTC"),
+ )
def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf):
@@ -656,7 +659,7 @@ def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf):
return result
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), arrow_return_type)]
@@ -757,11 +760,11 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, runner_conf):
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
}
else:
expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in return_type.fields
]
def wrapped(key_batch, value_batches):
@@ -777,7 +780,7 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, runner_conf):
yield from result.to_batches()
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda k, v: (wrapped(k, v), arrow_return_type)
@@ -785,11 +788,11 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, runner_conf):
def wrap_grouped_map_arrow_iter_udf(f, return_type, argspec, runner_conf):
if runner_conf.assign_cols_by_name:
expected_cols_and_types = {
- col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in return_type.fields
}
else:
expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in return_type.fields
]
def wrapped(key_batch, value_batches):
@@ -806,7 +809,7 @@ def wrap_grouped_map_arrow_iter_udf(f, return_type, argspec, runner_conf):
yield from map(verify_element, result)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda k, v: (wrapped(k, v), arrow_return_type)
@@ -838,7 +841,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf):
yield result
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def flatten_wrapper(k, v):
@@ -875,7 +878,7 @@ def wrap_grouped_map_pandas_iter_udf(f, return_type, argspec, runner_conf):
yield from map(verify_element, result)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def flatten_wrapper(k, v):
@@ -896,7 +899,7 @@ def wrap_grouped_transform_with_state_pandas_udf(f, return_type, runner_conf):
return result_iter
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -919,7 +922,7 @@ def wrap_grouped_transform_with_state_pandas_init_state_udf(f, return_type, runn
return result_iter
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -934,7 +937,7 @@ def wrap_grouped_transform_with_state_udf(f, return_type, runner_conf):
return result_iter
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -956,7 +959,7 @@ def wrap_grouped_transform_with_state_init_state_udf(f, return_type, runner_conf
return result_iter
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
@@ -1051,7 +1054,7 @@ def wrap_grouped_map_pandas_udf_with_state(f, return_type, runner_conf):
)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return lambda k, v, s: [(wrapped(k, v, s), arrow_return_type)]
@@ -1060,7 +1063,7 @@ def wrap_grouped_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_type, ru
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1079,7 +1082,7 @@ def wrap_grouped_agg_arrow_udf(f, args_offsets, kwargs_offsets, return_type, run
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1098,7 +1101,7 @@ def wrap_grouped_agg_arrow_iter_udf(f, args_offsets, kwargs_offsets, return_type
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(batch_iter):
@@ -1118,7 +1121,7 @@ def wrap_grouped_agg_pandas_iter_udf(f, args_offsets, kwargs_offsets, return_typ
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(series_iter):
@@ -1186,7 +1189,7 @@ def wrap_unbounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return
# to match window length, where grouped_agg_pandas_udf just returns
# the scalar value.
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1207,7 +1210,7 @@ def wrap_unbounded_window_agg_arrow_udf(f, args_offsets, kwargs_offsets, return_
# This is similar to wrap_unbounded_window_agg_pandas_udf, the only difference
# is that this function is for arrow udf.
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(*series):
@@ -1228,7 +1231,7 @@ def wrap_bounded_window_agg_pandas_udf(f, args_offsets, kwargs_offsets, return_t
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(begin_index, end_index, *series):
@@ -1272,7 +1275,7 @@ def wrap_bounded_window_agg_arrow_udf(f, args_offsets, kwargs_offsets, return_ty
func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], kwargs_offsets)
arrow_return_type = to_arrow_type(
- return_type, prefers_large_types=runner_conf.use_large_var_types
+ return_type, timezone="UTC",
prefers_large_types=runner_conf.use_large_var_types
)
def wrapped(begin_index, end_index, *series):
@@ -2234,7 +2237,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
import pandas as pd
arrow_return_type = to_arrow_type(
-                    return_type, prefers_large_types=runner_conf.use_large_var_types
+                    return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
@@ -2361,7 +2364,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
import pyarrow as pa
arrow_return_type = to_arrow_type(
-                    return_type, prefers_large_types=runner_conf.use_large_var_types
+                    return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
@@ -2536,7 +2539,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
import pyarrow as pa
arrow_return_type = to_arrow_type(
-                    return_type, prefers_large_types=runner_conf.use_large_var_types
+                    return_type, timezone="UTC", prefers_large_types=runner_conf.use_large_var_types
)
return_type_size = len(return_type)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]