kevinjqliu commented on code in PR #1443:
URL: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901382763


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1237,16 +1257,20 @@ def _task_to_record_batches(
         # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on
         # the table format version.
         file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True)
+
         pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
             translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
             bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
             pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
+        # Apply column projection rules for missing partitions and default values
+        # https://iceberg.apache.org/spec/#column-projection
         file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
-        if file_schema is None:
-            raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
+        projected_missing_fields = _get_column_projection_values(

Review Comment:
   nit: add an if statement here for `should_project_columns`, based on `projected_field_ids.difference(file_project_schema.field_ids)`.
   
   I find this easier to read when looking at this function.
   
   Similarly, we can use `should_project_columns` to gate whether we need to set columns in the resulting record batches; see the sketch below.
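   A minimal standalone sketch of the flag I have in mind (the pruned file schema's field ids are passed in explicitly here, since the exact attribute to read them from is an assumption on my side):
   ```
   def should_project_columns(projected_field_ids: set, file_field_ids: set) -> bool:
       # True only when the query selects field ids that the data file does not
       # contain, i.e. when the partition/default-value projection step is needed.
       return bool(projected_field_ids.difference(file_field_ids))
   
   # e.g. the file only wrote field 1, but fields {1, 2} are projected:
   assert should_project_columns({1, 2}, {1})
   assert not should_project_columns({1}, {1, 2})
   ```
   The same flag could then gate both the `_get_column_projection_values` call and the per-batch `set_column` loop.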



##########
tests/io/test_pyarrow.py:
##########
@@ -1122,6 +1127,63 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None:
     assert repr(result_table.schema) == "id: int32"
 
 
+def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None:
+    schema = Schema(
+        NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
+    )
+
+    partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id"))
+
+    table = catalog.create_table(
+        "default.test_projection_partition_inference",
+        schema=schema,
+        partition_spec=partition_spec,
+        properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()},
+    )
+
+    file_data = pa.array(["foo"], type=pa.string())
+    file_loc = f"{tmp_path}/test.parquet"
+    pq.write_table(pa.table([file_data], names=["other_field"]), file_loc)
+
+    statistics = data_file_statistics_from_parquet_metadata(
+        parquet_metadata=pq.read_metadata(file_loc),
+        stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties),
+        parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
+    )
+
+    unpartitioned_file = DataFile(
+        content=DataFileContent.DATA,
+        file_path=file_loc,
+        file_format=FileFormat.PARQUET,
+        partition=Record(partition_id=1),
+        file_size_in_bytes=os.path.getsize(file_loc),
+        sort_order_id=None,
+        spec_id=table.metadata.default_spec_id,
+        equality_ids=None,
+        key_metadata=None,
+        **statistics.to_serialized_dict(),
+    )
+
+    with table.transaction() as transaction:
+        with transaction.update_snapshot().overwrite() as update:
+            update.append_data_file(unpartitioned_file)

Review Comment:
   nit: what do you think about generating the data file with a regular unpartitioned table write and then registering the file in a new partitioned table?
   something like:
   ```
   schema = Schema(
       NestedField(1, "other_field", StringType(), required=False)
   )
   schema_with_partition_field = Schema(
       NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False)
   )
   
   unpartitioned_table = catalog.create_table(
       "default.unpartitioned_table",
       schema=schema,
   )
   dummy_data = pa.table([pa.array(["foo"], type=pa.string())], names=["other_field"])
   unpartitioned_table.append(dummy_data)
   data_files = unpartitioned_table.inspect.files().to_pylist()
   assert len(data_files) == 1
   
   partitioned_table = catalog.create_table(
       "default.partitioned_table",
       schema=schema_with_partition_field,
       partition_spec=PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id")),
   )
   
   partitioned_table.add_files(file_paths=[data_files[0]["file_path"]])
   ```
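   (If I'm reading the current test right, that would also let it drop the hand-built `DataFile` and its statistics block, since `add_files` takes care of registering the existing parquet file.)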



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1286,14 +1310,20 @@ def _task_to_record_batches(
                         continue
                     output_batches = arrow_table.to_batches()
             for output_batch in output_batches:
-                yield _to_requested_schema(
+                result_batch = _to_requested_schema(
                     projected_schema,
                     file_project_schema,
                     output_batch,
                     downcast_ns_timestamp_to_us=True,
                     use_large_types=use_large_types,
                 )
 
+                # Inject projected column values if available
+                for name, value in projected_missing_fields.items():
+                    result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value])

Review Comment:
   nit: [`set_column`](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.set_column) will overwrite the column here. It might be a good idea to check that a value does not already exist before overwriting it.
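   Roughly something like the following standalone sketch (it uses a plain `pyarrow.Table` and a hypothetical helper rather than the PR's record-batch code), where the projected value is only written when the file did not already supply one:
   ```
   import pyarrow as pa
   
   def set_projected_column(table: pa.Table, name: str, value) -> pa.Table:
       index = table.schema.get_field_index(name)
       existing = table.column(index)
       if existing.null_count < len(existing):
           # The file already carries real values for this column; keep them.
           return table
       # Column is entirely null/missing in the file: fill in the projected value.
       return table.set_column(index, name, pa.array([value] * len(table)))
   
   table = pa.table({"other_field": ["foo"], "partition_id": pa.array([None], type=pa.int32())})
   print(set_projected_column(table, "partition_id", 1))
   ```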


