ashb commented on code in PR #64845: URL: https://github.com/apache/airflow/pull/64845#discussion_r3057737563
########## airflow-core/src/airflow/api_fastapi/common/cursors.py: ########## @@ -0,0 +1,128 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Cursor-based (keyset) pagination helpers. + +:meta private: +""" + +from __future__ import annotations + +import base64 +import json +import uuid as uuid_mod +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from fastapi import HTTPException, status +from sqlalchemy import and_, or_ + +if TYPE_CHECKING: + from sqlalchemy.sql import Select + + from airflow.api_fastapi.common.parameters import SortParam + + +def _encode_value(val: Any) -> dict[str, Any]: + """Encode a single Python value as a typed {"type": ..., "value": ...} object.""" + if val is None: + return {"type": "null", "value": None} + if isinstance(val, uuid_mod.UUID): + return {"type": "uuid", "value": str(val)} + if isinstance(val, datetime): + return {"type": "datetime", "value": val.isoformat()} + if isinstance(val, int): + return {"type": "int", "value": val} + return {"type": "str", "value": str(val)} + + +def _decode_value(entry: dict[str, Any]) -> Any: + """Decode a typed cursor entry back to its Python value.""" + type_tag = entry["type"] + raw = entry["value"] + if type_tag == "null": + return 
None + if type_tag == "uuid": + return uuid_mod.UUID(str(raw)) + if type_tag == "datetime": + return datetime.fromisoformat(str(raw)) + if type_tag == "int": + return int(raw) + return str(raw) + + +def encode_cursor(row: Any, sort_param: SortParam) -> str: + """ + Encode cursor token from the last row of a result set. + + The token is a base64url-encoded JSON list of typed objects, each + containing ``{"type": "<tag>", "value": <serialized>}`` so the + cursor is self-describing and can be decoded without column metadata. + """ + resolved = sort_param.get_resolved_columns() + if not resolved: + raise ValueError("SortParam has no resolved columns.") + + entries = [_encode_value(getattr(row, attr_name, None)) for attr_name, _col, _desc in resolved] + return base64.urlsafe_b64encode(json.dumps(entries).encode()).decode() + + +def decode_cursor(token: str) -> list[dict[str, Any]]: + """Decode a cursor token and return the list of typed value entries.""" + try: + data = json.loads(base64.urlsafe_b64decode(token)) + except Exception: + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token") + + if not isinstance(data, list) or any( + not isinstance(entry, dict) or "type" not in entry or "value" not in entry for entry in data + ): + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Invalid cursor token structure") + + return data + + +def apply_cursor_filter(statement: Select, cursor: str, sort_param: SortParam) -> Select: + """ + Apply a keyset pagination WHERE clause from a cursor token. + + Builds a composite comparison that respects mixed ASC/DESC ordering + on the resolved sort columns. 
+ """ + cursor_entries = decode_cursor(cursor) + + resolved = sort_param.get_resolved_columns() + if len(cursor_entries) != len(resolved): + raise HTTPException(status.HTTP_400_BAD_REQUEST, "Cursor token does not match current query shape") + + parsed_values = [_decode_value(entry) for entry in cursor_entries] + + # Build the keyset WHERE clause for mixed ASC/DESC ordering. + # For columns (c1 ASC, c2 DESC, c3 ASC) with cursor values (v1, v2, v3): + # (c1 > v1) OR + # (c1 = v1 AND c2 < v2) OR + # (c1 = v1 AND c2 = v2 AND c3 > v3) + or_clauses = [] + for i, (_, col, is_desc) in enumerate(resolved): + eq_conditions = [resolved[j][1] == parsed_values[j] for j in range(i)] + if is_desc: + bound = col < parsed_values[i] + else: + bound = col > parsed_values[i] + or_clauses.append(and_(*eq_conditions, bound)) Review Comment: I've played this game before. This query will miss items between pages (or sometimes duplicate them) when sorting by a column. From my (non-shareable, sorry) impl of this: ```python # Example query this builds: # # Dag.dag_id >= :dag_id # # with multiple columns # # and_(Dag.next_run_at >= :run_at, or_(Dag.next_run_at > :run_at, Dag.dag_id >= :dag_id)) # # Or when sorting by -next_dagrun, dag_id: # # and_( # Dag.next_dagrun <= :next_dagrun, # or_(Dag.next_dagrun < :next_dagrun, Dag.dag_id >= :dag_id) # ) ``` The key part there is using `<=`/`>=` in some places and strict `<`/`>` in others: the outer bound on the leading sort column is inclusive, while the comparison on that same column inside the `or_` is strict. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
