kevinjqliu commented on code in PR #1443:
URL: https://github.com/apache/iceberg-python/pull/1443#discussion_r1922764225
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1265,26 @@ def _task_to_record_batches(
     # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
     # the table format version.
     file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)

+    pyarrow_filter = None
     if bound_row_filter is not AlwaysTrue():
         translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter)

+    # Apply column projection rules for missing partitions and default values

Review Comment:
```suggestion
    # Apply column projection rules
```
I don't think we support default values yet.


##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"


+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),

Review Comment:
Is `1` significant here? An unpartitioned data file should have an empty `Record()`
https://github.com/apache/iceberg-python/blob/b15934d5a9e6bf97b047f63239cc21ba1c15cdd4/pyiceberg/io/pyarrow.py#L2341
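For context, a minimal sketch of the distinction this comment is drawing, using pyiceberg's `Record` from `pyiceberg.typedef` just as the test above does (the variable names are illustrative, not from the PR):
```
from pyiceberg.typedef import Record

# A data file added under an unpartitioned spec carries no partition values at all;
# this is what the linked pyarrow.py line constructs for such files.
empty_partition = Record()

# The test instead deliberately stores the value 1 in the manifest's partition struct,
# so that the reader can later project it into the `partition_id` column.
projected_partition = Record(partition_id=1)
```
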
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1265,26 @@ def _task_to_record_batches(
     # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
     # the table format version.
     file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)

+    pyarrow_filter = None
     if bound_row_filter is not AlwaysTrue():
         translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
         bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
         pyarrow_filter = expression_to_pyarrow(bound_file_filter)

+    # Apply column projection rules for missing partitions and default values
+    # https://iceberg.apache.org/spec/#column-projection
     file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

-    if file_schema is None:
-        raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
+    project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids)
+    should_project_columns = len(project_schema_diff) > 0
+
+    projected_missing_fields = {}
+
+    if should_project_columns and partition_spec is not None:
+        projected_missing_fields = _get_column_projection_values(
+            task.file, projected_schema, project_schema_diff, partition_spec
+        )

Review Comment:
Nit: wdyt about structuring the code like this?
```suggestion
    should_project_columns, projected_missing_fields = _get_column_projection_values(
        task.file, projected_schema, partition_spec
    )
```
and move the rest of the logic into `_get_column_projection_values`:
```
def _get_column_projection_values(...):
    project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids)
    should_project_columns = len(project_schema_diff) > 0

    projected_missing_fields = {}

    if not should_project_columns:
        return False, {}

    ...
```


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1216,6 +1217,32 @@ def _field_id(self, field: pa.Field) -> int:
         return -1


+def _get_column_projection_values(
+    file: DataFile,
+    projected_schema: Schema,
+    project_schema_diff: Set[int],
+    partition_spec: PartitionSpec,
+) -> Dict[str, object]:

Review Comment:
```suggestion
) -> Dict[str, Any]:
```
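To make the restructuring suggestion concrete, here is one possible shape for the helper, as a sketch only: the extra `file_field_ids` parameter and the identity-transform lookup are assumptions filled in from the spec rule quoted later in this review, not the PR's actual implementation.
```
from typing import Any, Dict, Set, Tuple

from pyiceberg.manifest import DataFile
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform


def _get_column_projection_values(
    file: DataFile,
    projected_schema: Schema,
    partition_spec: PartitionSpec,
    file_field_ids: Set[int],  # field ids present in the data file (sketch-only parameter)
) -> Tuple[bool, Dict[str, Any]]:
    """Return (should_project_columns, {column name: value to project}) in one place."""
    # Fields requested by the query but missing from the file are candidates for projection.
    project_schema_diff = projected_schema.field_ids.difference(file_field_ids)
    if len(project_schema_diff) == 0:
        return False, {}

    projected_missing_fields: Dict[str, Any] = {}
    for pos, partition_field in enumerate(partition_spec.fields):
        # Per the spec's column-projection rules, only identity-transformed partition
        # values may be read straight out of the manifest's partition struct.
        if partition_field.source_id in project_schema_diff and isinstance(partition_field.transform, IdentityTransform):
            # Assumes the partition Record is ordered the same way as the spec's fields.
            if (value := file.partition[pos]) is not None:
                projected_missing_fields[projected_schema.find_field(partition_field.source_id).name] = value

    return len(projected_missing_fields) > 0, projected_missing_fields
```
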
##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"


+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:

Review Comment:
```suggestion
def test_identity_transform_column_projection(tmp_path: str, catalog: InMemoryCatalog) -> None:
```


##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"


+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),
+        file_size_in_bytes=os.path.getsize(file_loc),
+        sort_order_id=None,
+        spec_id=table.metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )
+
+    with table.transaction() as transaction:
+        with transaction.update_snapshot().overwrite() as update:
+            update.append_data_file(unpartitioned_file)
+
+    assert (
+        str(table.scan().to_arrow())
+        == """pyarrow.Table
+other_field: large_string
+partition_id: int64
+----
+other_field: [["foo"]]
+partition_id: [[1]]"""
+    )
+
+
+def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, VoidTransform(), "void_partition_id"),
+        PartitionField(2, 1001, IdentityTransform(), "partition_id"),
+    )

Review Comment:
I think we'd want to test multiple `IdentityTransform`s here. I'm thinking about a case with multiple levels of hive-style partitioning:
```
s3://my_table/a=100/b=foo/...parquet
```
I think `_get_column_projection_values` might not support this right now.
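For the hive-style case described above, a layout like `s3://my_table/a=100/b=foo/` maps to a spec with two identity-transformed fields. A possible setup for such a test is sketched below; the schema, field ids, and names are illustrative, not taken from the PR.
```
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import Record
from pyiceberg.types import IntegerType, NestedField, StringType

# Both `a` and `b` are identity-partitioned, mirroring s3://my_table/a=100/b=foo/...
schema = Schema(
    NestedField(1, "other_field", StringType(), required=False),
    NestedField(2, "a", IntegerType(), required=False),
    NestedField(3, "b", StringType(), required=False),
)

partition_spec = PartitionSpec(
    PartitionField(2, 1000, IdentityTransform(), "a"),
    PartitionField(3, 1001, IdentityTransform(), "b"),
)

# A data file registered under this spec carries both values in its partition struct,
# and a reader should be able to project both back out of the manifest.
partition = Record(a=100, b="foo")
```
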
##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,127 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"


+def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata.
+    # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs.
+    # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875)
+
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(
+        PartitionField(2, 1000, IdentityTransform(), "partition_id"),
+    )
+
+    table = catalog.create_table(
+        "default.test_projection_partition",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),

Review Comment:
Ah, this is the projected value. Can you add a comment so this is clear?
https://iceberg.apache.org/spec/#column-projection
```
Return the value from partition metadata if an Identity Transform exists for the field and the partition value is present in the partition struct on data_file object in the manifest. This allows for metadata only migrations of Hive tables.
```
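One possible shape for the comment being asked for, shown against the `DataFile(...)` construction from the test above (the wording is a sketch, not the PR's final comment):
```
    unpartitioned_file = DataFile(
        content=DataFileContent.DATA,
        file_path=file_loc,
        file_format=FileFormat.PARQUET,
        # `partition_id` is not written to the Parquet file itself. Per the Iceberg spec's
        # column-projection rules, an identity-transformed partition value stored in the
        # manifest's partition struct is projected into the column at read time, so `1`
        # is the value the table scan in this test is expected to return for `partition_id`.
        partition=Record(partition_id=1),
        file_size_in_bytes=os.path.getsize(file_loc),
        sort_order_id=None,
        spec_id=table.metadata.default_spec_id,
        equality_ids=None,
        key_metadata=None,
        **statistics.to_serialized_dict(),
    )
```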