rdblue commented on code in PR #6506: URL: https://github.com/apache/iceberg/pull/6506#discussion_r1060129011
########## python/pyiceberg/avro/resolver.py: ########## @@ -14,123 +14,250 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -from functools import singledispatch +# pylint: disable=arguments-renamed,unused-argument +from functools import partial from typing import ( + Callable, + Dict, List, Optional, Tuple, Union, ) from pyiceberg.avro.reader import ( - ConstructReader, + BinaryReader, + BooleanReader, + DateReader, + DecimalReader, + DoubleReader, + FixedReader, + FloatReader, + IntegerReader, ListReader, MapReader, NoneReader, OptionReader, Reader, + StringReader, + StructProtocolReader, StructReader, + TimeReader, + TimestampReader, + TimestamptzReader, + UUIDReader, ) from pyiceberg.exceptions import ResolveError -from pyiceberg.schema import Schema, promote, visit +from pyiceberg.schema import ( + PartnerAccessor, + PrimitiveWithPartnerVisitor, + Schema, + promote, + visit_with_partner, +) +from pyiceberg.typedef import EMPTY_DICT, StructProtocol from pyiceberg.types import ( + BinaryType, + BooleanType, + DateType, + DecimalType, DoubleType, + FixedType, FloatType, IcebergType, + IntegerType, ListType, + LongType, MapType, + NestedField, PrimitiveType, + StringType, StructType, + TimestampType, + TimestamptzType, + TimeType, + UUIDType, ) -@singledispatch -def resolve(file_schema: Union[Schema, IcebergType], read_schema: Union[Schema, IcebergType]) -> Reader: - """This resolves the file and read schema +def construct_reader(file_schema: Union[Schema, IcebergType]) -> Reader: + """Constructs a reader from a file schema + + Args: + file_schema (Schema | IcebergType): The schema of the Avro file + + Raises: + NotImplementedError: If attempting to resolve an unrecognized object type + """ + return resolve(file_schema, file_schema) + - The function traverses the schema in post-order fashion +def resolve( + file_schema: Union[Schema, IcebergType], + read_schema: 
Union[Schema, IcebergType], + read_types: Dict[int, Callable[[Schema], StructProtocol]] = EMPTY_DICT, +) -> Reader: + """Resolves the file and read schema to produce a reader - Args: - file_schema (Schema | IcebergType): The schema of the Avro file - read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema + Args: + file_schema (Schema | IcebergType): The schema of the Avro file + read_schema (Schema | IcebergType): The requested read schema which is equal, subset or superset of the file schema + read_types (Dict[int, Callable[[Schema], StructProtocol]]): A dict of types to use for struct data - Raises: - NotImplementedError: If attempting to resolve an unrecognized object type + Raises: + NotImplementedError: If attempting to resolve an unrecognized object type """ - raise NotImplementedError(f"Cannot resolve non-type: {file_schema}") + return visit_with_partner(file_schema, read_schema, SchemaResolver(read_types), SchemaPartnerAccessor()) # type: ignore + + +class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]): + read_types: Dict[int, Callable[[Schema], StructProtocol]] + field_ids: List[int] + + def before_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None: + self.field_ids.append(field.field_id) + + def after_field(self, field: NestedField, field_partner: Optional[IcebergType]) -> None: + self.field_ids.pop() + + def create_struct_reader(self, read_schema: StructType, field_readers: Tuple[Tuple[Optional[int], Reader], ...]) -> Reader: + current_field_id = self.field_ids[-1] if self.field_ids else -1 + if constructor := self.read_types.get(current_field_id): + return StructProtocolReader(field_readers, partial(constructor, read_schema)) + + return StructReader(field_readers) + + def __init__(self, read_types: Dict[int, Callable[[Schema], StructProtocol]]): + self.read_types = read_types + self.field_ids = [] + + def schema(self, schema: Schema, 
expected_schema: Optional[IcebergType], result: Reader) -> Reader: + return result + + def struct(self, struct: StructType, expected_struct: Optional[IcebergType], field_readers: List[Reader]) -> Reader: + if not expected_struct: + # no values are expected so the reader will only be used for skipping + return StructReader(tuple(enumerate(field_readers))) + + if not isinstance(expected_struct, StructType): + raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}") + + results: List[Tuple[Optional[int], Reader]] = [] + expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)} + + # first, add readers for the file fields that must be in order + for field, result_reader in zip(struct.fields, field_readers): Review Comment: This was mostly to match the previous implementation. I'm good with your suggestion here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org