Fokko commented on code in PR #531:
URL: https://github.com/apache/iceberg-python/pull/531#discussion_r1529974666
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result
-def fill_parquet_file_metadata(
- data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+ record_count: int
+ column_sizes: Dict[int, int]
+ value_counts: Dict[int, int]
+ null_value_counts: Dict[int, int]
+ nan_value_counts: Dict[int, int]
+ column_aggregates: Dict[int, StatsAggregator]
+ split_offsets: Optional[List[int]] = None
Review Comment:
Any specific reason that `split_offsets` is nullable, in contrast to the rest?
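For comparison, a non-nullable variant would look roughly like the sketch below; whether an empty list is an acceptable stand-in for "no split offsets" downstream is an assumption to verify.
```python
# Sketch of the non-nullable alternative the question implies: default to an
# empty list instead of None. Treating [] the same as None is an assumption.
from dataclasses import dataclass, field
from typing import List


@dataclass
class DataFileStatistics:
    # ... other fields as in the diff above ...
    split_offsets: List[int] = field(default_factory=list)
```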
##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
[email protected]
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
Review Comment:
Can we parameterize this test to do both v1 and v2?
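Something along these lines could work (a hedged sketch; passing `format_version` through `_create_table` assumes that helper can be extended to forward it as a table property):
```python
# Hedged sketch: run the test once per table format version.
import pytest  # already imported in this test module


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_add_files_to_partitioned_table(
    spark: SparkSession, session_catalog: Catalog, format_version: int
) -> None:
    identifier = f"default.partitioned_table_v{format_version}"
    # ... build partition_spec exactly as in the current test ...
    tbl = _create_table(session_catalog, identifier, partition_spec, format_version=format_version)
    # ... rest of the test body unchanged ...
```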
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result
-def fill_parquet_file_metadata(
- data_file: DataFile,
+@dataclass
Review Comment:
Should we make it frozen?
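For reference, a minimal sketch of what the frozen variant would look like and how it behaves (illustrative only, fields trimmed):
```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class DataFileStatistics:
    record_count: int
    # ... remaining fields as in the diff above ...


stats = DataFileStatistics(record_count=5)
try:
    stats.record_count = 6  # rejected: frozen dataclasses disallow attribute assignment
except FrozenInstanceError:
    pass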
##########
pyiceberg/manifest.py:
##########
@@ -389,6 +389,10 @@ def __eq__(self, other: Any) -> bool:
"""
         return self.file_path == other.file_path if isinstance(other, DataFile) else False
+ def update(self, other: Dict[str, Any]) -> None:
Review Comment:
I like to avoid mutability. Looking at the code, I think we can also create
the object at once:
```python
data_file = DataFile(
content=DataFileContent.DATA,
file_path=file_path,
file_format=FileFormat.PARQUET,
partition=Record(),
file_size_in_bytes=len(fo),
# After this has been fixed:
# https://github.com/apache/iceberg-python/issues/271
# sort_order_id=task.sort_order_id,
sort_order_id=None,
# Just copy these from the table for now
spec_id=table_metadata.default_spec_id,
equality_ids=None,
key_metadata=None,
**statistics.to_serialized_dict()
)
```
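For context, one possible shape for the `to_serialized_dict` method on `DataFileStatistics` used above (purely illustrative; the exact `DataFile` field names and the assumption that `StatsAggregator` exposes `min_as_bytes()`/`max_as_bytes()` should be checked against the actual code):
```python
# Purely illustrative sketch, not the PR's implementation.
from typing import Any, Dict


def to_serialized_dict(self) -> Dict[str, Any]:
    """Flatten the collected statistics into keyword arguments for DataFile."""
    lower_bounds = {field_id: agg.min_as_bytes() for field_id, agg in self.column_aggregates.items()}
    upper_bounds = {field_id: agg.max_as_bytes() for field_id, agg in self.column_aggregates.items()}
    return {
        "record_count": self.record_count,
        "column_sizes": self.column_sizes,
        "value_counts": self.value_counts,
        "null_value_counts": self.null_value_counts,
        "nan_value_counts": self.nan_value_counts,
        "lower_bounds": lower_bounds,
        "upper_bounds": upper_bounds,
        "split_offsets": self.split_offsets,
    }
```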
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result
-def fill_parquet_file_metadata(
- data_file: DataFile,
+@dataclass
+class DataFileStatistics:
+ record_count: int
+ column_sizes: Dict[int, int]
+ value_counts: Dict[int, int]
+ null_value_counts: Dict[int, int]
+ nan_value_counts: Dict[int, int]
+ column_aggregates: Dict[int, StatsAggregator]
+ split_offsets: Optional[List[int]] = None
+
+    def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
+ if partition_field.source_id not in self.column_aggregates:
+ return None
+
+ if not partition_field.transform.preserves_order:
+ raise ValueError(
+                f"Cannot infer partition value from parquet metadata for a non-linear Partition Field. {partition_field}"
+ )
+
+ lower_value = partition_record_value(
+ partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_min,
+ schema=schema,
+ )
+ upper_value = partition_record_value(
+ partition_field=partition_field,
+            value=self.column_aggregates[partition_field.source_id].current_max,
+ schema=schema,
+ )
+ if lower_value != upper_value:
+ raise ValueError(
+                f"Cannot infer partition value from parquet metadata as there are more than one partition values: {lower_value=}, {upper_value=}"
Review Comment:
Can we add a test for this?
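A possible unit test for this branch (a hedged sketch: it calls the private `_partition_value` directly and fakes the aggregator with an object exposing `current_min`/`current_max`, which may not match the final API):
```python
# Hedged sketch of a unit test for the "more than one partition value" branch.
# The fake aggregator only needs current_min/current_max, mirroring the usage above.
from types import SimpleNamespace

import pytest

from pyiceberg.io.pyarrow import DataFileStatistics
from pyiceberg.partitioning import PartitionField
from pyiceberg.schema import Schema
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import IntegerType, NestedField


def test_partition_value_requires_single_value() -> None:
    schema = Schema(NestedField(field_id=1, name="baz", field_type=IntegerType(), required=False))
    field = PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="baz")
    stats = DataFileStatistics(
        record_count=2,
        column_sizes={},
        value_counts={},
        null_value_counts={},
        nan_value_counts={},
        column_aggregates={1: SimpleNamespace(current_min=1, current_max=2)},
        split_offsets=None,
    )
    with pytest.raises(ValueError, match="more than one partition value"):
        stats._partition_value(partition_field=field, schema=schema)
```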
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1594,29 +1595,88 @@ def parquet_path_to_id_mapping(
return result
-def fill_parquet_file_metadata(
Review Comment:
Removing this method will break Daft:
https://github.com/Eventual-Inc/Daft/pull/2016
That said, I don't like that we assign this in place, so maybe we should go for it.
##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
[email protected]
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.partitioned_table"
+
+ partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+ spec_id=0,
+ )
+
+ tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(
+ pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "bar": "bar_string",
+ "baz": 123,
+ "qux": next(date_iter),
+ }
+ ],
+ schema=ARROW_SCHEMA,
+ )
+ )
+
+ # add the parquet files as data files
+ tbl.add_files(file_paths=file_paths)
+
+ # NameMapping must have been set to enable reads
+ assert tbl.name_mapping() is not None
+
+ rows = spark.sql(
+ f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+ FROM {identifier}.all_manifests
+ """
+ ).collect()
+
+ assert [row.added_data_files_count for row in rows] == [5]
+ assert [row.existing_data_files_count for row in rows] == [0]
+ assert [row.deleted_data_files_count for row in rows] == [0]
+
+ df = spark.table(identifier)
+ assert df.count() == 5, "Expected 5 rows"
+ for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+ partition_rows = spark.sql(
+ f"""
+ SELECT partition, record_count, file_count
+ FROM {identifier}.partitions
+ """
+ ).collect()
+
+ assert [row.record_count for row in partition_rows] == [5]
+ assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
[email protected]
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
Review Comment:
Can we parameterize this test to do both v1 and v2?
##########
tests/integration/test_add_files.py:
##########
@@ -238,3 +239,109 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
         assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
+
+
[email protected]
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.partitioned_table"
+
+ partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
+        PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
+ spec_id=0,
+ )
+
+ tbl = _create_table(session_catalog, identifier, partition_spec)
+
+    date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])
+
+    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(
+ pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "bar": "bar_string",
+ "baz": 123,
+ "qux": next(date_iter),
+ }
+ ],
+ schema=ARROW_SCHEMA,
+ )
+ )
+
+ # add the parquet files as data files
+ tbl.add_files(file_paths=file_paths)
+
+ # NameMapping must have been set to enable reads
+ assert tbl.name_mapping() is not None
+
+ rows = spark.sql(
+ f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+ FROM {identifier}.all_manifests
+ """
+ ).collect()
+
+ assert [row.added_data_files_count for row in rows] == [5]
+ assert [row.existing_data_files_count for row in rows] == [0]
+ assert [row.deleted_data_files_count for row in rows] == [0]
+
+ df = spark.table(identifier)
+ assert df.count() == 5, "Expected 5 rows"
+ for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+ partition_rows = spark.sql(
+ f"""
+ SELECT partition, record_count, file_count
+ FROM {identifier}.partitions
+ """
+ ).collect()
+
+ assert [row.record_count for row in partition_rows] == [5]
+ assert [row.file_count for row in partition_rows] == [5]
+    assert [(row.partition.baz, row.partition.qux_month) for row in partition_rows] == [(123, 650)]
+
+
[email protected]
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
+ identifier = "default.partitioned_table_2"
+
+ partition_spec = PartitionSpec(
+        PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"),
+ spec_id=0,
+ )
+
+ tbl = _create_table(session_catalog, identifier, partition_spec)
+
+ int_iter = iter(range(5))
+
+    file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)]
+ # write parquet files
+ for file_path in file_paths:
+ fo = tbl.io.new_output(file_path)
+ with fo.create(overwrite=True) as fos:
+ with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+ writer.write_table(
+ pa.Table.from_pylist(
+ [
+ {
+ "foo": True,
+ "bar": "bar_string",
+ "baz": next(int_iter),
+ "qux": date(2024, 3, 7),
+ }
+ ],
+ schema=ARROW_SCHEMA,
+ )
+ )
+
+ # add the parquet files as data files
+ with pytest.raises(ValueError):
Review Comment:
Can we assert the error message?
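For example, something like this could work (a sketch; the exact wording should be taken from the ValueError raised for non-order-preserving transforms in the diff above):
```python
# Hedged sketch: match= takes a regex, so a stable substring of the message is enough.
with pytest.raises(ValueError, match="Cannot infer partition value from parquet metadata"):
    tbl.add_files(file_paths=file_paths)
```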
##########
pyiceberg/partitioning.py:
##########
@@ -388,16 +388,33 @@ def partition(self) -> Record:  # partition key transformed with iceberg interna
             if len(partition_fields) != 1:
                 raise ValueError("partition_fields must contain exactly one field.")
partition_field = partition_fields[0]
-            iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
-            iceberg_typed_value = _to_partition_representation(iceberg_type, raw_partition_field_value.value)
-            transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
-            iceberg_typed_key_values[partition_field.name] = transformed_value
+            iceberg_typed_key_values[partition_field.name] = partition_record_value(
+                partition_field=partition_field,
+                value=raw_partition_field_value.value,
+                schema=self.schema,
+            )
return Record(**iceberg_typed_key_values)
def to_path(self) -> str:
         return self.partition_spec.partition_to_path(self.partition, self.schema)
+def partition_record_value(partition_field: PartitionField, value: Any, schema: Schema) -> Any:
+ """
+ Return the Partition Record representation of the value.
+
+ The value is first converted to internal partition representation.
+ For example, UUID is converted to str, DateType to epoch-days, etc.
Review Comment:
```suggestion
    For example, UUID is converted to bytes[16], DateType to days since epoch, etc.
```
##########
mkdocs/docs/api.md:
##########
@@ -330,6 +330,13 @@ tbl.add_files(file_paths=file_paths)
<!-- prettier-ignore-start -->
+!!! note "Partitions"
+    `add_files` only requires the client to read the existing parquet files' metadata footer in order to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like MonthTransform, and TruncateTransform which preserve the order of the values after the transformation (Any Transform that has `preserves_order` property set to True is supported).
Review Comment:
```suggestion
    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any transform that has the `preserves_order` property set to `True` is supported).
```
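As a quick way to see which transforms qualify (a hedged sketch; verify against the installed pyiceberg version):
```python
# Transforms expose a preserves_order property; only order-preserving ones
# allow inferring partition values from parquet min/max statistics.
from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform, TruncateTransform

assert IdentityTransform().preserves_order
assert MonthTransform().preserves_order
assert TruncateTransform(width=10).preserves_order
assert not BucketTransform(num_buckets=3).preserves_order  # bucketing scrambles ordering
```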