Fokko commented on code in PR #40:
URL: https://github.com/apache/iceberg-python/pull/40#discussion_r1349777029


##########
pyiceberg/avro/resolver.py:
##########
@@ -233,7 +255,107 @@ def skip(self, decoder: BinaryDecoder) -> None:
         pass
 
 
-class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+class WriteSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Writer]):
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], 
result: Writer) -> Writer:
+        return result
+
+    def struct(self, struct: StructType, provided_struct: 
Optional[IcebergType], field_writers: List[Writer]) -> Writer:
+        if not isinstance(provided_struct, StructType):
+            raise ResolveError(f"File/write schema are not aligned for struct, 
got {provided_struct}")
+
+        provided_struct_positions: Dict[int, int] = {field.field_id: pos for 
pos, field in enumerate(provided_struct.fields)}
+
+        results: List[Tuple[Optional[int], Writer]] = []
+        iter(field_writers)
+
+        for pos, write_field in enumerate(struct.fields):
+            if write_field.field_id in provided_struct_positions:
+                
results.append((provided_struct_positions[write_field.field_id], 
field_writers[pos]))
+            else:
+                # There is a default value
+                if isinstance(write_field, NestedField) and 
write_field.write_default is not None:
+                    # The field is not in the record, but there is a write 
default value
+                    default_writer = DefaultWriter(
+                        writer=visit(write_field.field_type, 
CONSTRUCT_WRITER_VISITOR), value=write_field.write_default
+                    )
+                    results.append((None, default_writer))
+                elif write_field.required:
+                    raise ValueError(f"Field is required, and there is no 
write default: {write_field}")
+                else:
+                    results.append((pos, NoneWriter()))
+
+        return StructWriter(field_writers=tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], 
field_writer: Writer) -> Writer:
+        return field_writer if field.required else OptionWriter(field_writer)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], 
element_reader: Writer) -> Writer:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, 
got {expected_list}")
+
+        return ListWriter(element_reader if list_type.element_required else 
OptionWriter(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], 
key_reader: Writer, value_reader: Writer) -> Writer:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got 
{expected_map}")
+
+        return MapWriter(key_reader, value_reader if map_type.value_required 
else OptionWriter(value_reader))
+
+    def primitive(self, primitive: PrimitiveType, expected_primitive: 
Optional[IcebergType]) -> Writer:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for 
{primitive}, got {expected_primitive}")
+
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)
+
+        return super().primitive(primitive, expected_primitive)
+
+    def visit_boolean(self, boolean_type: BooleanType, partner: 
Optional[IcebergType]) -> Writer:
+        return BooleanWriter()
+
+    def visit_integer(self, integer_type: IntegerType, partner: 
Optional[IcebergType]) -> Writer:
+        return IntegerWriter()
+
+    def visit_long(self, long_type: LongType, partner: Optional[IcebergType]) 
-> Writer:
+        return IntegerWriter()
+
+    def visit_float(self, float_type: FloatType, partner: 
Optional[IcebergType]) -> Writer:
+        return FloatWriter()
+
+    def visit_double(self, double_type: DoubleType, partner: 
Optional[IcebergType]) -> Writer:
+        return DoubleWriter()

Review Comment:
   We don't support the un-promotion of types, and the writer tree guarantees 
that the binary data that is written is the same type as in the Avro schema. 
I've added a test around un-promotion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to