rdblue commented on code in PR #6506:
URL: https://github.com/apache/iceberg/pull/6506#discussion_r1060128809


##########
python/pyiceberg/avro/resolver.py:
##########
@@ -14,123 +14,250 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from functools import singledispatch
+# pylint: disable=arguments-renamed,unused-argument
+from functools import partial
 from typing import (
+    Callable,
+    Dict,
     List,
     Optional,
     Tuple,
     Union,
 )
 
 from pyiceberg.avro.reader import (
-    ConstructReader,
+    BinaryReader,
+    BooleanReader,
+    DateReader,
+    DecimalReader,
+    DoubleReader,
+    FixedReader,
+    FloatReader,
+    IntegerReader,
     ListReader,
     MapReader,
     NoneReader,
     OptionReader,
     Reader,
+    StringReader,
+    StructProtocolReader,
     StructReader,
+    TimeReader,
+    TimestampReader,
+    TimestamptzReader,
+    UUIDReader,
 )
 from pyiceberg.exceptions import ResolveError
-from pyiceberg.schema import Schema, promote, visit
+from pyiceberg.schema import (
+    PartnerAccessor,
+    PrimitiveWithPartnerVisitor,
+    Schema,
+    promote,
+    visit_with_partner,
+)
+from pyiceberg.typedef import EMPTY_DICT, StructProtocol
 from pyiceberg.types import (
+    BinaryType,
+    BooleanType,
+    DateType,
+    DecimalType,
     DoubleType,
+    FixedType,
     FloatType,
     IcebergType,
+    IntegerType,
     ListType,
+    LongType,
     MapType,
+    NestedField,
     PrimitiveType,
+    StringType,
     StructType,
+    TimestampType,
+    TimestamptzType,
+    TimeType,
+    UUIDType,
 )
 
 
-@singledispatch
-def resolve(file_schema: Union[Schema, IcebergType], read_schema: 
Union[Schema, IcebergType]) -> Reader:
-    """This resolves the file and read schema
+def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader:
+    """Constructs a reader from a file schema
+
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object 
type
+    """
+    return resolve(file_schema, file_schema)
+
 
-    The function traverses the schema in post-order fashion
+def resolve(
+    file_schema: Union[Schema, IcebergType],
+    read_schema: Union[Schema, IcebergType],
+    read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT,
+) -> Reader:
+    """Resolves the file and read schema to produce a reader
 
-     Args:
-         file_schema (Schema | IcebergType): The schema of the Avro file
-         read_schema (Schema | IcebergType): The requested read schema which 
is equal, subset or superset of the file schema
+    Args:
+        file_schema (Schema | IcebergType): The schema of the Avro file
+        read_schema (Schema | IcebergType): The requested read schema which is 
equal, subset or superset of the file schema
+        read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of 
types to use for struct data
 
-     Raises:
-         NotImplementedError: If attempting to resolve an unrecognized object 
type
+    Raises:
+        NotImplementedError: If attempting to resolve an unrecognized object 
type
     """
-    raise NotImplementedError(f"Cannot resolve non-type: {file_schema}")
+    return visit_with_partner(file_schema, read_schema, 
SchemaResolver(read_types), SchemaPartnerAccessor())  # type: ignore
+
+
+class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+    read_types: Dict[int, Callable[[Schema], StructProtocol]]
+    field_ids: List[int]
+
+    def before_field(self, field: NestedField, field_partner: 
Optional[IcebergType]) -> None:
+        self.field_ids.append(field.field_id)
+
+    def after_field(self, field: NestedField, field_partner: 
Optional[IcebergType]) -> None:
+        self.field_ids.pop()
+
+    def create_struct_reader(self, read_schema: StructType, field_readers: 
Tuple[Tuple[Optional[int], Reader], ...]) -> Reader:
+        current_field_id = self.field_ids[-1] if self.field_ids else -1
+        if constructor := self.read_types.get(current_field_id):
+            return StructProtocolReader(field_readers, partial(constructor, 
read_schema))
+
+        return StructReader(field_readers)
+
+    def __init__(self, read_types: Dict[int, Callable[[Schema], 
StructProtocol]]):
+        self.read_types = read_types
+        self.field_ids = []
+
+    def schema(self, schema: Schema, expected_schema: Optional[IcebergType], 
result: Reader) -> Reader:
+        return result
+
+    def struct(self, struct: StructType, expected_struct: 
Optional[IcebergType], field_readers: List[Reader]) -> Reader:
+        if not expected_struct:
+            # no values are expected so the reader will only be used for 
skipping
+            return StructReader(tuple(enumerate(field_readers)))
+
+        if not isinstance(expected_struct, StructType):
+            raise ResolveError(f"File/read schema are not aligned for struct, 
got {expected_struct}")
+
+        results: List[Tuple[Optional[int], Reader]] = []
+        expected_positions: Dict[int, int] = {field.field_id: pos for pos, 
field in enumerate(expected_struct.fields)}
+
+        # first, add readers for the file fields that must be in order
+        for field, result_reader in zip(struct.fields, field_readers):
+            read_pos = expected_positions.get(field.field_id)
+            results.append((read_pos, result_reader))
+
+        file_fields = {field.field_id: field for field in struct.fields}
+        for pos, read_field in enumerate(expected_struct.fields):
+            if read_field.field_id not in file_fields:
+                if read_field.required:
+                    raise ResolveError(f"{read_field} is non-optional, and not 
part of the file schema")
+                # Just set the new field to None
+                results.append((pos, NoneReader()))
+
+        return self.create_struct_reader(expected_struct, tuple(results))
+
+    def field(self, field: NestedField, expected_field: Optional[IcebergType], 
field_reader: Reader) -> Reader:
+        return field_reader if field.required else OptionReader(field_reader)
+
+    def list(self, list_type: ListType, expected_list: Optional[IcebergType], 
element_reader: Reader) -> Reader:
+        if expected_list and not isinstance(expected_list, ListType):
+            raise ResolveError(f"File/read schema are not aligned for list, 
got {expected_list}")
+
+        return ListReader(element_reader if list_type.element_required else 
OptionReader(element_reader))
+
+    def map(self, map_type: MapType, expected_map: Optional[IcebergType], 
key_reader: Reader, value_reader: Reader) -> Reader:
+        if expected_map and not isinstance(expected_map, MapType):
+            raise ResolveError(f"File/read schema are not aligned for map, got 
{expected_map}")
 
+        return MapReader(key_reader, value_reader if map_type.value_required 
else OptionReader(value_reader))
 
-@resolve.register(Schema)
-def _(file_schema: Schema, read_schema: Schema) -> Reader:
-    """Visit a Schema and starts resolving it by converting it to a struct"""
-    return resolve(file_schema.as_struct(), read_schema.as_struct())
+    def primitive(self, primitive: PrimitiveType, expected_primitive: 
Optional[IcebergType]) -> Reader:
+        if expected_primitive is not None:
+            if not isinstance(expected_primitive, PrimitiveType):
+                raise ResolveError(f"File/read schema are not aligned for 
{primitive}, got {expected_primitive}")
 
+            # ensure that the type can be projected to the expected
+            if primitive != expected_primitive:
+                promote(primitive, expected_primitive)

Review Comment:
   Yes, that's on purpose. The `promote` call is there only for validation: it 
raises an exception if the promotion is invalid. The type used to construct the 
reader must match the file schema, so we deliberately don't capture the type 
that `promote` returns and create a reader based on it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to