syun64 commented on code in PR #531:
URL: https://github.com/apache/iceberg-python/pull/531#discussion_r1530566151
##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+        """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+        """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
   Just made the change 😄

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+        if partition_field.source_id not in self.column_aggregates:
+            return None
+
+        if not partition_field.transform.preserves_order:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
+            )
+
+        lower_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+            schema=schema,
+        )
+        upper_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+            schema=schema,
+        )
+        if lower_value != upper_value:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"

Review Comment:
   Sure! :)

##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(

Review Comment:
   Awesome :)