Fokko commented on code in PR #531:
URL: https://github.com/apache/iceberg-python/pull/531#discussion_r1529974666
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None

Review Comment:
Any specific reason that `split_offsets` is nullable, in contrast to the rest?



##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
Can we parameterize this test to do both v1 and v2?



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass

Review Comment:
Should we make it frozen?



##########
pyiceberg/manifest.py:
##########
@@ -389,6 +389,10 @@ def __eq__(self, other: Any) -> bool:
         """
         return self.file_path == other.file_path if isinstance(other, DataFile) else False
 
+    def update(self, other: Dict[str, Any]) -> None:

Review Comment:
I like to avoid mutability. Looking at the code, I think we can also create the object at once:

```python
data_file = DataFile(
    content=DataFileContent.DATA,
    file_path=file_path,
    file_format=FileFormat.PARQUET,
    partition=Record(),
    file_size_in_bytes=len(fo),
    # After this has been fixed:
    # https://github.com/apache/iceberg-python/issues/271
    # sort_order_id=task.sort_order_id,
    sort_order_id=None,
    # Just copy these from the table for now
    spec_id=table_metadata.default_spec_id,
    equality_ids=None,
    key_metadata=None,
    **statistics.to_serialized_dict()
)
```



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(
-    data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+    record_count: int
+    column_sizes: Dict[int, int]
+    value_counts: Dict[int, int]
+    null_value_counts: Dict[int, int]
+    nan_value_counts: Dict[int, int]
+    column_aggregates: Dict[int, StatsAggregator]
+    split_offsets: Optional[List[int]] = None
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+        if partition_field.source_id not in self.column_aggregates:
+            return None
+
+        if not partition_field.transform.preserves_order:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
+            )
+
+        lower_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+            schema=schema,
+        )
+        upper_value = partition_record_value(
+            partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+            schema=schema,
+        )
+        if lower_value != upper_value:
+            raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"

Review Comment:
Can we add a test for this?
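[Editor's note: a minimal sketch of the test asked for above, written in the style of the existing integration tests and assumed to live in `tests/integration/test_add_files.py` so it can reuse its imports, `_create_table`, and `ARROW_SCHEMA`. The test name, table identifier, file path, and the exact `match` text (taken from the error message in `_partition_value`) are assumptions, not part of the PR.]

```python
# Hypothetical sketch (not part of the PR): a file whose "baz" column contains two
# different values cannot map to a single IdentityTransform partition value, so
# add_files should raise the "more than one partition values" error.
@pytest.mark.integration
def test_add_files_fails_on_multiple_partition_values(spark: SparkSession, session_catalog: Catalog) -> None:
    identifier = "default.partitioned_table_multiple_values"  # assumed table name

    partition_spec = PartitionSpec(
        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
        spec_id=0,
    )
    tbl = _create_table(session_catalog, identifier, partition_spec)

    file_path = "s3://warehouse/default/partitioned_multiple/test-0.parquet"  # assumed path
    fo = tbl.io.new_output(file_path)
    with fo.create(overwrite=True) as fos:
        with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
            # Two rows with different "baz" values -> min != max in the column statistics
            writer.write_table(
                pa.Table.from_pylist(
                    [
                        {"foo": True, "bar": "bar_string", "baz": 123, "qux": date(2024, 3, 7)},
                        {"foo": True, "bar": "bar_string", "baz": 124, "qux": date(2024, 3, 7)},
                    ],
                    schema=ARROW_SCHEMA,
                )
            )

    with pytest.raises(ValueError, match="Cannot infer partition value from parquet metadata as there are more than one partition values"):
        tbl.add_files(file_paths=[file_path])
```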
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
     return result
 
 
-def fill_parquet_file_metadata(

Review Comment:
Removing this method will break daft: https://github.com/Eventual-Inc/Daft/pull/2016
However, I wouldn't say I like that we assign this in place, so maybe we should go for it.



##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+    """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:

Review Comment:
Can we parameterize this test to do both v1 and v2?
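[Editor's note: one possible shape for the v1/v2 parameterization asked for above; a sketch only. It assumes the module-local `_create_table` helper is extended to accept a `format_version` argument (its real signature may differ), and only the head of the test is shown.]

```python
# Sketch, assuming _create_table forwards format_version into the table properties,
# e.g. properties={"format-version": str(format_version)}; the helper's actual
# signature in the test module may differ.
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_add_files_to_partitioned_table(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    # A per-version identifier keeps the v1 and v2 runs from clashing
    identifier = f"default.partitioned_table_v{format_version}"

    partition_spec = PartitionSpec(
        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
        spec_id=0,
    )
    tbl = _create_table(session_catalog, identifier, partition_spec, format_version=format_version)
    # ... the rest of the test body is unchanged
```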
##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": 123,
+                                "qux": next(date_iter),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+    partition_rows = spark.sql(
+        f"""
+        SELECT partition, record_count, file_count
+        FROM {identifier}.partitions
+    """
+    ).collect()
+
+    assert [row.record_count for row in partition_rows] == [5]
+    assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
+@pytest.mark.integration
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.partitioned_table_2"
+
+    partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"),
+        spec_id=0,
+    )
+
+    tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    int_iter = iter(range(5))
+
+    file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(
+                    pa.Table.from_pylist(
+                        [
+                            {
+                                "foo": True,
+                                "bar": "bar_string",
+                                "baz": next(int_iter),
+                                "qux": date(2024, 3, 7),
+                            }
+                        ],
+                        schema=ARROW_SCHEMA,
+                    )
+                )
+
+    # add the parquet files as data files
+    with pytest.raises(ValueError):

Review Comment:
Can we assert the error message?
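[Editor's note: for the assertion requested above, `pytest.raises` accepts a `match` regex. A sketch of the replacement for the `with pytest.raises(ValueError):` line in the bucket test; since `BucketTransform` does not preserve order, the wording below assumes the "non-linear Partition Field" error raised by `DataFileStatistics._partition_value` and would need adjusting if the final message changes.]

```python
# Assumed wording, copied from the ValueError raised for non-order-preserving transforms
with pytest.raises(
    ValueError,
    match="Cannot infer partition value from parquet metadata for a non-linear Partition Field",
):
    tbl.add_files(file_paths=file_paths)
```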
##########
pyiceberg/partitioning.py:
##########
@@ -388,16 +388,33 @@ def partition(self) -> Record:  # partition key transformed with iceberg interna
             if len(partition_fields) != 1:
                 raise ValueError("partition_fields must contain exactly one field.")
             partition_field = partition_fields[0]
-            iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
-            iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
-            transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
-            iceberg_typed_key_values[partition_field.name] = transformed_value
+            iceberg_typed_key_values[partition_field.name] = partition_record_value(
+                partition_field=partition_field,
+                value=raw_partition_field_value.value,
+                schema=self.schema,
+            )
         return Record(**iceberg_typed_key_values)
 
     def to_path(self) -> str:
         return self.partition_spec.partition_to_path(self.partition, self.schema)
 
 
+def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
+    """
+    Return the Partition Record representation of the value.
+
+    The value is first converted to internal partition representation.
+    For example, UUID is converted to str, DateType to epoch-days, etc.

Review Comment:
```suggestion
    For example, UUID is converted to bytes[16], DateType to days since epoch, etc.
```



##########
mkdocs/docs/api.md:
##########
@@ -330,6 +330,13 @@ tbl.add_files(file_paths=file_paths)
 
 <!-- prettier-ignore-start -->
 
+!!! note "Partitions"
+    `add_files` only requires the client to read the existing parquet files' metadata footer in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like MonthTransform, and TruncateTransform which preserve the order of the values after the transformation (Any Transform that has `preserves_order` property set to True is supported).

Review Comment:
```suggestion
    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any transform that has the `preserves_order` property set to `True` is supported).
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org