kevinjqliu commented on code in PR #1539:
URL: https://github.com/apache/iceberg-python/pull/1539#discussion_r1922707073


##########
tests/benchmark/test_benchmark.py:
##########
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import statistics
+import timeit
+import urllib
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+
+from pyiceberg.transforms import DayTransform
+
+
+@pytest.fixture(scope="session")
+def taxi_dataset(tmp_path_factory: pytest.TempPathFactory) -> pa.Table:
+    """Reads the Taxi dataset to disk"""
+    taxi_dataset = 
"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-01.parquet";
+    taxi_dataset_dest = tmp_path_factory.mktemp("taxi_dataset") / 
"yellow_tripdata_2022-01.parquet"
+    urllib.request.urlretrieve(taxi_dataset, taxi_dataset_dest)
+
+    return pq.read_table(taxi_dataset_dest)
+
+
+@pytest.mark.benchmark

Review Comment:
   CI is failing since we dont have the new marker added here 
   
https://github.com/apache/iceberg-python/blob/b15934d5a9e6bf97b047f63239cc21ba1c15cdd4/pyproject.toml#L1216-L1223



##########
tests/integration/test_writes/test_partitioned_writes.py:
##########
@@ -1126,3 +1127,25 @@ def test_append_multiple_partitions(
         """
     )
     assert files_df.count() == 6
+
+
+@pytest.mark.integration
+def test_pyarrow_overflow(session_catalog: Catalog) -> None:
+    """Test what happens when the offset is beyond 32 bits"""
+    identifier = "default.arrow_table_overflow"
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    x = pa.array([random.randint(0, 999) for _ in range(30_000)])
+    ta = pa.chunked_array([x] * 10_000)
+    y = ["fixed_string"] * 30_000
+    tb = pa.chunked_array([y] * 10_000)
+    # Create pa.table
+    arrow_table = pa.table({"a": ta, "b": tb})

Review Comment:
   it wasnt obv to me that this test offset is beyond 32 bits, but i ran it and 
`4800280000` is `>2^32`/`4294967296`
   ```
   >>> len(arrow_table)
   300000000
   >>> arrow_table.get_total_buffer_size()
   4800280000
   ```



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2594,42 +2566,46 @@ def _determine_partitions(spec: PartitionSpec, schema: 
Schema, arrow_table: pa.T
     We then retrieve the partition keys by offsets.
     And slice the arrow table by offsets and lengths of each partition.
     """
-    partition_columns: List[Tuple[PartitionField, NestedField]] = [
-        (partition_field, schema.find_field(partition_field.source_id)) for 
partition_field in spec.fields
-    ]
-    partition_values_table = pa.table(
-        {
-            str(partition.field_id): 
partition.transform.pyarrow_transform(field.field_type)(arrow_table[field.name])
-            for partition, field in partition_columns
-        }
-    )
+    # Assign unique names to columns where the partition transform has been 
applied
+    # to avoid conflicts
+    partition_fields = [f"_partition_{field.name}" for field in spec.fields]
+
+    for partition, name in zip(spec.fields, partition_fields):
+        source_field = schema.find_field(partition.source_id)
+        arrow_table = arrow_table.append_column(
+            name, 
partition.transform.pyarrow_transform(source_field.field_type)(arrow_table[source_field.name])
+        )
+
+    unique_partition_fields = 
arrow_table.select(partition_fields).group_by(partition_fields).aggregate([])

Review Comment:
   from [`group_by` 
docs](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.group_by)
   
   ```
   use_threadsbool, default True
   Whether to use multithreading or not. When set to True (the default), no 
stable ordering of the output is guaranteed.
   ```
   
   should we be concerned with stable ordering here? 



##########
pyiceberg/partitioning.py:
##########
@@ -425,8 +426,13 @@ def _to_partition_representation(type: IcebergType, value: 
Any) -> Any:
 
 @_to_partition_representation.register(TimestampType)
 @_to_partition_representation.register(TimestamptzType)
-def _(type: IcebergType, value: Optional[datetime]) -> Optional[int]:
-    return datetime_to_micros(value) if value is not None else None
+def _(type: IcebergType, value: Optional[Union[datetime, int]]) -> 
Optional[int]:
+    if value is None:
+        return None
+    elif isinstance(value, int):
+        return value

Review Comment:
   what can return int here? since this function is registered to 
   ```
   @_to_partition_representation.register(TimestampType)
   @_to_partition_representation.register(TimestamptzType)
   ```



##########
pyiceberg/io/pyarrow.py:
##########


Review Comment:
   the docstring for `_determine_partitions` is outdated with this change
   
   
https://github.com/apache/iceberg-python/pull/1539/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR2547-R2568



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to