jqin61 commented on code in PR #555: URL: https://github.com/apache/iceberg-python/pull/555#discussion_r1544785650
########## pyiceberg/table/__init__.py: ########## @@ -3108,3 +3138,127 @@ def snapshots(self) -> "pa.Table": snapshots, schema=snapshots_schema, ) + + +@dataclass(frozen=True) +class TablePartition: + partition_key: PartitionKey + arrow_table_partition: pa.Table + + +def _get_partition_sort_order(partition_columns: list[str], reverse: bool = False) -> dict[str, Any]: + order = 'ascending' if not reverse else 'descending' + null_placement = 'at_start' if reverse else 'at_end' + return {'sort_keys': [(column_name, order) for column_name in partition_columns], 'null_placement': null_placement} + + +def group_by_partition_scheme( + iceberg_table_metadata: TableMetadata, arrow_table: pa.Table, partition_columns: list[str] +) -> pa.Table: + """Given a table sort it by current partition scheme with all transform functions supported.""" + from pyiceberg.transforms import IdentityTransform + + supported = {IdentityTransform} + if not all( + type(field.transform) in supported for field in iceberg_table_metadata.spec().fields if field in partition_columns + ): + raise ValueError( + f"Not all transforms are supported, get: {[transform in supported for transform in iceberg_table_metadata.spec().fields]}." + ) + + # only works for identity + sort_options = _get_partition_sort_order(partition_columns, reverse=False) + sorted_arrow_table = arrow_table.sort_by(sorting=sort_options['sort_keys'], null_placement=sort_options['null_placement']) + return sorted_arrow_table + + +def get_partition_columns(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> list[str]: + arrow_table_cols = set(arrow_table.column_names) + partition_cols = [] + for transform_field in iceberg_table_metadata.spec().fields: + column_name = iceberg_table_metadata.schema().find_column_name(transform_field.source_id) + if not column_name: + raise ValueError(f"{transform_field=} could not be found in {iceberg_table_metadata.schema()}.") + if column_name not in arrow_table_cols: + continue + partition_cols.append(column_name) + return partition_cols + + +def _get_table_partitions( + arrow_table: pa.Table, + partition_spec: PartitionSpec, + schema: Schema, + slice_instructions: list[dict[str, Any]], +) -> list[TablePartition]: + sorted_slice_instructions = sorted(slice_instructions, key=lambda x: x['offset']) + + partition_fields = partition_spec.fields + + offsets = [inst["offset"] for inst in sorted_slice_instructions] + projected_and_filtered = { + partition_field.source_id: arrow_table[schema.find_field(name_or_id=partition_field.source_id).name] + .take(offsets) + .to_pylist() + for partition_field in partition_fields + } + + table_partitions = [] + for inst in sorted_slice_instructions: + partition_slice = arrow_table.slice(**inst) + fieldvalues = [ + PartitionFieldValue(partition_field, projected_and_filtered[partition_field.source_id][inst["offset"]]) + for partition_field in partition_fields + ] + partition_key = PartitionKey(raw_partition_field_values=fieldvalues, partition_spec=partition_spec, schema=schema) + table_partitions.append(TablePartition(partition_key=partition_key, arrow_table_partition=partition_slice)) + + return table_partitions + + +def partition(iceberg_table_metadata: TableMetadata, arrow_table: pa.Table) -> Iterable[TablePartition]: Review Comment: yep totally agree, I updated in the later commit of the original PR with the exact changes you mentioned and also added unit tests around this function. Let me pull in those changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org