Fokko commented on code in PR #453: URL: https://github.com/apache/iceberg-python/pull/453#discussion_r1505870389
########## pyiceberg/partitioning.py: ########## @@ -215,3 +246,59 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre ) ) return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID) + + +@dataclass(frozen=True) +class PartitionFieldValue: + field: PartitionField + value: Any + + +@dataclass(frozen=True) +class PartitionKey: + raw_partition_field_values: List[PartitionFieldValue] + partition_spec: PartitionSpec + schema: Schema + + @cached_property + def partition(self) -> Record: # partition key transformed with iceberg internal representation as input + iceberg_typed_key_values = {} + for raw_partition_field_value in self.raw_partition_field_values: + partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] + if len(partition_fields) != 1: + raise ValueError("partition_fields must contain exactly one field.") + partition_field = partition_fields[0] + iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type + iceberg_typed_value = _to_iceberg_internal_representation(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) + iceberg_typed_key_values[partition_field.name] = transformed_value + return Record(**iceberg_typed_key_values) Review Comment: We're now getting into the realm of premature optimization, but ideally you don't need to set the names of the keys. The concept of a Record is that is only contains the data. Just below: `self.partition_spec.partition_to_path(self.partition, self.schema)` you can see that you both pass in the partition, and the schema itself. The positions of the schema should match with the data. ########## pyiceberg/partitioning.py: ########## @@ -215,3 +246,59 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, old_schema: Schema, fre ) ) return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID) + + +@dataclass(frozen=True) +class PartitionFieldValue: + field: PartitionField + value: Any + + +@dataclass(frozen=True) +class PartitionKey: + raw_partition_field_values: List[PartitionFieldValue] + partition_spec: PartitionSpec + schema: Schema + + @cached_property + def partition(self) -> Record: # partition key transformed with iceberg internal representation as input + iceberg_typed_key_values = {} + for raw_partition_field_value in self.raw_partition_field_values: + partition_fields = self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id] + if len(partition_fields) != 1: + raise ValueError("partition_fields must contain exactly one field.") + partition_field = partition_fields[0] + iceberg_type = self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type + iceberg_typed_value = _to_iceberg_internal_representation(iceberg_type, raw_partition_field_value.value) + transformed_value = partition_field.transform.transform(iceberg_type)(iceberg_typed_value) + iceberg_typed_key_values[partition_field.name] = transformed_value + return Record(**iceberg_typed_key_values) + + def to_path(self) -> str: + return self.partition_spec.partition_to_path(self.partition, self.schema) + + +@singledispatch +def _to_iceberg_internal_representation(type: IcebergType, value: Any) -> Any: Review Comment: To avoid confusion later on. Can we change this name to `_to_partition_representation`? The internal representation of a UUID is `bytes` and not `str` ########## pyiceberg/partitioning.py: ########## @@ -193,6 +207,23 @@ def partition_type(self, schema: Schema) -> StructType: nested_fields.append(NestedField(field.field_id, field.name, result_type, required=False)) return StructType(*nested_fields) + def partition_to_path(self, data: Record, schema: Schema) -> str: + partition_type = self.partition_type(schema) + field_types = partition_type.fields + + field_strs = [] + value_strs = [] + for pos, value in enumerate(data.record_fields()): + partition_field = self.fields[pos] # partition field Review Comment: ```suggestion partition_field = self.fields[pos] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org