syun64 commented on code in PR #531:
URL: https://github.com/apache/iceberg-python/pull/531#discussion_r1530566151


##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+    """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
   Just made the change 😄 
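
   For context on the asserted partition values in the hunk above: baz uses an identity transform, so its expected partition value is simply the column value 123, while qux_month comes from Iceberg's month transform, which counts whole months since the 1970-01 epoch; every date the test writes falls in March 2024, so all five files land in a single partition. A quick sanity check of the 650 figure in plain Python (illustrative only, not part of the PR):

       from datetime import date

       def months_since_epoch(d: date) -> int:
           # Iceberg's month transform: whole months elapsed since 1970-01.
           return (d.year - 1970) * 12 + (d.month - 1)

       # All dates written by the test are in 2024-03 and map to the same value.
       assert months_since_epoch(date(2024, 3, 7)) == months_since_epoch(date(2024, 3, 19)) == 650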



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+        if partition_field.source_id not in self.column_aggregates:
+            return None
+
+        if not partition_field.transform.preserves_order:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
+            )
+
+        lower_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+            schema=schema,
+        )
+        upper_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+            schema=schema,
+        )
+        if lower_value != upper_value:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"

Review Comment:
   Sure! :)
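
   To spell out the rule this hunk implements: a partition value can only be inferred from a Parquet file's column statistics when the transform preserves ordering and the transformed lower and upper bounds agree; if the bounds map to different partition values, the file spans more than one partition and the method raises. A minimal sketch of that check (illustrative helper, not pyiceberg's API):

       from datetime import date

       def month_transform(d: date) -> int:
           # Stand-in for an order-preserving transform (months since 1970-01).
           return (d.year - 1970) * 12 + (d.month - 1)

       def infer_partition_value(lower: date, upper: date) -> int:
           lower_value, upper_value = month_transform(lower), month_transform(upper)
           if lower_value != upper_value:
               # The file straddles two partitions, so no single value can be assigned.
               raise ValueError(f"more than one partition value: {lower_value=}, {upper_value=}")
           return lower_value

       infer_partition_value(date(2024, 3, 7), date(2024, 3, 19))   # 650, single partition
       # infer_partition_value(date(2024, 3, 31), date(2024, 4, 1)) would raise ValueError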



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(

Review Comment:
   Awesome :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

