kevinjqliu commented on code in PR #1443:
URL: https://github.com/apache/iceberg-python/pull/1443#discussion_r1922764225


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1265,26 @@ def _task_to_record_batches(
        # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
        # the table format version.
        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+
        pyarrow_filter = None
        if bound_row_filter is not AlwaysTrue():
            translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
            bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
            pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
+        # Apply column projection rules for missing partitions and default values

Review Comment:
   ```suggestion
           # Apply column projection rules
   ```
   I don't think we support default values yet.



##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
 
 
+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),

Review Comment:
   Is `1` significant here? An unpartitioned data file should have an empty `Record()`.
   
https://github.com/apache/iceberg-python/blob/b15934d5a9e6bf97b047f63239cc21ba1c15cdd4/pyiceberg/io/pyarrow.py#L2341
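   
   Roughly what I mean, as a sketch (not code from this PR):
   
   ```python
   from pyiceberg.typedef import Record
   
   # A data file written without partition values carries an empty partition struct.
   unpartitioned = Record()
   
   # A file in an identity partition carries the value, which readers project
   # back into the "partition_id" column.
   partitioned = Record(partition_id=1)
   ```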



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1265,26 @@ def _task_to_record_batches(
        # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
        # the table format version.
        file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+
        pyarrow_filter = None
        if bound_row_filter is not AlwaysTrue():
            translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
            bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
            pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
+        # Apply column projection rules for missing partitions and default values
+        # https://iceberg.apache.org/spec/#column-projection
        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
-        if file_schema is None:
-            raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
+        project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids)
+        should_project_columns = len(project_schema_diff) > 0
+
+        projected_missing_fields = {}
+
+        if should_project_columns and partition_spec is not None:
+            projected_missing_fields = _get_column_projection_values(
+                task.file, projected_schema, project_schema_diff, partition_spec
+            )

Review Comment:
   Nit: wdyt about structuring the code like this? 
   
   ```suggestion
           should_project_columns, projected_missing_fields = _get_column_projection_values(
               task.file, projected_schema, partition_spec
           )
   ```
   
   and in `_get_column_projection_values`, move the rest of the logic
   
   ```
   def _get_column_projection_values(...):
       project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids)
       should_project_columns = len(project_schema_diff) > 0
       projected_missing_fields = {}
       if not should_project_columns:
           return False, {}
       ...
   ```
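   
   A fuller sketch of what I mean (not this PR's code; the `file_project_field_ids` parameter and the positional lookup into `file.partition` are assumptions to keep it self-contained):
   
   ```python
   from typing import Any, Dict, Set, Tuple
   
   from pyiceberg.manifest import DataFile
   from pyiceberg.partitioning import PartitionSpec
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import IdentityTransform
   
   
   def _get_column_projection_values(
       file: DataFile,
       projected_schema: Schema,
       partition_spec: PartitionSpec,
       file_project_field_ids: Set[int],
   ) -> Tuple[bool, Dict[str, Any]]:
       # Fields requested by the query that the data file itself does not contain.
       project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids)
       if not project_schema_diff:
           return False, {}
   
       projected_missing_fields: Dict[str, Any] = {}
       for pos, partition_field in enumerate(partition_spec.fields):
           if partition_field.source_id in project_schema_diff and isinstance(partition_field.transform, IdentityTransform):
               # Identity-transformed values can be read straight from the partition
               # struct stored on the manifest entry (positional access on the Record,
               # assuming the record was written with this spec's field order).
               projected_missing_fields[partition_field.name] = file.partition[pos]
   
       return True, projected_missing_fields
   ```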



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1216,6 +1217,32 @@ def _field_id(self, field: pa.Field) -> int:
         return -1
 
 
+def _get_column_projection_values(
+    file: DataFile,
+    projected_schema: Schema,
+    project_schema_diff: Set[int],
+    partition_spec: PartitionSpec,
+) -> Dict[str, object]:

Review Comment:
   ```suggestion
   ) -> Dict[str, Any]:
   ```
   



##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
 
 
+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:

Review Comment:
   ```suggestion
   def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCatalog) -> None:
   ```



##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
 
 
+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),
+        file_size_in_bytes=os.path.getsize(file_loc),
+        sort_order_id=None,
+        spec_id=table.metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )
+
+    with table.transaction() as transaction:
+        with transaction.update_snapshot().overwrite() as update:
+            update.append_data_file(unpartitioned_file)
+
+    assert (
+        str(table.scan().to_arrow())
+        == """pyarrow.Table
+other_field: large_string
+partition_id: int64
+----
+other_field: [["foo"]]
+partition_id: [[1]]"""
+    )
+
+
+def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, VoidTransform(), "void_partition_id"),
+        PartitionField(2, 1001, IdentityTransform(), "partition_id"),
+    )

Review Comment:
   I think we'd want to test multiple `IdentityTransform`s here.
   
   I'm thinking about a case with multiple levels of hive-style partitioning:
   ```
   s3://my_table/a=100/b=foo/...parquet
   ```
   
   I think `_get_column_projection_values` might not support this right now.
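   
   Something like this is what I'm picturing for the test (a sketch only; the extra `category` field and its IDs are made up):
   
   ```python
   from pyiceberg.partitioning import PartitionField, PartitionSpec
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.types import IntegerType, NestedField, StringType
   
   # Two-level hive-style layout: my_table/partition_id=1/category=foo/...
   schema = Schema(
       NestedField(1, "other_field", StringType(), required=False),
       NestedField(2, "partition_id", IntegerType(), required=False),
       NestedField(3, "category", StringType(), required=False),
   )
   
   partition_spec = PartitionSpec(
       PartitionField(2, 1000, IdentityTransform(), "partition_id"),
       PartitionField(3, 1001, IdentityTransform(), "category"),
   )
   
   # Both values would then have to be projected from the manifest's partition struct,
   # e.g. partition=Record(partition_id=1, category="foo") on the DataFile.
   ```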
   



##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
 
 
+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),

Review Comment:
   Ah, this is the projected value. Can you add a comment so this is clear?
   
   https://iceberg.apache.org/spec/#column-projection
   ```
   Return the value from partition metadata if an Identity Transform exists for the field and the partition value is present in the partition struct on data_file object in the manifest. This allows for metadata only migrations of Hive tables.
   ```
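   
   e.g. something like this on the `DataFile` construction (wording up to you):
   
   ```python
   # Not present in the parquet file itself; per the column-projection rules this
   # identity-partition value is projected into the "partition_id" column at read time.
   partition=Record(partition_id=1),
   ```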


