Fokko commented on code in PR #531:
URL: https://github.com/apache/iceberg-python/pull/531#discussion_r1529974666


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None

Review Comment:
   Any specific reason that `split_offsets` is nullable, in contrast to the rest?
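   
   If consistency is the goal, a non-nullable alternative (my sketch, not the PR's code) could default to an empty list instead:
   
   ```python
   from dataclasses import dataclass, field
   from typing import List
   
   
   @dataclass
   class DataFileStatistics:
       # ... other fields as in the PR ...
       # empty list instead of Optional[List[int]] = None
       split_offsets: List[int] = field(default_factory=list)
   ```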



##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
   Can we parameterize this test to do both v1 and v2?
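   
   Rough sketch of what I mean (not tested; it assumes `_create_table` can be extended to pass table properties so the format version actually takes effect):
   
   ```python
   import pytest
   from pyspark.sql import SparkSession
   
   from pyiceberg.catalog import Catalog
   
   
   @pytest.mark.integration
   @pytest.mark.parametrize("format_version", [1, 2])
   def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
       # one table per format version so the two parametrized runs don't collide
       identifier = f"default.partitioned_table_v{format_version}"
       # ... body as in this PR, creating the table with the
       # {"format-version": str(format_version)} property (hypothetical _create_table extension)
       ...
   ```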



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass

Review Comment:
   Should we make it frozen?
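   
   i.e. the same fields, only the decorator changes (`frozen=True` also gives us hashable instances):
   
   ```python
   from dataclasses import dataclass
   from typing import Dict, List, Optional
   
   
   @dataclass(frozen=True)
   class DataFileStatistics:
       record_count: int
       column_sizes: Dict[int, int]
       value_counts: Dict[int, int]
       null_value_counts: Dict[int, int]
       nan_value_counts: Dict[int, int]
       column_aggregates: Dict[int, StatsAggregator]  # StatsAggregator lives in the same module
       split_offsets: Optional[List[int]] = None
   ```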



##########
pyiceberg/manifest.py:
##########
@@ -389,6 +389,10 @@ def __eq__(self, other: Any) -> bool:
         """
         return self.file_path == other.file_path if isinstance(other, DataFile) else False
 
+    def update(self, other: Dict[str, Any]) -> None:

Review Comment:
   I like to avoid mutability. Looking at the code, I think we can also create the object at once:
   
   ```python
   data_file = DataFile(
       content=DataFileContent.DATA,
       file_path=file_path,
       file_format=FileFormat.PARQUET,
       partition=Record(),
       file_size_in_bytes=len(fo),
       # After this has been fixed:
       # https://github.com/apache/iceberg-python/issues/271
       # sort_order_id=task.sort_order_id,
       sort_order_id=None,
       # Just copy these from the table for now
       spec_id=table_metadata.default_spec_id,
       equality_ids=None,
       key_metadata=None,
       **statistics.to_serialized_dict()
   )
   ```



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+        if partition_field.source_id not in self.column_aggregates:
+            return None
+
+        if not partition_field.transform.preserves_order:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
+            )
+
+        lower_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+            schema=schema,
+        )
+        upper_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+            schema=schema,
+        )
+        if lower_value != upper_value:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"

Review Comment:
   Can we add a test for this?
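   
   For example, something along these lines (just a sketch; the stand-in aggregator only exposes `current_min`/`current_max`, which is all this code path reads, if I'm not mistaken):
   
   ```python
   import pytest
   
   from pyiceberg.io.pyarrow import DataFileStatistics
   from pyiceberg.partitioning import PartitionField
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import IdentityTransform
   from pyiceberg.types import IntegerType, NestedField
   
   
   class _MinMax:
       """Stand-in for StatsAggregator that only exposes the attributes read here."""
   
       def __init__(self, current_min: int, current_max: int) -> None:
           self.current_min = current_min
           self.current_max = current_max
   
   
   def test_partition_value_with_multiple_values_raises() -> None:
       schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=False))
       stats = DataFileStatistics(
           record_count=2,
           column_sizes={1: 10},
           value_counts={1: 2},
           null_value_counts={1: 0},
           nan_value_counts={},
           column_aggregates={1: _MinMax(1, 2)},  # min != max -> more than one partition value
           split_offsets=[4],
       )
       field = PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="id")
       with pytest.raises(ValueError, match="Cannot infer partition value"):
           stats._partition_value(field, schema)
   ```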



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(

Review Comment:
   Removing this method will break daft: https://github.com/Eventual-Inc/Daft/pull/2016
   
   However, I can't say I like that we assign this in place, so maybe we should go for it anyway.



##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+    """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
   Can we parameterize this test to do both v1 and v2?



##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+    """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table_2"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    int_iter = iter(range(5))
+
+    file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": next(int_iter),
+                                "qux": date(2024, 3, 7),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    with pytest.raises(ValueError):

Review Comment:
   Can we assert the error message?
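   
   Something like the following, perhaps; the exact wording is an assumption on my side, taken from the `preserves_order` check above:
   
   ```python
   # match on a stable prefix of the message raised for non-order-preserving transforms
   with pytest.raises(ValueError, match="Cannot infer partition value from parquet metadata"):
       tbl.add_files(file_paths=file_paths)
   ```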



##########
pyiceberg/partitioning.py:
##########
@@ -388,16 +388,33 @@ def partition(self) -> Record:  # partition key transformed with iceberg interna
             if len(partition_fields) != 1:
                 raise ValueError("partition_fields must contain exactly one field.")
             partition_field = partition_fields[0]
-            iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
-            iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
-            transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
-            iceberg_typed_key_values[partition_field.name] = transformed_value
+            iceberg_typed_key_values[partition_field.name] = partition_record_value(
+                partition_field=partition_field,
+                value=raw_partition_field_value.value,
+                schema=self.schema,
+            )
         return Record(**iceberg_typed_key_values)
 
     def to_path(self) -> str:
         return self.partition_spec.partition_to_path(self.partition, self.schema)
 
 
+def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
+    """
+    Return the Partition Record representation of the value.
+
+    The value is first converted to internal partition representation.
+    For example, UUID is converted to str, DateType to epoch-days, etc.

Review Comment:
   ```suggestion
    For example, UUID is converted to bytes[16], DateType to days since epoch, etc.
   ```
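   
   To make the conversion concrete, a small usage sketch (my example, reusing the `qux_month` case from the new integration test, where 2024-03 maps to 650 months since epoch):
   
   ```python
   from datetime import date
   
   from pyiceberg.partitioning import PartitionField, partition_record_value
   from pyiceberg.schema import Schema
   from pyiceberg.transforms import MonthTransform
   from pyiceberg.types import DateType, NestedField
   
   schema = Schema(NestedField(field_id=10, name="qux", field_type=DateType(), required=False))
   field = PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month")
   
   # the date is first converted to days since epoch, then MonthTransform maps it to months since epoch
   assert partition_record_value(partition_field=field, value=date(2024, 3, 7), schema=schema) == 650
   ```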



##########
mkdocs/docs/api.md:
##########
@@ -330,6 +330,13 @@ tbl.add_files(file_paths=file_paths)
 
 <!-- prettier-ignore-start -->
 
+!!! note "Partitions"
+    `add_files` only requires the client to read the existing parquet files' metadata footer in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like MonthTransform, and TruncateTransform which preserve the order of the values after the transformation (Any Transform that has `preserves_order` property set to True is supported).

Review Comment:
   ```suggestion
       `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
