Copilot commented on code in PR #63978:
URL: https://github.com/apache/airflow/pull/63978#discussion_r3025333657


##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -421,31 +549,33 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
         if dialect == "mysql":
             # Avoid selecting large columns during ORDER BY to prevent sort 
buffer overflow
             result = conn.execute(
-                sa.text("""
+                sa.text(f"""
                     SELECT sd.id, sd.dag_id, sd.data, sd.data_compressed, 
sd.created_at
                     FROM serialized_dag sd
                     INNER JOIN (
-                        SELECT id, dag_id
+                        SELECT id
                         FROM serialized_dag
-                        WHERE dag_id > :last_dag_id
-                        ORDER BY dag_id
+                        WHERE id > :last_id
+                        {deadline_filter}
+                        ORDER BY id
                         LIMIT :batch_size
                     ) AS subq ON sd.id = subq.id
                 """),
-                {"last_dag_id": last_dag_id, "batch_size": BATCH_SIZE},
+                {"last_id": last_id, "batch_size": BATCH_SIZE},
             )
-            batch_results = sorted(result, key=lambda r: r.dag_id)
+
+            batch_results = sorted(list(result), key=lambda r: r.id)

Review Comment:
   The Python-side `sorted(list(result), ...)` adds extra memory and CPU 
overhead per batch. Since ordering is needed only to ensure stable pagination, 
prefer adding `ORDER BY sd.id` to the outer query and then use `batch_results = 
list(result)` (removing the in-Python sort).



##########
airflow-core/tests/unit/migrations/test_0101_ui_improvements_for_deadlines.py:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+from pathlib import Path
+
+from sqlalchemy.exc import OperationalError
+
+from tests_common.test_utils.paths import AIRFLOW_CORE_SOURCES_PATH
+
+MIGRATION_PATH = Path(
+    AIRFLOW_CORE_SOURCES_PATH,
+    "airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py",
+)
+
+
+def load_migration_module():
+    spec = importlib.util.spec_from_file_location(
+        "migration_0101_ui_improvements_for_deadlines", MIGRATION_PATH
+    )
+    module = importlib.util.module_from_spec(spec)
+    assert spec.loader is not None

Review Comment:
   `importlib.util.spec_from_file_location(...)` can return `None`. In that 
case `importlib.util.module_from_spec(spec)` raises an `AttributeError` (it 
reads `spec.loader`) before the assertion on the following line ever runs. Add 
an assertion that `spec is not None` (or combine into a single `assert spec is 
not None and spec.loader is not None`) before calling `module_from_spec`.
   ```suggestion
       assert spec is not None and spec.loader is not None
       module = importlib.util.module_from_spec(spec)
   ```



##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
         )
         return
 
+    _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+    conn: Connection,
+    dag_first_alert_ids: dict[str, str],
+    batch_size: int,
+    dags_with_errors: ErrorDict,
+) -> int:
+    """Populate deadline.deadline_alert_id in primary-key order to avoid 
dagrun_id table scans."""
+    if not dag_first_alert_ids:
+        log.info("No deadline alerts migrated; skipping deadline backfill")
+        return 0
+
+    last_deadline_id = "00000000-0000-0000-0000-000000000000"
+    updated_deadlines = 0
+    missing_alert_dag_ids: set[str] = set()
+
+    total_deadlines = conn.execute(
+        sa.text("SELECT COUNT(*) FROM deadline WHERE deadline_alert_id IS 
NULL")
+    ).scalar()
+    total_batches = (total_deadlines + batch_size - 1) // batch_size
+    batch_num = 0
+
+    log.info(
+        "Starting deadline backfill",
+        batch_size=batch_size,
+        total_deadlines=total_deadlines,
+        total_batches=total_batches,
+    )
+
+    while True:
+        batch_num += 1
+        batch_rows = list(
+            conn.execute(
+                sa.text("""
+                    SELECT d.id, dr.dag_id
+                    FROM deadline d
+                    JOIN dag_run dr ON dr.id = d.dagrun_id
+                    WHERE d.deadline_alert_id IS NULL
+                      AND d.id > :last_deadline_id
+                    ORDER BY d.id
+                    LIMIT :batch_size
+                """),
+                {"last_deadline_id": last_deadline_id, "batch_size": 
batch_size},
+            )
+        )
+
+        if not batch_rows:
+            break
+
+        last_deadline_id = str(batch_rows[-1].id)
+        deadline_ids_by_alert: dict[str, list[Any]] = defaultdict(list)

Review Comment:
   `deadline_ids_by_alert` is initialized as a `defaultdict(list)` but 
annotated as a plain `dict[...]`. The assignment still type-checks 
(`defaultdict` is a `dict` subclass), but the annotation hides the 
auto-creating behaviour that the later 
`deadline_ids_by_alert[alert_id].append(...)` call relies on. Update the 
annotation to `defaultdict[str, list[...]]` (or change the initializer to a 
plain dict + `setdefault`) and narrow `Any` to the actual `deadline.id` type 
used here (often `str`/UUID).



##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
         )
         return
 
+    _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+    conn: Connection,
+    dag_first_alert_ids: dict[str, str],
+    batch_size: int,
+    dags_with_errors: ErrorDict,
+) -> int:
+    """Populate deadline.deadline_alert_id in primary-key order to avoid 
dagrun_id table scans."""
+    if not dag_first_alert_ids:
+        log.info("No deadline alerts migrated; skipping deadline backfill")
+        return 0
+
+    last_deadline_id = "00000000-0000-0000-0000-000000000000"
+    updated_deadlines = 0
+    missing_alert_dag_ids: set[str] = set()
+
+    total_deadlines = conn.execute(
+        sa.text("SELECT COUNT(*) FROM deadline WHERE deadline_alert_id IS 
NULL")
+    ).scalar()
+    total_batches = (total_deadlines + batch_size - 1) // batch_size
+    batch_num = 0
+
+    log.info(
+        "Starting deadline backfill",
+        batch_size=batch_size,
+        total_deadlines=total_deadlines,
+        total_batches=total_batches,
+    )
+
+    while True:
+        batch_num += 1
+        batch_rows = list(
+            conn.execute(
+                sa.text("""
+                    SELECT d.id, dr.dag_id
+                    FROM deadline d
+                    JOIN dag_run dr ON dr.id = d.dagrun_id
+                    WHERE d.deadline_alert_id IS NULL
+                      AND d.id > :last_deadline_id
+                    ORDER BY d.id
+                    LIMIT :batch_size
+                """),
+                {"last_deadline_id": last_deadline_id, "batch_size": 
batch_size},
+            )
+        )
+
+        if not batch_rows:
+            break
+
+        last_deadline_id = str(batch_rows[-1].id)
+        deadline_ids_by_alert: dict[str, list[Any]] = defaultdict(list)
+
+        for deadline_id, dag_id in batch_rows:
+            alert_id = dag_first_alert_ids.get(dag_id)
+            if alert_id is None:
+                missing_alert_dag_ids.add(dag_id)
+                continue
+            deadline_ids_by_alert[alert_id].append(deadline_id)
+
+        with _begin_nested_transaction(conn) as batch_conn:
+            for alert_id, deadline_ids in deadline_ids_by_alert.items():
+                batch_conn.execute(
+                    sa.text("""
+                        UPDATE deadline
+                        SET deadline_alert_id = :alert_id
+                        WHERE deadline_alert_id IS NULL
+                          AND id IN :deadline_ids
+                    """).bindparams(sa.bindparam("deadline_ids", 
expanding=True)),
+                    {"alert_id": alert_id, "deadline_ids": deadline_ids},
+                )
+                updated_deadlines += len(deadline_ids)
+
+        log.info(
+            "Deadline backfill batch complete",
+            batch_num=batch_num,
+            total_batches=total_batches,
+            updated_deadlines=updated_deadlines,
+        )
+
+    for dag_id in sorted(missing_alert_dag_ids):
+        dags_with_errors[dag_id].append("Could not find migrated deadline 
alert for historical deadlines")
+
+    return updated_deadlines
+
+
+def _migrate_deadline_alerts() -> None:
     BATCH_SIZE = conf.getint("database", "migration_batch_size", 
fallback=DEFAULT_BATCH_SIZE)
 
     processed_dags: list[str] = []
     dags_with_deadlines: set[str] = set()
     migrated_alerts_count: int = 0
+    updated_deadlines_count: int = 0
     dags_with_errors: ErrorDict = defaultdict(list)
+    dag_first_alert_ids: dict[str, str] = {}
     batch_num = 0
-    last_dag_id = ""
+    # Paginate by primary key (id) instead of dag_id because id is indexed (PK)
+    # while dag_id has no index — using dag_id would cause a full table scan + 
sort
+    # on every batch. UUID7 ids are time-ordered so the pagination order is 
stable.
+    last_id = "00000000-0000-0000-0000-000000000000"
 
     conn = op.get_bind()
     dialect = conn.dialect.name
 
+    # Build dialect-specific filter to skip rows without deadline data at the 
SQL level.
+    # This avoids transferring and processing large data blobs for the 
majority of DAGs
+    # that have no deadline configuration. Compressed rows are always included 
since the
+    # DB cannot inspect their content.
+    if dialect == "postgresql":
+        deadline_filter = (
+            "AND ("
+            "  data_compressed IS NOT NULL"
+            "  OR (data IS NOT NULL"
+            "      AND (data::jsonb -> 'dag' -> 'deadline') IS NOT NULL"
+            "      AND (data::jsonb -> 'dag' -> 'deadline') != 'null'::jsonb"
+            "      AND (data::jsonb -> 'dag' -> 'deadline') != '[]'::jsonb)"
+            ")"
+        )
+    elif dialect == "mysql":
+        deadline_filter = (
+            "AND ("
+            "  data_compressed IS NOT NULL"
+            "  OR (data IS NOT NULL"
+            "      AND JSON_EXTRACT(data, '$.dag.deadline') IS NOT NULL"
+            "      AND IFNULL(JSON_LENGTH(JSON_EXTRACT(data, 
'$.dag.deadline')), 0) > 0)"
+            ")"
+        )
+    else:
+        deadline_filter = (
+            "AND ("
+            "  data_compressed IS NOT NULL"
+            "  OR (data IS NOT NULL"
+            "      AND json_extract(data, '$.dag.deadline') IS NOT NULL"
+            "      AND json_extract(data, '$.dag.deadline') != 'null'"
+            "      AND json_extract(data, '$.dag.deadline') != '[]')"
+            ")"
+        )
+
     total_dags = conn.execute(
         sa.text("SELECT COUNT(*) FROM serialized_dag WHERE data IS NOT NULL OR 
data_compressed IS NOT NULL")

Review Comment:
   Progress logging (`total_dags`/`total_batches`) no longer reflects the 
actual rows processed because the main batching queries now apply 
`deadline_filter`. Consider updating this COUNT query to include the same 
`deadline_filter` logic so batch counts and ETA/progress logs are accurate and 
less confusing operationally.
   ```suggestion
           sa.text(f"SELECT COUNT(*) FROM serialized_dag WHERE 1=1 
{deadline_filter}")
   ```



##########
airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py:
##########
@@ -396,18 +397,145 @@ def 
migrate_existing_deadline_alert_data_from_serialized_dag() -> None:
         )
         return
 
+    _migrate_deadline_alerts()
+
+
+def _backfill_deadline_alert_ids(
+    conn: Connection,
+    dag_first_alert_ids: dict[str, str],
+    batch_size: int,
+    dags_with_errors: ErrorDict,
+) -> int:
+    """Populate deadline.deadline_alert_id in primary-key order to avoid 
dagrun_id table scans."""

Review Comment:
   `_backfill_deadline_alert_ids` introduces substantial new behavior (batched 
PK pagination, per-alert grouping updates, and missing-alert error reporting) 
but isn’t covered by the added migration unit tests (which currently focus on 
`temporary_index`). Add unit tests that validate: (1) batched selection 
advances by `deadline.id`, (2) deadlines are updated for dag_ids present in 
`dag_first_alert_ids`, and (3) dag_ids missing from the mapping are reported 
into `dags_with_errors`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to