HonahX commented on code in PR #219: URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1433405001
########## pyiceberg/io/pyarrow.py: ########## @@ -828,7 +933,9 @@ def _task_to_table( schema_raw = metadata.get(ICEBERG_SCHEMA) # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema, Review Comment: I think this TODO can be removed😊 ########## pyiceberg/io/pyarrow.py: ########## @@ -698,77 +708,147 @@ def before_field(self, field: pa.Field) -> None: def after_field(self, field: pa.Field) -> None: """Override this method to perform an action immediately after visiting a field.""" + def before_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately before visiting an element within a ListType.""" + + def after_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately after visiting an element within a ListType.""" + + def before_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a key within a MapType.""" + + def after_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a key within a MapType.""" + + def before_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a value within a MapType.""" + + def after_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a value within a MapType.""" + @abstractmethod - def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]: + def schema(self, schema: pa.Schema, struct_result: T) -> T: """Visit a schema.""" @abstractmethod - def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]: + def struct(self, struct: pa.StructType, field_results: List[T]) -> T: """Visit a struct.""" @abstractmethod - def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]: + 
def field(self, field: pa.Field, field_result: T) -> T: + """Visit a field.""" + + @abstractmethod + def list(self, list_type: pa.ListType, element_result: T) -> T: """Visit a list.""" @abstractmethod - def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]: + def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T: """Visit a map.""" @abstractmethod - def primitive(self, primitive: pa.DataType) -> Optional[T]: + def primitive(self, primitive: pa.DataType) -> T: """Visit a primitive type.""" def _get_field_id(field: pa.Field) -> Optional[int]: - for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: - if field_id_str := field.metadata.get(pyarrow_field_id_key): - return int(field_id_str.decode()) + if field.metadata: + for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: + if field_id_str := field.metadata.get(pyarrow_field_id_key): + return int(field_id_str.decode()) return None def _get_field_doc(field: pa.Field) -> Optional[str]: - for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: - if doc_str := field.metadata.get(pyarrow_doc_key): - return doc_str.decode() + if field.metadata: + for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: + if doc_str := field.metadata.get(pyarrow_doc_key): + return doc_str.decode() return None -class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): - def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: - fields = [] - for i, field in enumerate(arrow_fields): - field_id = _get_field_id(field) - field_doc = _get_field_doc(field) - field_type = field_results[i] - if field_type is not None and field_id is not None: - fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) - return fields - - def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema: - return Schema(*self._convert_fields(schema, field_results)) - - def 
struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType: - return StructType(*self._convert_fields(struct, field_results)) - - def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]: +class _HasIds(PyArrowSchemaVisitor[bool]): + def schema(self, schema: pa.Schema, struct_result: bool) -> bool: + return struct_result + + def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool: + return all(field_results) + + def field(self, field: pa.Field, field_result: bool) -> bool: + return all([_get_field_id(field) is not None, field_result]) + + def list(self, list_type: pa.ListType, element_result: bool) -> bool: element_field = list_type.value_field element_id = _get_field_id(element_field) - if element_result is not None and element_id is not None: - return ListType(element_id, element_result, element_required=not element_field.nullable) - return None + return element_result and element_id is not None - def map( - self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] - ) -> Optional[IcebergType]: + def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) -> bool: key_field = map_type.key_field key_id = _get_field_id(key_field) value_field = map_type.item_field value_id = _get_field_id(value_field) - if key_result is not None and value_result is not None and key_id is not None and value_id is not None: - return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable) - return None + return all([key_id is not None, value_id is not None, key_result, value_result]) + + def primitive(self, primitive: pa.DataType) -> bool: + return True - def primitive(self, primitive: pa.DataType) -> IcebergType: + +class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): + """Converts PyArrowSchema to Iceberg Schema. 
Applies the IDs from name_mapping if provided.""" + + field_names: List[str] + + def __init__(self, name_mapping: Optional[NameMapping] = None) -> None: + self.field_names = [] + self.name_mapping = name_mapping + + def _current_path(self) -> str: + return ".".join(self.field_names) + + def _path(self, name: str) -> str: Review Comment: Seems this is not used? ########## pyiceberg/io/pyarrow.py: ########## @@ -698,77 +708,147 @@ def before_field(self, field: pa.Field) -> None: def after_field(self, field: pa.Field) -> None: """Override this method to perform an action immediately after visiting a field.""" + def before_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately before visiting an element within a ListType.""" + + def after_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately after visiting an element within a ListType.""" + + def before_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a key within a MapType.""" + + def after_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a key within a MapType.""" + + def before_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a value within a MapType.""" + + def after_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a value within a MapType.""" + @abstractmethod - def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]: + def schema(self, schema: pa.Schema, struct_result: T) -> T: """Visit a schema.""" @abstractmethod - def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]: + def struct(self, struct: pa.StructType, field_results: List[T]) -> T: """Visit a struct.""" @abstractmethod - def 
list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]: + def field(self, field: pa.Field, field_result: T) -> T: + """Visit a field.""" + + @abstractmethod + def list(self, list_type: pa.ListType, element_result: T) -> T: """Visit a list.""" @abstractmethod - def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]: + def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T: """Visit a map.""" @abstractmethod - def primitive(self, primitive: pa.DataType) -> Optional[T]: + def primitive(self, primitive: pa.DataType) -> T: """Visit a primitive type.""" def _get_field_id(field: pa.Field) -> Optional[int]: - for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: - if field_id_str := field.metadata.get(pyarrow_field_id_key): - return int(field_id_str.decode()) + if field.metadata: + for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: + if field_id_str := field.metadata.get(pyarrow_field_id_key): + return int(field_id_str.decode()) return None def _get_field_doc(field: pa.Field) -> Optional[str]: - for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: - if doc_str := field.metadata.get(pyarrow_doc_key): - return doc_str.decode() + if field.metadata: + for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: + if doc_str := field.metadata.get(pyarrow_doc_key): + return doc_str.decode() return None -class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): - def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: - fields = [] - for i, field in enumerate(arrow_fields): - field_id = _get_field_id(field) - field_doc = _get_field_doc(field) - field_type = field_results[i] - if field_type is not None and field_id is not None: - fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) - return fields - - def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema: - return Schema(*self._convert_fields(schema, field_results)) - - def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType: - return StructType(*self._convert_fields(struct, field_results)) - - def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]: +class _HasIds(PyArrowSchemaVisitor[bool]): + def schema(self, schema: pa.Schema, struct_result: bool) -> bool: + return struct_result + + def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool: + return all(field_results) + + def field(self, field: pa.Field, field_result: bool) -> bool: + return all([_get_field_id(field) is not None, field_result]) + + def list(self, list_type: pa.ListType, element_result: bool) -> bool: element_field = list_type.value_field element_id = _get_field_id(element_field) - if element_result is not None and element_id is not None: - return ListType(element_id, element_result, element_required=not element_field.nullable) - return None + return element_result and element_id is not None - def map( - self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] - ) -> Optional[IcebergType]: + def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) -> bool: key_field = map_type.key_field key_id = _get_field_id(key_field) value_field = map_type.item_field value_id = _get_field_id(value_field) - if key_result is not None and value_result is not None and key_id is not None and value_id is not None: - return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable) - return None + return all([key_id is not None, value_id is not None, key_result, value_result]) + + def primitive(self, primitive: pa.DataType) -> bool: + return True - def primitive(self, primitive: pa.DataType) -> IcebergType: + +class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, 
Schema]]): + """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided.""" + + field_names: List[str] + + def __init__(self, name_mapping: Optional[NameMapping] = None) -> None: + self.field_names = [] + self.name_mapping = name_mapping + + def _current_path(self) -> str: + return ".".join(self.field_names) + + def _path(self, name: str) -> str: + return ".".join(self.field_names + [name]) + + def _get_field_id(self, field: pa.Field) -> int: + if self.name_mapping: + return self.name_mapping.find(self._current_path()).field_id + elif field_id := _get_field_id(field): + return field_id + else: + raise ValueError(f"Cannot convert {field} to Iceberg Field as field_id is empty.") + + def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema: + return Schema(*struct_result.fields) + + def struct(self, struct: pa.StructType, field_results: List[NestedField]) -> StructType: + return StructType(*field_results) + + def field(self, field: pa.Field, field_result: IcebergType) -> NestedField: + field_id = self._get_field_id(field) + field_doc = _get_field_doc(field) + field_type = field_result + if field_type is None: + raise ValueError(f"Cannot convert {field} to Iceberg Field as field_type is empty.") Review Comment: Quick question: Could `field_type` ever be None, given that we raise `TypeError` for unsupported types? If so, should we consider marking `field_result` as `Optional[IcebergType]`? 
########## pyiceberg/io/pyarrow.py: ########## @@ -698,77 +708,147 @@ def before_field(self, field: pa.Field) -> None: def after_field(self, field: pa.Field) -> None: """Override this method to perform an action immediately after visiting a field.""" + def before_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately before visiting an element within a ListType.""" + + def after_list_element(self, element: pa.Field) -> None: + """Override this method to perform an action immediately after visiting an element within a ListType.""" + + def before_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a key within a MapType.""" + + def after_map_key(self, key: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a key within a MapType.""" + + def before_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately before visiting a value within a MapType.""" + + def after_map_value(self, value: pa.Field) -> None: + """Override this method to perform an action immediately after visiting a value within a MapType.""" + @abstractmethod - def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]: + def schema(self, schema: pa.Schema, struct_result: T) -> T: """Visit a schema.""" @abstractmethod - def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]: + def struct(self, struct: pa.StructType, field_results: List[T]) -> T: """Visit a struct.""" @abstractmethod - def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]: + def field(self, field: pa.Field, field_result: T) -> T: + """Visit a field.""" + + @abstractmethod + def list(self, list_type: pa.ListType, element_result: T) -> T: """Visit a list.""" @abstractmethod - def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: 
Optional[T]) -> Optional[T]: + def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T: """Visit a map.""" @abstractmethod - def primitive(self, primitive: pa.DataType) -> Optional[T]: + def primitive(self, primitive: pa.DataType) -> T: """Visit a primitive type.""" def _get_field_id(field: pa.Field) -> Optional[int]: - for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: - if field_id_str := field.metadata.get(pyarrow_field_id_key): - return int(field_id_str.decode()) + if field.metadata: + for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS: + if field_id_str := field.metadata.get(pyarrow_field_id_key): + return int(field_id_str.decode()) return None def _get_field_doc(field: pa.Field) -> Optional[str]: - for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: - if doc_str := field.metadata.get(pyarrow_doc_key): - return doc_str.decode() + if field.metadata: + for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS: + if doc_str := field.metadata.get(pyarrow_doc_key): + return doc_str.decode() return None -class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): - def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]: - fields = [] - for i, field in enumerate(arrow_fields): - field_id = _get_field_id(field) - field_doc = _get_field_doc(field) - field_type = field_results[i] - if field_type is not None and field_id is not None: - fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc)) - return fields - - def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema: - return Schema(*self._convert_fields(schema, field_results)) - - def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType: - return StructType(*self._convert_fields(struct, field_results)) - - def list(self, list_type: pa.ListType, element_result: Optional[IcebergType]) -> Optional[IcebergType]: +class 
_HasIds(PyArrowSchemaVisitor[bool]): + def schema(self, schema: pa.Schema, struct_result: bool) -> bool: + return struct_result + + def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool: + return all(field_results) + + def field(self, field: pa.Field, field_result: bool) -> bool: + return all([_get_field_id(field) is not None, field_result]) + + def list(self, list_type: pa.ListType, element_result: bool) -> bool: element_field = list_type.value_field element_id = _get_field_id(element_field) - if element_result is not None and element_id is not None: - return ListType(element_id, element_result, element_required=not element_field.nullable) - return None + return element_result and element_id is not None - def map( - self, map_type: pa.MapType, key_result: Optional[IcebergType], value_result: Optional[IcebergType] - ) -> Optional[IcebergType]: + def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) -> bool: key_field = map_type.key_field key_id = _get_field_id(key_field) value_field = map_type.item_field value_id = _get_field_id(value_field) - if key_result is not None and value_result is not None and key_id is not None and value_id is not None: - return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable) - return None + return all([key_id is not None, value_id is not None, key_result, value_result]) + + def primitive(self, primitive: pa.DataType) -> bool: + return True - def primitive(self, primitive: pa.DataType) -> IcebergType: + +class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): + """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided.""" + + field_names: List[str] + Review Comment: ```suggestion name_mapping: Optional[NameMapping] ``` Shall we add `name_mapping` here? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org