Fokko commented on code in PR #453:
URL: https://github.com/apache/iceberg-python/pull/453#discussion_r1505870389


##########
pyiceberg/partitioning.py:
##########
@@ -215,3 +246,59 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, 
old_schema: Schema, fre
             )
         )
     return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
+
+
+@dataclass(frozen=True)
+class PartitionFieldValue:
+    field: PartitionField
+    value: Any
+
+
+@dataclass(frozen=True)
+class PartitionKey:
+    raw_partition_field_values: List[PartitionFieldValue]
+    partition_spec: PartitionSpec
+    schema: Schema
+
+    @cached_property
+    def partition(self) -> Record:  # partition key transformed with iceberg 
internal representation as input
+        iceberg_typed_key_values = {}
+        for raw_partition_field_value in self.raw_partition_field_values:
+            partition_fields = 
self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
+            if len(partition_fields) != 1:
+                raise ValueError("partition_fields must contain exactly one 
field.")
+            partition_field = partition_fields[0]
+            iceberg_type = 
self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
+            iceberg_typed_value = 
_to_iceberg_internal_representation(iceberg_type, 
raw_partition_field_value.value)
+            transformed_value = 
partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
+            iceberg_typed_key_values[partition_field.name] = transformed_value
+        return Record(**iceberg_typed_key_values)

Review Comment:
   We're now getting into the realm of premature optimization, but ideally you 
don't need to set the names of the keys. The concept of a Record is that is 
only contains the data. Just below: 
`self.partition_spec.partition_to_path(self.partition, self.schema)` you can 
see that you both pass in the partition, and the schema itself. The positions 
of the schema should match with the data.



##########
pyiceberg/partitioning.py:
##########
@@ -215,3 +246,59 @@ def assign_fresh_partition_spec_ids(spec: PartitionSpec, 
old_schema: Schema, fre
             )
         )
     return PartitionSpec(*partition_fields, spec_id=INITIAL_PARTITION_SPEC_ID)
+
+
+@dataclass(frozen=True)
+class PartitionFieldValue:
+    field: PartitionField
+    value: Any
+
+
+@dataclass(frozen=True)
+class PartitionKey:
+    raw_partition_field_values: List[PartitionFieldValue]
+    partition_spec: PartitionSpec
+    schema: Schema
+
+    @cached_property
+    def partition(self) -> Record:  # partition key transformed with iceberg 
internal representation as input
+        iceberg_typed_key_values = {}
+        for raw_partition_field_value in self.raw_partition_field_values:
+            partition_fields = 
self.partition_spec.source_id_to_fields_map[raw_partition_field_value.field.source_id]
+            if len(partition_fields) != 1:
+                raise ValueError("partition_fields must contain exactly one 
field.")
+            partition_field = partition_fields[0]
+            iceberg_type = 
self.schema.find_field(name_or_id=raw_partition_field_value.field.source_id).field_type
+            iceberg_typed_value = 
_to_iceberg_internal_representation(iceberg_type, 
raw_partition_field_value.value)
+            transformed_value = 
partition_field.transform.transform(iceberg_type)(iceberg_typed_value)
+            iceberg_typed_key_values[partition_field.name] = transformed_value
+        return Record(**iceberg_typed_key_values)
+
+    def to_path(self) -> str:
+        return self.partition_spec.partition_to_path(self.partition, 
self.schema)
+
+
+@singledispatch
+def _to_iceberg_internal_representation(type: IcebergType, value: Any) -> Any:

Review Comment:
   To avoid confusion later on. Can we change this name to 
`_to_partition_representation`? The internal representation of a UUID is 
`bytes` and not `str`



##########
pyiceberg/partitioning.py:
##########
@@ -193,6 +207,23 @@ def partition_type(self, schema: Schema) -> StructType:
             nested_fields.append(NestedField(field.field_id, field.name, 
result_type, required=False))
         return StructType(*nested_fields)
 
+    def partition_to_path(self, data: Record, schema: Schema) -> str:
+        partition_type = self.partition_type(schema)
+        field_types = partition_type.fields
+
+        field_strs = []
+        value_strs = []
+        for pos, value in enumerate(data.record_fields()):
+            partition_field = self.fields[pos]  # partition field

Review Comment:
   ```suggestion
               partition_field = self.fields[pos]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to