syun64 commented on code in PR #902:
URL: https://github.com/apache/iceberg-python/pull/902#discussion_r1672619363
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1271,54 +1274,62 @@ def project_batches(
 def to_requested_schema(
-    requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False
+    requested_schema: Schema,
+    file_schema: Schema,
+    batch: pa.RecordBatch,
+    downcast_ns_timestamp_to_us: bool = False,
+    include_field_ids: bool = False,
 ) -> pa.RecordBatch:
+    # We could re-use some of these visitors
     struct_array = visit_with_partner(
-        requested_schema, batch, ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us), ArrowAccessor(file_schema)
+        requested_schema,
+        batch,
+        ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids),
+        ArrowAccessor(file_schema),
     )
-
-    arrays = []
-    fields = []
-    for pos, field in enumerate(requested_schema.fields):
-        array = struct_array.field(pos)
-        arrays.append(array)
-        fields.append(pa.field(field.name, array.type, field.optional))
-    return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields))
+    return pa.RecordBatch.from_struct_array(struct_array)


 class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]):
     file_schema: Schema
+    _include_field_ids: bool

-    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False):
+    def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None:
         self.file_schema = file_schema
+        self._include_field_ids = include_field_ids
         self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us

     def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array:
         file_field = self.file_schema.find_field(field.field_id)
+
         if field.field_type.is_primitive:
             if field.field_type != file_field.field_type:
-                return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False))
-            elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=False)) != values.type:
-                # if file_field and field_type (e.g. String) are the same
-                # but the pyarrow type of the array is different from the expected type
-                # (e.g. string vs larger_string), we want to cast the array to the larger type
-                safe = True
+                return values.cast(
+                    schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids)
+                )
+            elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=True)) != values.type:
+                # Downcasting of nanoseconds to microseconds
                 if (
                     pa.types.is_timestamp(target_type)
                     and target_type.unit == "us"
                     and pa.types.is_timestamp(values.type)
                     and values.type.unit == "ns"
                 ):
-                    safe = False
-                return values.cast(target_type, safe=safe)
+                    return values.cast(target_type, safe=False)
         return values

     def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field:
+        metadata = {}
+        if field.doc:
+            metadata[PYARROW_FIELD_DOC_KEY] = field.doc
+        if self._include_field_ids:
+            metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id)

Review Comment:
   Ah good catch on this one as well 👍
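For readers following the thread, the change does two things: it threads an include_field_ids flag through ArrowProjectionVisitor so that _construct_field can attach the field id (and doc string) as pyarrow field metadata, and it replaces the manual array/field reassembly in to_requested_schema with pa.RecordBatch.from_struct_array. The sketch below is a minimal, standalone illustration of those two mechanisms, not pyiceberg code; it assumes pyarrow is installed and that PYARROW_PARQUET_FIELD_ID_KEY resolves to the "PARQUET:field_id" metadata key, which is an assumption rather than something shown in this diff.

# Standalone sketch (not pyiceberg code): attach field ids as pyarrow field
# metadata and build a RecordBatch directly from a StructArray.
# The "PARQUET:field_id" key is assumed to be what PYARROW_PARQUET_FIELD_ID_KEY holds.
import pyarrow as pa

# Fields carrying a field id (and optionally a doc) in their metadata,
# analogous to what _construct_field builds when include_field_ids is True.
id_field = pa.field(
    "id",
    pa.int64(),
    nullable=False,
    metadata={"PARQUET:field_id": "1", "doc": "unique row id"},
)
name_field = pa.field("name", pa.string(), nullable=True, metadata={"PARQUET:field_id": "2"})

# A StructArray typed with those fields, standing in for the projection visitor's output.
struct_array = pa.StructArray.from_arrays(
    [pa.array([1, 2, 3], type=pa.int64()), pa.array(["a", "b", None], type=pa.string())],
    fields=[id_field, name_field],
)

# from_struct_array derives the batch schema from the struct type's fields,
# so the field-level metadata carries through without rebuilding each pa.field by hand.
batch = pa.RecordBatch.from_struct_array(struct_array)
print(batch.schema.field("id").metadata)
# expected: {b'PARQUET:field_id': b'1', b'doc': b'unique row id'}

Because the batch schema is derived from the struct type's fields, the metadata attached in _construct_field should survive into the returned RecordBatch without the per-field loop the old code used.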