Copilot commented on code in PR #64611: URL: https://github.com/apache/airflow/pull/64611#discussion_r3070046868
########## airflow-core/src/airflow/migrations/versions/0111_3_3_0_add_gin_index_on_asset_event_extra.py: ########## @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add GIN index on asset_event.extra for PostgreSQL. + +Revision ID: 5a5d3253e946 +Revises: a4c2d171ae18 +Create Date: 2026-04-01 23:00:00.000000 + +""" + +from __future__ import annotations + +from alembic import op +from sqlalchemy import text + +# revision identifiers, used by Alembic. +revision = "5a5d3253e946" +down_revision = "a4c2d171ae18" +branch_labels = None +depends_on = None +airflow_version = "3.3.0" + + +def upgrade(): + """Add GIN index on asset_event.extra for PostgreSQL only.""" + conn = op.get_bind() + if conn.dialect.name == "postgresql": + op.execute( Review Comment: This migration uses `op.get_bind()` to check the dialect. In Alembic offline mode (`--sql`, SQL-script generation), there is no live connection and `op.get_bind()` returns `None`, so `conn.dialect` raises `AttributeError`. Use `op.get_context().dialect.name` (or similar context-based dialect detection), which is available in both online and offline mode, and apply the same pattern in `downgrade()`.
########## task-sdk/src/airflow/sdk/api/client.py: ########## @@ -683,15 +684,20 @@ def get( if before: common_params["before"] = before.isoformat() common_params["ascending"] = ascending - if limit: + if limit is not None: common_params["limit"] = limit + extra_params: list[tuple[str, str]] = [] + if extra: + extra_params = [("extra", f"{k}={v}") for k, v in extra.items()] if name or uri: resp = self.client.get( - "asset-events/by-asset", params={"name": name, "uri": uri, **common_params} + "asset-events/by-asset", + params=[*{"name": name, "uri": uri, **common_params}.items(), *extra_params], ) elif alias_name: resp = self.client.get( - "asset-events/by-asset-alias", params={"name": alias_name, **common_params} + "asset-events/by-asset-alias", + params=[*{"name": alias_name, **common_params}.items(), *extra_params], ) Review Comment: `params` is built from a dict that may contain `None` values for `name`/`uri`. httpx encodes a `None` query value as an empty string, so the request carries e.g. `uri=` and the server may treat it as an empty-string filter rather than an omitted one. It’s safer to omit keys with `None` values (similar to how other client methods filter `None` params). Consider filtering `{"name": name, "uri": uri, ...}` before calling `.items()`. ########## airflow-core/src/airflow/utils/sqlalchemy.py: ########## @@ -56,6 +58,56 @@ def get_dialect_name(session: Session) -> str | None: return getattr(bind.dialect, "name", None) +class JsonContains(ColumnElement): + """ + Dialect-aware JSON containment check. + + Compiles to ``@>`` on PostgreSQL (GIN-indexable), ``JSON_CONTAINS`` on + MySQL, and per-key ``json_extract`` comparisons on SQLite. + + All dialects use bound parameters to avoid SQL injection. 
+ """ + + inherit_cache = False + type = NullType() + + def __init__(self, column, kv_dict: dict[str, str]): + self.column = column + self.kv_dict = kv_dict + + +@compiles(JsonContains, "postgresql") +def _pg_json_contains(element, compiler, **kw): + from sqlalchemy import cast, literal + + col = cast(element.column, JSONB) + param = literal(json.dumps(element.kv_dict)).cast(JSONB) + expr = col.contains(param) + return compiler.process(expr, **kw) + + +@compiles(JsonContains, "mysql") +def _mysql_json_contains(element, compiler, **kw): + from sqlalchemy import bindparam, func + + param = bindparam(None, json.dumps(element.kv_dict), expanding=False) + expr = func.JSON_CONTAINS(element.column, param) + return compiler.process(expr == 1, **kw) + + +@compiles(JsonContains) +def _default_json_contains(element, compiler, **kw): + from sqlalchemy import and_, func, literal + Review Comment: Avoid `from ... import ...` inside these compilation functions. These are core code paths and the project guideline is to keep imports at module scope unless there’s a clear circular/lazy-loading reason; moving these SQLAlchemy imports to the top of the module will make this easier to maintain and consistent with the rest of the file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
