This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b62f5fc6360 [SPARK-54903][PYTHON] Make to_arrow_schema/to_arrow_type 
able to set timezone
7b62f5fc6360 is described below

commit 7b62f5fc63600bfa51f99e36ebaaa20e4fe8cea2
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Tue Jan 6 09:55:57 2026 +0800

    [SPARK-54903][PYTHON] Make to_arrow_schema/to_arrow_type able to set 
timezone
    
    ### What changes were proposed in this pull request?
    This PR make `to_arrow_schema/to_arrow_type` able to set timezone, and keep 
existing behaviors by setting `UTC` as the default timezone.
    So that we can check whether the timezone is reasonable later.
    
    ### Why are the changes needed?
    it is kind of problematical in pyspark timezone handling, one example is
    `to_arrow_schema/to_arrow_type` is not able to set the timezone
    
    while timezone is supported in JVM side
    
    
https://github.com/apache/spark/blob/d6f1e224df8fc620ab44817e4c1034f990f5d8ff/sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala#L40-L68
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    CI
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #53678 from zhengruifeng/revisit_timezone.
    
    Authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Ruifeng Zheng <[email protected]>
---
 python/pyspark/pandas/typedef/typehints.py         |  4 +-
 python/pyspark/sql/connect/dataframe.py            |  6 +-
 python/pyspark/sql/connect/session.py              |  7 ++-
 python/pyspark/sql/conversion.py                   |  1 +
 python/pyspark/sql/pandas/conversion.py            | 12 +++-
 python/pyspark/sql/pandas/serializers.py           |  4 +-
 python/pyspark/sql/pandas/types.py                 | 46 +++++---------
 python/pyspark/sql/tests/arrow/test_arrow.py       | 30 +++------
 python/pyspark/sql/udf.py                          | 20 +++---
 python/pyspark/sql/worker/plan_data_source_read.py |  2 +-
 python/pyspark/worker.py                           | 73 +++++++++++-----------
 11 files changed, 97 insertions(+), 108 deletions(-)

diff --git a/python/pyspark/pandas/typedef/typehints.py 
b/python/pyspark/pandas/typedef/typehints.py
index dd40330fd7cb..8d2fd4a3c0af 100644
--- a/python/pyspark/pandas/typedef/typehints.py
+++ b/python/pyspark/pandas/typedef/typehints.py
@@ -305,7 +305,9 @@ def spark_type_to_pandas_dtype(
             == "true"
         )
         return np.dtype(
-            to_arrow_type(spark_type, 
prefers_large_types=prefers_large_var_types).to_pandas_dtype()
+            to_arrow_type(
+                spark_type, timezone="UTC", 
prefers_large_types=prefers_large_var_types
+            ).to_pandas_dtype()
         )
 
 
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index e60b1c21c556..4552fbe18f40 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -1840,7 +1840,11 @@ class DataFrame(ParentDataFrame):
         return (table, schema)
 
     def toArrow(self) -> "pa.Table":
-        schema = to_arrow_schema(self.schema, 
error_on_duplicated_field_names_in_struct=True)
+        schema = to_arrow_schema(
+            self.schema,
+            error_on_duplicated_field_names_in_struct=True,
+            timezone="UTC",
+        )
         table, _ = self._to_table()
         return table.cast(schema)
 
diff --git a/python/pyspark/sql/connect/session.py 
b/python/pyspark/sql/connect/session.py
index ac1d1f5681e3..1c7cde825ef0 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -600,7 +600,9 @@ class SparkSession:
                 deduped_schema = cast(StructType, 
_deduplicate_field_names(schema))
                 spark_types = [field.dataType for field in 
deduped_schema.fields]
                 arrow_schema = to_arrow_schema(
-                    deduped_schema, prefers_large_types=prefers_large_types
+                    deduped_schema,
+                    timezone="UTC",
+                    prefers_large_types=prefers_large_types,
                 )
                 arrow_types = [field.type for field in arrow_schema]
                 _cols = [str(x) if not isinstance(x, str) else x for x in 
schema.fieldNames()]
@@ -620,7 +622,7 @@ class SparkSession:
                     for t in data.dtypes
                 ]
                 arrow_types = [
-                    to_arrow_type(dt, prefers_large_types=prefers_large_types)
+                    to_arrow_type(dt, timezone="UTC", 
prefers_large_types=prefers_large_types)
                     if dt is not None
                     else None
                     for dt in spark_types
@@ -667,6 +669,7 @@ class SparkSession:
                     to_arrow_schema(
                         schema,
                         error_on_duplicated_field_names_in_struct=True,
+                        timezone="UTC",
                         prefers_large_types=prefers_large_types,
                     )
                 )
diff --git a/python/pyspark/sql/conversion.py b/python/pyspark/sql/conversion.py
index e035f259c836..a657169ca796 100644
--- a/python/pyspark/sql/conversion.py
+++ b/python/pyspark/sql/conversion.py
@@ -510,6 +510,7 @@ class LocalDataToArrowConversion:
                         for field in schema.fields
                     ]
                 ),
+                timezone="UTC",
                 prefers_large_types=use_large_var_types,
             )
 
diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index 0e63bcef7d88..5e6e181777f2 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -218,7 +218,9 @@ class PandasConversionMixin:
 
                 require_minimum_pyarrow_version()
                 arrow_schema = to_arrow_schema(
-                    self.schema, prefers_large_types=prefers_large_var_types
+                    self.schema,
+                    timezone="UTC",
+                    prefers_large_types=prefers_large_var_types,
                 )
             except Exception as e:
                 if arrowPySparkFallbackEnabled == "true":
@@ -345,6 +347,7 @@ class PandasConversionMixin:
         schema = to_arrow_schema(
             self.schema,
             error_on_duplicated_field_names_in_struct=True,
+            timezone="UTC",
             prefers_large_types=prefers_large_var_types,
         )
 
@@ -435,7 +438,9 @@ class PandasConversionMixin:
             from pyspark.sql.pandas.types import to_arrow_schema
             import pyarrow as pa
 
-            schema = to_arrow_schema(self.schema, 
prefers_large_types=prefers_large_var_types)
+            schema = to_arrow_schema(
+                self.schema, timezone="UTC", 
prefers_large_types=prefers_large_var_types
+            )
             empty_arrays = [pa.array([], type=field.type) for field in schema]
             return [pa.RecordBatch.from_arrays(empty_arrays, schema=schema)]
 
@@ -882,7 +887,7 @@ class SparkConversionMixin:
             [
                 (
                     c,
-                    to_arrow_type(t, 
prefers_large_types=prefers_large_var_types)
+                    to_arrow_type(t, timezone="UTC", 
prefers_large_types=prefers_large_var_types)
                     if t is not None
                     else None,
                     t,
@@ -961,6 +966,7 @@ class SparkConversionMixin:
             to_arrow_schema(
                 schema,
                 error_on_duplicated_field_names_in_struct=True,
+                timezone="UTC",
                 prefers_large_types=prefers_large_var_types,
             )
         )
diff --git a/python/pyspark/sql/pandas/serializers.py 
b/python/pyspark/sql/pandas/serializers.py
index fc86986e0fc0..7d3fbd78754c 100644
--- a/python/pyspark/sql/pandas/serializers.py
+++ b/python/pyspark/sql/pandas/serializers.py
@@ -1380,7 +1380,7 @@ class 
ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
         )
 
         self.result_count_pdf_arrow_type = to_arrow_type(
-            self.result_count_df_type, 
prefers_large_types=prefers_large_var_types
+            self.result_count_df_type, timezone="UTC", 
prefers_large_types=prefers_large_var_types
         )
 
         self.result_state_df_type = StructType(
@@ -1393,7 +1393,7 @@ class 
ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
         )
 
         self.result_state_pdf_arrow_type = to_arrow_type(
-            self.result_state_df_type, 
prefers_large_types=prefers_large_var_types
+            self.result_state_df_type, timezone="UTC", 
prefers_large_types=prefers_large_var_types
         )
         self.arrow_max_records_per_batch = (
             arrow_max_records_per_batch if arrow_max_records_per_batch > 0 
else 2**31 - 1
diff --git a/python/pyspark/sql/pandas/types.py 
b/python/pyspark/sql/pandas/types.py
index 555dbe0a7c0d..dcecdae2b0d0 100644
--- a/python/pyspark/sql/pandas/types.py
+++ b/python/pyspark/sql/pandas/types.py
@@ -73,7 +73,7 @@ def to_arrow_type(
     dt: DataType,
     *,
     error_on_duplicated_field_names_in_struct: bool = False,
-    timestamp_utc: bool = True,
+    timezone: Optional[str] = None,
     prefers_large_types: bool = False,
 ) -> "pa.DataType":
     """
@@ -86,12 +86,8 @@ def to_arrow_type(
     error_on_duplicated_field_names_in_struct: bool, default False
         Whether to raise an exception when there are duplicated field names in 
a
         :class:`pyspark.sql.types.StructType`. (default ``False``)
-    timestamp_utc : bool, default True
-        If ``True`` (the default), :class:`TimestampType` is converted to a 
timezone-aware
-        :class:`pyarrow.TimestampType` with UTC as the timezone. If ``False``,
-        :class:`TimestampType` is converted to a timezone-naive 
:class:`pyarrow.TimestampType`.
-        The JVM expects timezone-aware timestamps to be in UTC. Always keep 
this set to ``True``
-        except in special cases, such as when this function is used in a test.
+    timezone : str, default None
+        timeZone required for TimestampType
 
     Returns
     -------
@@ -115,21 +111,15 @@ def to_arrow_type(
         arrow_type = pa.float64()
     elif type(dt) == DecimalType:
         arrow_type = pa.decimal128(dt.precision, dt.scale)
-    elif type(dt) == StringType and prefers_large_types:
-        arrow_type = pa.large_string()
     elif type(dt) == StringType:
-        arrow_type = pa.string()
-    elif type(dt) == BinaryType and prefers_large_types:
-        arrow_type = pa.large_binary()
+        arrow_type = pa.large_string() if prefers_large_types else pa.string()
     elif type(dt) == BinaryType:
-        arrow_type = pa.binary()
+        arrow_type = pa.large_binary() if prefers_large_types else pa.binary()
     elif type(dt) == DateType:
         arrow_type = pa.date32()
-    elif type(dt) == TimestampType and timestamp_utc:
-        # Timestamps should be in UTC, JVM Arrow timestamps require a timezone 
to be read
-        arrow_type = pa.timestamp("us", tz="UTC")
     elif type(dt) == TimestampType:
-        arrow_type = pa.timestamp("us", tz=None)
+        assert timezone is not None
+        arrow_type = pa.timestamp("us", tz=timezone)
     elif type(dt) == TimestampNTZType:
         arrow_type = pa.timestamp("us", tz=None)
     elif type(dt) == DayTimeIntervalType:
@@ -142,7 +132,7 @@ def to_arrow_type(
             to_arrow_type(
                 dt.elementType,
                 
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-                timestamp_utc=timestamp_utc,
+                timezone=timezone,
                 prefers_large_types=prefers_large_types,
             ),
             nullable=dt.containsNull,
@@ -154,7 +144,7 @@ def to_arrow_type(
             to_arrow_type(
                 dt.keyType,
                 
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-                timestamp_utc=timestamp_utc,
+                timezone=timezone,
                 prefers_large_types=prefers_large_types,
             ),
             nullable=False,
@@ -164,7 +154,7 @@ def to_arrow_type(
             to_arrow_type(
                 dt.valueType,
                 
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-                timestamp_utc=timestamp_utc,
+                timezone=timezone,
                 prefers_large_types=prefers_large_types,
             ),
             nullable=dt.valueContainsNull,
@@ -183,7 +173,7 @@ def to_arrow_type(
                 to_arrow_type(
                     field.dataType,
                     
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-                    timestamp_utc=timestamp_utc,
+                    timezone=timezone,
                     prefers_large_types=prefers_large_types,
                 ),
                 nullable=field.nullable,
@@ -197,7 +187,7 @@ def to_arrow_type(
         arrow_type = to_arrow_type(
             dt.sqlType(),
             
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-            timestamp_utc=timestamp_utc,
+            timezone=timezone,
             prefers_large_types=prefers_large_types,
         )
     elif type(dt) == VariantType:
@@ -242,7 +232,7 @@ def to_arrow_schema(
     schema: StructType,
     *,
     error_on_duplicated_field_names_in_struct: bool = False,
-    timestamp_utc: bool = True,
+    timezone: Optional[str] = None,
     prefers_large_types: bool = False,
 ) -> "pa.Schema":
     """
@@ -255,12 +245,8 @@ def to_arrow_schema(
     error_on_duplicated_field_names_in_struct: bool, default False
         Whether to raise an exception when there are duplicated field names in 
an inner
         :class:`pyspark.sql.types.StructType`. (default ``False``)
-    timestamp_utc : bool, default True
-        If ``True`` (the default), :class:`TimestampType` is converted to a 
timezone-aware
-        :class:`pyarrow.TimestampType` with UTC as the timezone. If ``False``,
-        :class:`TimestampType` is converted to a timezone-naive 
:class:`pyarrow.TimestampType`.
-        The JVM expects timezone-aware timestamps to be in UTC. Always keep 
this set to ``True``
-        except in special cases, such as when this function is used in a test
+    timezone : str, default None
+        timeZone required for TimestampType
 
     Returns
     -------
@@ -274,7 +260,7 @@ def to_arrow_schema(
             to_arrow_type(
                 field.dataType,
                 
error_on_duplicated_field_names_in_struct=error_on_duplicated_field_names_in_struct,
-                timestamp_utc=timestamp_utc,
+                timezone=timezone,
                 prefers_large_types=prefers_large_types,
             ),
             nullable=field.nullable,
diff --git a/python/pyspark/sql/tests/arrow/test_arrow.py 
b/python/pyspark/sql/tests/arrow/test_arrow.py
index db5c7d18a0fc..2ac4836e2c96 100644
--- a/python/pyspark/sql/tests/arrow/test_arrow.py
+++ b/python/pyspark/sql/tests/arrow/test_arrow.py
@@ -757,11 +757,11 @@ class ArrowTestsMixin:
     def test_schema_conversion_roundtrip(self):
         from pyspark.sql.pandas.types import from_arrow_schema, to_arrow_schema
 
-        arrow_schema = to_arrow_schema(self.schema, prefers_large_types=False)
+        arrow_schema = to_arrow_schema(self.schema, timezone="UTC", 
prefers_large_types=False)
         schema_rt = from_arrow_schema(arrow_schema, prefer_timestamp_ntz=True)
         self.assertEqual(self.schema, schema_rt)
 
-        arrow_schema = to_arrow_schema(self.schema, prefers_large_types=True)
+        arrow_schema = to_arrow_schema(self.schema, timezone="UTC", 
prefers_large_types=True)
         schema_rt = from_arrow_schema(arrow_schema, prefer_timestamp_ntz=True)
         self.assertEqual(self.schema, schema_rt)
 
@@ -823,11 +823,11 @@ class ArrowTestsMixin:
             ),
         ]:
             with self.subTest(data_type=t):
-                at = to_arrow_type(t)
+                at = to_arrow_type(t, timezone="UTC")
                 t2 = from_arrow_type(at)
                 self.assertEqual(t, t2)
 
-                at2 = to_arrow_type(t, prefers_large_types=True)
+                at2 = to_arrow_type(t, timezone="UTC", 
prefers_large_types=True)
                 t3 = from_arrow_type(at2)
                 self.assertEqual(t, t3)
 
@@ -1538,22 +1538,6 @@ class ArrowTestsMixin:
 
         self.assertEqual(df.first(), expected)
 
-    def test_createDataFrame_arrow_nested_timestamp(self):
-        from pyspark.sql.pandas.types import to_arrow_schema
-
-        schema = self.schema_nested_timestamp
-        data = self.data_nested_timestamp
-        pdf = pd.DataFrame.from_records(data, columns=schema.names)
-        arrow_schema = to_arrow_schema(schema, timestamp_utc=False)
-        t = pa.Table.from_pandas(pdf, arrow_schema)
-
-        with self.sql_conf({"spark.sql.session.timeZone": "America/New_York"}):
-            df = self.spark.createDataFrame(t, schema)
-
-        expected = self.data_nested_timestamp_expected_ny
-
-        self.assertEqual(df.first(), expected)
-
     def test_toPandas_timestmap_tzinfo(self):
         for arrow_enabled in [True, False]:
             with self.subTest(arrow_enabled=arrow_enabled):
@@ -1620,15 +1604,15 @@ class ArrowTestsMixin:
         assert_frame_equal(pdf, expected)
 
     def test_toArrow_nested_timestamp(self):
+        from pyspark.sql.pandas.types import to_arrow_schema
+
         schema = self.schema_nested_timestamp
         data = self.data_nested_timestamp
         df = self.spark.createDataFrame(data, schema)
 
         t = df.toArrow()
 
-        from pyspark.sql.pandas.types import to_arrow_schema
-
-        arrow_schema = to_arrow_schema(schema)
+        arrow_schema = to_arrow_schema(schema, timezone="UTC")
         expected = pa.Table.from_pydict(
             {
                 "ts": [datetime.datetime(2023, 1, 1, 8, 0, 0)],
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 61f65d5243b7..e145c6dfa79c 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -201,7 +201,7 @@ class UserDefinedFunction:
     def _check_return_type(returnType: DataType, evalType: int) -> None:
         if evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF:
             try:
-                to_arrow_type(returnType)
+                to_arrow_type(returnType, timezone="UTC")
             except TypeError:
                 raise PySparkNotImplementedError(
                     errorClass="NOT_IMPLEMENTED",
@@ -215,7 +215,7 @@ class UserDefinedFunction:
             or evalType == PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF
         ):
             try:
-                to_arrow_type(returnType)
+                to_arrow_type(returnType, timezone="UTC")
             except TypeError:
                 raise PySparkNotImplementedError(
                     errorClass="NOT_IMPLEMENTED",
@@ -228,7 +228,7 @@ class UserDefinedFunction:
             or evalType == PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF
         ):
             try:
-                to_arrow_type(returnType)
+                to_arrow_type(returnType, timezone="UTC")
             except TypeError:
                 raise PySparkNotImplementedError(
                     errorClass="NOT_IMPLEMENTED",
@@ -243,7 +243,7 @@ class UserDefinedFunction:
         ):
             if isinstance(returnType, StructType):
                 try:
-                    to_arrow_type(returnType)
+                    to_arrow_type(returnType, timezone="UTC")
                 except TypeError:
                     raise PySparkNotImplementedError(
                         errorClass="NOT_IMPLEMENTED",
@@ -268,7 +268,7 @@ class UserDefinedFunction:
         ):
             if isinstance(returnType, StructType):
                 try:
-                    to_arrow_type(returnType)
+                    to_arrow_type(returnType, timezone="UTC")
                 except TypeError:
                     raise PySparkNotImplementedError(
                         errorClass="NOT_IMPLEMENTED",
@@ -290,7 +290,7 @@ class UserDefinedFunction:
         ):
             if isinstance(returnType, StructType):
                 try:
-                    to_arrow_type(returnType)
+                    to_arrow_type(returnType, timezone="UTC")
                 except TypeError:
                     raise PySparkNotImplementedError(
                         errorClass="NOT_IMPLEMENTED",
@@ -310,7 +310,7 @@ class UserDefinedFunction:
         elif evalType == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
             if isinstance(returnType, StructType):
                 try:
-                    to_arrow_type(returnType)
+                    to_arrow_type(returnType, timezone="UTC")
                 except TypeError:
                     raise PySparkNotImplementedError(
                         errorClass="NOT_IMPLEMENTED",
@@ -330,7 +330,7 @@ class UserDefinedFunction:
         elif evalType == PythonEvalType.SQL_COGROUPED_MAP_ARROW_UDF:
             if isinstance(returnType, StructType):
                 try:
-                    to_arrow_type(returnType)
+                    to_arrow_type(returnType, timezone="UTC")
                 except TypeError:
                     raise PySparkNotImplementedError(
                         errorClass="NOT_IMPLEMENTED",
@@ -358,7 +358,7 @@ class UserDefinedFunction:
                             f"{returnType}"
                         },
                     )
-                to_arrow_type(returnType)
+                to_arrow_type(returnType, timezone="UTC")
             except TypeError:
                 raise PySparkNotImplementedError(
                     errorClass="NOT_IMPLEMENTED",
@@ -370,7 +370,7 @@ class UserDefinedFunction:
         elif evalType == PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF:
             try:
                 # Different from SQL_GROUPED_AGG_PANDAS_UDF, StructType is 
allowed here
-                to_arrow_type(returnType)
+                to_arrow_type(returnType, timezone="UTC")
             except TypeError:
                 raise PySparkNotImplementedError(
                     errorClass="NOT_IMPLEMENTED",
diff --git a/python/pyspark/sql/worker/plan_data_source_read.py 
b/python/pyspark/sql/worker/plan_data_source_read.py
index 51036f17586f..f4a1231955b4 100644
--- a/python/pyspark/sql/worker/plan_data_source_read.py
+++ b/python/pyspark/sql/worker/plan_data_source_read.py
@@ -77,7 +77,7 @@ def records_to_arrow_batches(
     and append it to the records batch.
     """
 
-    pa_schema = to_arrow_schema(return_type)
+    pa_schema = to_arrow_schema(return_type, timezone="UTC")
     column_names = return_type.fieldNames()
     column_converters = [
         LocalDataToArrowConversion._create_converter(field.dataType) for field 
in return_type.fields
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c7b0d0bf50c4..386efadce58e 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -224,7 +224,7 @@ def wrap_scalar_pandas_udf(f, args_offsets, kwargs_offsets, 
return_type, runner_
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result_type(result):
@@ -264,7 +264,7 @@ def wrap_scalar_arrow_udf(f, args_offsets, kwargs_offsets, 
return_type, runner_c
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result_type(result):
@@ -320,7 +320,7 @@ def wrap_arrow_batch_udf_arrow(f, args_offsets, 
kwargs_offsets, return_type, run
         zero_arg_exec = True
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     if zero_arg_exec:
@@ -383,7 +383,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, 
kwargs_offsets, return_type, ru
         zero_arg_exec = True
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     # "result_func" ensures the result of a Python UDF to be consistent 
with/without Arrow
@@ -443,7 +443,7 @@ def wrap_arrow_batch_udf_legacy(f, args_offsets, 
kwargs_offsets, return_type, ru
 
 def wrap_pandas_batch_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     iter_type_label = "pandas.DataFrame" if type(return_type) == StructType 
else "pandas.Series"
 
@@ -543,7 +543,7 @@ def verify_pandas_result(result, return_type, 
assign_cols_by_name, truncate_retu
 
 def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result(result):
@@ -578,7 +578,7 @@ def wrap_arrow_array_iter_udf(f, return_type, runner_conf):
 
 def wrap_arrow_batch_iter_udf(f, return_type, runner_conf):
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def verify_result(result):
@@ -614,11 +614,11 @@ def wrap_arrow_batch_iter_udf(f, return_type, 
runner_conf):
 def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, runner_conf):
     if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
-            col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in 
return_type.fields
         }
     else:
         expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in 
return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in 
return_type.fields
         ]
 
     def wrapped(left_key_table, left_value_table, right_key_table, 
right_value_table):
@@ -633,7 +633,10 @@ def wrap_cogrouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
 
         return result.to_batches()
 
-    return lambda kl, vl, kr, vr: (wrapped(kl, vl, kr, vr), 
to_arrow_type(return_type))
+    return lambda kl, vl, kr, vr: (
+        wrapped(kl, vl, kr, vr),
+        to_arrow_type(return_type, timezone="UTC"),
+    )
 
 
 def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, runner_conf):
@@ -656,7 +659,7 @@ def wrap_cogrouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
         return result
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), 
arrow_return_type)]
 
@@ -757,11 +760,11 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
 
     if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
-            col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in 
return_type.fields
         }
     else:
         expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in 
return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in 
return_type.fields
         ]
 
     def wrapped(key_batch, value_batches):
@@ -777,7 +780,7 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
         yield from result.to_batches()
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda k, v: (wrapped(k, v), arrow_return_type)
 
@@ -785,11 +788,11 @@ def wrap_grouped_map_arrow_udf(f, return_type, argspec, 
runner_conf):
 def wrap_grouped_map_arrow_iter_udf(f, return_type, argspec, runner_conf):
     if runner_conf.assign_cols_by_name:
         expected_cols_and_types = {
-            col.name: to_arrow_type(col.dataType) for col in return_type.fields
+            col.name: to_arrow_type(col.dataType, timezone="UTC") for col in 
return_type.fields
         }
     else:
         expected_cols_and_types = [
-            (col.name, to_arrow_type(col.dataType)) for col in 
return_type.fields
+            (col.name, to_arrow_type(col.dataType, timezone="UTC")) for col in 
return_type.fields
         ]
 
     def wrapped(key_batch, value_batches):
@@ -806,7 +809,7 @@ def wrap_grouped_map_arrow_iter_udf(f, return_type, 
argspec, runner_conf):
         yield from map(verify_element, result)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda k, v: (wrapped(k, v), arrow_return_type)
 
@@ -838,7 +841,7 @@ def wrap_grouped_map_pandas_udf(f, return_type, argspec, 
runner_conf):
         yield result
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def flatten_wrapper(k, v):
@@ -875,7 +878,7 @@ def wrap_grouped_map_pandas_iter_udf(f, return_type, 
argspec, runner_conf):
         yield from map(verify_element, result)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def flatten_wrapper(k, v):
@@ -896,7 +899,7 @@ def wrap_grouped_transform_with_state_pandas_udf(f, 
return_type, runner_conf):
         return result_iter
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
@@ -919,7 +922,7 @@ def 
wrap_grouped_transform_with_state_pandas_init_state_udf(f, return_type, runn
         return result_iter
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
@@ -934,7 +937,7 @@ def wrap_grouped_transform_with_state_udf(f, return_type, 
runner_conf):
         return result_iter
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
@@ -956,7 +959,7 @@ def wrap_grouped_transform_with_state_init_state_udf(f, 
return_type, runner_conf
         return result_iter
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda p, m, k, v: [(wrapped(p, m, k, v), arrow_return_type)]
 
@@ -1051,7 +1054,7 @@ def wrap_grouped_map_pandas_udf_with_state(f, 
return_type, runner_conf):
         )
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
     return lambda k, v, s: [(wrapped(k, v, s), arrow_return_type)]
 
@@ -1060,7 +1063,7 @@ def wrap_grouped_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return_type, ru
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1079,7 +1082,7 @@ def wrap_grouped_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_type, run
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1098,7 +1101,7 @@ def wrap_grouped_agg_arrow_iter_udf(f, args_offsets, 
kwargs_offsets, return_type
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(batch_iter):
@@ -1118,7 +1121,7 @@ def wrap_grouped_agg_pandas_iter_udf(f, args_offsets, 
kwargs_offsets, return_typ
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets, 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(series_iter):
@@ -1186,7 +1189,7 @@ def wrap_unbounded_window_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return
     # to match window length, where grouped_agg_pandas_udf just returns
     # the scalar value.
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1207,7 +1210,7 @@ def wrap_unbounded_window_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_
     # This is similar to wrap_unbounded_window_agg_pandas_udf, the only 
difference
     # is that this function is for arrow udf.
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(*series):
@@ -1228,7 +1231,7 @@ def wrap_bounded_window_agg_pandas_udf(f, args_offsets, 
kwargs_offsets, return_t
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(begin_index, end_index, *series):
@@ -1272,7 +1275,7 @@ def wrap_bounded_window_agg_arrow_udf(f, args_offsets, 
kwargs_offsets, return_ty
     func, args_kwargs_offsets = wrap_kwargs_support(f, args_offsets[2:], 
kwargs_offsets)
 
     arrow_return_type = to_arrow_type(
-        return_type, prefers_large_types=runner_conf.use_large_var_types
+        return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
     )
 
     def wrapped(begin_index, end_index, *series):
@@ -2234,7 +2237,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
             import pandas as pd
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=runner_conf.use_large_var_types
+                return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -2361,7 +2364,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
             import pyarrow as pa
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=runner_conf.use_large_var_types
+                return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 
@@ -2536,7 +2539,7 @@ def read_udtf(pickleSer, infile, eval_type, runner_conf):
             import pyarrow as pa
 
             arrow_return_type = to_arrow_type(
-                return_type, 
prefers_large_types=runner_conf.use_large_var_types
+                return_type, timezone="UTC", 
prefers_large_types=runner_conf.use_large_var_types
             )
             return_type_size = len(return_type)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to