kevinjqliu commented on code in PR #1500: URL: https://github.com/apache/iceberg-python/pull/1500#discussion_r1957416511
########## pyiceberg/table/update/sorting.py: ########## @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, List, Tuple + +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.table.update import ( + AddSortOrderUpdate, + AssertDefaultSortOrderId, + SetDefaultSortOrderUpdate, + TableRequirement, + TableUpdate, + UpdatesAndRequirements, + UpdateTableMetadata, +) +from pyiceberg.transforms import Transform + +if TYPE_CHECKING: + from pyiceberg.table import Transaction + + +class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): + _transaction: Transaction + _last_assigned_order_id: int + _case_sensitive: bool + _fields: List[SortField] + _last_sort_order_id: int + + def __init__(self, transaction: Transaction, case_sensitive: bool = True) -> None: + super().__init__(transaction) + self._fields: List[SortField] = [] + self._case_sensitive: bool = case_sensitive + self._last_sort_order_id: int = transaction.table_metadata.default_sort_order_id + + def _column_name_to_id(self, column_name: str) -> int: + """Map the column name to the column field id.""" + return ( + self._transaction.table_metadata.schema() + .find_field( + name_or_id=column_name, + case_sensitive=self._case_sensitive, + ) + .field_id + ) + + def _add_sort_field( + self, + source_id: int, + transform: Transform[Any, Any], + direction: SortDirection, + null_order: NullOrder, + ) -> UpdateSortOrder: + """Add a sort field to the sort order list.""" + self._fields.append( + SortField( + source_id=source_id, + transform=transform, + direction=direction, + null_order=null_order, + ) + ) + return self + + def asc(self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST) -> UpdateSortOrder: + """Add a sort field with ascending order.""" + return self._add_sort_field( + source_id=self._column_name_to_id(source_column_name), + transform=transform, + direction=SortDirection.ASC, + null_order=null_order, + ) + + def desc( + self, source_column_name: str, transform: Transform[Any, Any], null_order: NullOrder = NullOrder.NULLS_LAST + ) -> UpdateSortOrder: + """Add a sort field with descending order.""" + return self._add_sort_field( + source_id=self._column_name_to_id(source_column_name), + transform=transform, + direction=SortDirection.DESC, + null_order=null_order, + ) + + def _apply(self) -> SortOrder: + """Return the sort order.""" + return SortOrder(*self._fields, order_id=self._last_sort_order_id + 1) + + def _commit(self) -> UpdatesAndRequirements: Review Comment: this implementation defers from the java one. heres the code path for the java implementation [1](https://github.com/apache/iceberg/blob/71493b92dc2e0b953c184f76ad76e7f8794da8b1/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1199-L1234), [2](https://github.com/apache/iceberg/blob/71493b92dc2e0b953c184f76ad76e7f8794da8b1/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1729-L1779) In particular, the java implementation tries to retry sort order whenever possible. i dont think `self._last_sort_order_id: int = transaction.table_metadata.default_sort_order_id` and then using `order_id=self._last_sort_order_id + 1` is correct since `default_sort_order_id` might be always be the highest sort order. `_last_sort_order_id` should default to null and only changed when a new sort order is added. ########## tests/integration/test_sort_order_update.py: ########## @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name + +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.schema import Schema +from pyiceberg.table import Table +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.table.update.sorting import SortOrderBuilder +from pyiceberg.transforms import ( + IdentityTransform, +) +from pyiceberg.types import ( + LongType, + NestedField, + StringType, + TimestampType, +) + + +def _simple_table(catalog: Catalog, table_schema_simple: Schema) -> Table: + return _create_table_with_schema(catalog, table_schema_simple, "1") + + +def _table(catalog: Catalog) -> Table: + schema_with_timestamp = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "event_ts", TimestampType(), required=False), + NestedField(3, "str", StringType(), required=False), + ) + return _create_table_with_schema(catalog, schema_with_timestamp, "1") + + +def _table_v2(catalog: Catalog) -> Table: + schema_with_timestamp = Schema( + NestedField(1, "id", LongType(), required=False), + NestedField(2, "event_ts", TimestampType(), required=False), + NestedField(3, "str", StringType(), required=False), + ) + return _create_table_with_schema(catalog, schema_with_timestamp, "2") + + +def _create_table_with_schema(catalog: Catalog, schema: Schema, format_version: str) -> Table: + tbl_name = "default.test_schema_evolution" + try: + catalog.drop_table(tbl_name) + except NoSuchTableError: + pass + return catalog.create_table(identifier=tbl_name, schema=schema, properties={"format-version": format_version}) + + +@pytest.mark.integration +def test_sort_order_builder() -> None: + builder = SortOrderBuilder(last_sort_order_id=0) + builder.add_sort_field(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST) + builder.add_sort_field(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST) + assert builder.sort_order == SortOrder( + SortField(1, IdentityTransform(), SortDirection.ASC, NullOrder.NULLS_FIRST), + SortField(2, IdentityTransform(), SortDirection.DESC, NullOrder.NULLS_LAST), + ) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +def test_map_column_name_to_id(catalog: Catalog, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple) + for col_name, col_id in {"foo": 1, "bar": 2, "baz": 3}.items(): + assert col_id == simple_table.replace_sort_order()._column_name_to_id(col_name) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog"), pytest.lazy_fixture("session_catalog_hive")]) +def test_replace_sort_order(catalog: Catalog, table_schema_simple: Schema) -> None: + simple_table = _simple_table(catalog, table_schema_simple) + simple_table.replace_sort_order().asc("foo", IdentityTransform(), NullOrder.NULLS_FIRST).desc( + "bar", IdentityTransform(), NullOrder.NULLS_LAST + ).commit() + assert simple_table.sort_order() == SortOrder( + SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), + SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.DESC, null_order=NullOrder.NULLS_LAST), + order_id=1, + ) Review Comment: modify an existing one. but i see you added both. thanks! ########## mkdocs/docs/api.md: ########## @@ -1194,6 +1194,24 @@ with table.update_spec() as update: update.rename_field("bucketed_id", "sharded_id") ``` +## Sort order updates + +Users can update the sort order on existing tables for new data. See [sorting](https://iceberg.apache.org/spec/#sorting) for more details. + +The API to use when updating a sort order is the `update_sort_order` API on the table. + +Sort orders can only be updated by adding a new sort order. They cannot be deleted or modified. Review Comment: im 90% sure this is right, but want to call it out and see if others agree. cc @fokko ########## pyiceberg/table/update/sorting.py: ########## @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, List, Tuple + +from pyiceberg.table.sorting import NullOrder, SortDirection, SortField, SortOrder +from pyiceberg.table.update import ( + AddSortOrderUpdate, + AssertDefaultSortOrderId, + SetDefaultSortOrderUpdate, + TableRequirement, + TableUpdate, + UpdatesAndRequirements, + UpdateTableMetadata, +) +from pyiceberg.transforms import Transform + +if TYPE_CHECKING: + from pyiceberg.table import Transaction + + +class UpdateSortOrder(UpdateTableMetadata["UpdateSortOrder"]): + _transaction: Transaction + _last_assigned_order_id: int Review Comment: this is not used ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org