gabeiglio commented on code in PR #1443: URL: https://github.com/apache/iceberg-python/pull/1443#discussion_r1941979863
########## pyiceberg/io/pyarrow.py: ########## @@ -1216,6 +1218,45 @@ def _field_id(self, field: pa.Field) -> int: return -1 +def _get_column_projection_values( + file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int] +) -> Tuple[bool, Dict[str, Any]]: + """Apply Column Projection rules to File Schema.""" + project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids) + should_project_columns = len(project_schema_diff) > 0 + projected_missing_fields: Dict[str, Any] = {} + + if not should_project_columns: + return False, {} + + partition_schema: StructType + accessors: Dict[int, Accessor] + + if partition_spec is not None: + partition_schema = partition_spec.partition_type(projected_schema) + accessors = build_position_accessors(partition_schema) + else: + return False, {} + + for field_id in project_schema_diff: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform): + accesor = accessors.get(partition_field.field_id) + + if accesor is None: Review Comment: fixed ########## pyiceberg/io/pyarrow.py: ########## @@ -1216,6 +1218,45 @@ def _field_id(self, field: pa.Field) -> int: return -1 +def _get_column_projection_values( + file: DataFile, projected_schema: Schema, partition_spec: Optional[PartitionSpec], file_project_field_ids: Set[int] +) -> Tuple[bool, Dict[str, Any]]: + """Apply Column Projection rules to File Schema.""" + project_schema_diff = projected_schema.field_ids.difference(file_project_field_ids) + should_project_columns = len(project_schema_diff) > 0 + projected_missing_fields: Dict[str, Any] = {} + + if not should_project_columns: + return False, {} + + partition_schema: StructType + accessors: Dict[int, Accessor] + + if partition_spec is not None: + partition_schema = partition_spec.partition_type(projected_schema) + accessors = build_position_accessors(partition_schema) + else: + return False, {} + + for field_id in project_schema_diff: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform): + accesor = accessors.get(partition_field.field_id) + + if accesor is None: + continue + + # The partition field may not exist in the partition record of the data file. + # This can happen when new partition fields are introduced after the file was written. + try: + if partition_value := accesor.get(file.partition): Review Comment: fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org