kevinjqliu commented on code in PR #1452:
URL: https://github.com/apache/iceberg-python/pull/1452#discussion_r1909263268

##########
pyiceberg/io/pyarrow.py:
##########
@@ -2234,7 +2235,9 @@ def data_file_statistics_from_parquet_metadata(
     )
 
 
-def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
+def write_file(
+    io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask]
+) -> Iterator[DataFile]:
     from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties

Review Comment:
   we might want `location_provider: LocationProvider` last for backwards compatibility
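   A sketch of the backwards-compatible ordering this suggests, with the new parameter moved last; giving it a `None` default (an assumption, not something the PR shows) would additionally keep existing three-argument positional callers of `write_file` working:
   ```python
   from typing import Iterator, Optional

   from pyiceberg.io import FileIO
   from pyiceberg.manifest import DataFile
   from pyiceberg.table import WriteTask
   from pyiceberg.table.locations import LocationProvider, load_location_provider
   from pyiceberg.table.metadata import TableMetadata


   def write_file(
       io: FileIO,
       table_metadata: TableMetadata,
       tasks: Iterator[WriteTask],
       location_provider: Optional[LocationProvider] = None,  # new parameter last; the default is an assumption
   ) -> Iterator[DataFile]:
       # Fall back to a provider derived from the table metadata so that
       # pre-existing call sites behave exactly as before.
       if location_provider is None:
           location_provider = load_location_provider(
               table_location=table_metadata.location, table_properties=table_metadata.properties
           )
       ...
   ```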
{col}" +@pytest.mark.integration +@pytest.mark.parametrize( + "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamp", "timestamptz", "binary"] +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_object_storage_excludes_partition( Review Comment: nit: ```suggestion def test_object_storage_location_provider_excludes_partition_path( ``` ########## tests/table/test_locations.py: ########## @@ -0,0 +1,123 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Optional + +import pytest + +from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table.locations import LocationProvider, load_location_provider +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import EMPTY_DICT +from pyiceberg.types import NestedField, StringType + +PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field") +PARTITION_KEY = PartitionKey( + raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")], + partition_spec=PartitionSpec(PARTITION_FIELD), + schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)), +) + + +class CustomLocationProvider(LocationProvider): + def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: + return f"custom_location_provider/{data_file_name}" + + +def test_default_location_provider() -> None: + provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) + + assert provider.new_data_location("my_file") == "table_location/data/my_file" + + +def test_custom_location_provider() -> None: + qualified_name = CustomLocationProvider.__module__ + "." 
##########
tests/integration/test_writes/test_partitioned_writes.py:
##########
@@ -280,6 +280,43 @@ def test_query_filter_v1_v2_append_null(
     assert df.where(f"{col} is null").count() == 2, f"Expected 2 null rows for {col}"
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize(
+    "part_col", ["int", "bool", "string", "string_long", "long", "float", "double", "date", "timestamp", "timestamptz", "binary"]
+)
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_object_storage_excludes_partition(

Review Comment:
   nit:
   ```suggestion
   def test_object_storage_location_provider_excludes_partition_path(
   ```



##########
tests/table/test_locations.py:
##########
@@ -0,0 +1,123 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+from typing import Optional
+
+import pytest
+
+from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table.locations import LocationProvider, load_location_provider
+from pyiceberg.transforms import IdentityTransform
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.types import NestedField, StringType
+
+PARTITION_FIELD = PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="string_field")
+PARTITION_KEY = PartitionKey(
+    raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example_string")],
+    partition_spec=PartitionSpec(PARTITION_FIELD),
+    schema=Schema(NestedField(field_id=1, name="string_field", field_type=StringType(), required=False)),
+)
+
+
+class CustomLocationProvider(LocationProvider):
+    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
+        return f"custom_location_provider/{data_file_name}"
+
+
+def test_default_location_provider() -> None:
+    provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)
+
+    assert provider.new_data_location("my_file") == "table_location/data/my_file"
+
+
+def test_custom_location_provider() -> None:
+    qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__
+    provider = load_location_provider(
+        table_location="table_location", table_properties={"write.location-provider.impl": qualified_name}
+    )
+
+    assert provider.new_data_location("my_file") == "custom_location_provider/my_file"
+
+
+def test_custom_location_provider_single_path() -> None:
+    with pytest.raises(ValueError, match=r"write\.location-provider\.impl should be full path"):
+        load_location_provider(table_location="table_location", table_properties={"write.location-provider.impl": "not_found"})
+
+
+def test_custom_location_provider_not_found() -> None:
+    with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"):
+        load_location_provider(
+            table_location="table_location", table_properties={"write.location-provider.impl": "module.not_found"}
+        )
+
+
+def test_object_storage_injects_entropy() -> None:
+    provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"})
+
+    location = provider.new_data_location("test.parquet")
+    parts = location.split("/")
+
+    assert len(parts) == 7
+    assert parts[0] == "table_location"
+    assert parts[1] == "data"
+    # Entropy directories in the middle

Review Comment:
   since this test is called `test_object_storage_injects_entropy` should we test the entropy part? similar to
   ```
   # Entropy binary directories should have been injected
   for dir_name in parts[6:10]:
       assert dir_name
       assert all(c in "01" for c in dir_name)
   ```
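   The `parts[6:10]` indices in the quoted snippet come from a longer path in another test; adapted to this test's 7-component `table_location/data/.../test.parquet` layout, the entropy check might look like the sketch below. The four-directory, binary-digit shape is inferred from the `len(parts) == 7` assertion, not confirmed by the diff:
   ```python
   from pyiceberg.table.locations import load_location_provider

   provider = load_location_provider(
       table_location="table_location", table_properties={"write.object-storage.enabled": "true"}
   )

   location = provider.new_data_location("test.parquet")
   parts = location.split("/")

   assert len(parts) == 7
   assert parts[0] == "table_location"
   assert parts[1] == "data"
   # Entropy binary directories should have been injected between "data" and the file name
   for dir_name in parts[2:6]:
       assert dir_name
       assert all(c in "01" for c in dir_name)
   assert parts[6] == "test.parquet"  # assumes the file name lands in the final component
   ```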