kevinjqliu commented on code in PR #1443:
URL: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901382763
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1257,20 @@ def _task_to_record_batches(
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
         file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+        pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
             translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
             bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
             pyarrow_filter = expression_to_pyarrow(bound_file_filter)
+        # Apply column projection rules for missing partitions and default values
+        # https://iceberg.apache.org/spec/#column-projection
         file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
-        if file_schema is None:
-            raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
+        projected_missing_fields = _get_column_projection_values(

Review Comment:
   nit: add an `if` statement here for `should_project_columns`, based on `projected_field_ids.difference(file_project_schema.field_ids)`. I find that easier to read when looking at this function. Similarly, we can use `should_project_columns` to gate whether we need to set columns on the resulting record batches; see the sketch below.
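   A rough, self-contained illustration of that gate (the field-id sets below are made-up placeholders; in the PR this would use `projected_field_ids` and `file_project_schema.field_ids` inside `_task_to_record_batches`):

```
# Hypothetical stand-ins for the real field-id sets in _task_to_record_batches.
projected_field_ids = {1, 2}   # field ids requested by the query
file_field_ids = {1}           # field ids physically present in the data file

missing_field_ids = projected_field_ids.difference(file_field_ids)
should_project_columns = bool(missing_field_ids)

if should_project_columns:
    # Only in this case do we need to resolve partition/default values and
    # later set those columns on the record batches we yield.
    print(f"fields to project: {sorted(missing_field_ids)}")
```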
##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,63 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
+def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id"))
+
+    table = catalog.create_table(
+        "default.test_projection_partition_inference",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),
+        file_size_in_bytes=os.path.getsize(file_loc),
+        sort_order_id=None,
+        spec_id=table.metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )
+
+    with table.transaction() as transaction:
+        with transaction.update_snapshot().overwrite() as update:
+            update.append_data_file(unpartitioned_file)

Review Comment:
   nit: what do you think about generating the data file with a regular unpartitioned table write and then registering that file in a new partitioned table? Something like:

```
schema = Schema(
    NestedField(1, "other_field", StringType(), required=False)
)
schema_with_partition_field = Schema(
    NestedField(1, "other_field", StringType(), required=False),
    NestedField(2, "partition_id", IntegerType(), required=False),
)

unpartitioned_table = catalog.create_table(
    "default.unpartitioned_table",
    schema=schema,
)

# Write a single data file through the normal append path.
dummy_data = pa.table({"other_field": pa.array(["foo"], type=pa.string())})
unpartitioned_table.append(dummy_data)

data_files = [row["file_path"] for row in unpartitioned_table.inspect.files().to_pylist()]
assert len(data_files) == 1

partitioned_table = catalog.create_table(
    "default.partitioned_table",
    schema=schema_with_partition_field,
    # plus the partition_spec on partition_id, as in the test above
)
# Register the existing file in the partitioned table.
partitioned_table.add_files(file_paths=data_files)
```

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1286,14 +1310,20 @@ def _task_to_record_batches(
                         continue
                     output_batches = arrow_table.to_batches()
             for output_batch in output_batches:
-                yield _to_requested_schema(
+                result_batch = _to_requested_schema(
                     projected_schema,
                     file_project_schema,
                     output_batch,
                     downcast_ns_timestamp_to_us=True,
                     use_large_types=use_large_types,
                 )
+                # Inject projected column values if available
+                for name, value in projected_missing_fields.items():
+                    result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value])

Review Comment:
   nit: [`set_column`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.set_column) will overwrite the column here. It might be a good idea to check that the value does not already exist before setting it; see the sketch below.
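   A minimal sketch of such a guard (hypothetical example data; in the PR the batch comes from `_to_requested_schema` and the values from `_get_column_projection_values`, and it assumes a pyarrow version where `RecordBatch.set_column` is available, which the PR already relies on):

```
import pyarrow as pa

# Hypothetical batch: "partition_id" is part of the requested schema but was
# not read from the file, so it is entirely null.
batch = pa.record_batch({"other_field": ["foo"], "partition_id": pa.array([None], type=pa.int32())})
projected_missing_fields = {"partition_id": 1}

for name, value in projected_missing_fields.items():
    index = batch.schema.get_field_index(name)
    if index == -1:
        continue  # field not in the requested schema, nothing to inject
    column = batch.column(index)
    if column.null_count == len(batch):
        # The column carries no real data, so overwriting it with the
        # projected partition value cannot clobber values read from the file.
        filler = pa.array([value] * len(batch), type=column.type)
        batch = batch.set_column(index, name, filler)

print(batch.to_pydict())  # {'other_field': ['foo'], 'partition_id': [1]}
```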