Copilot commented on code in PR #64751:
URL: https://github.com/apache/airflow/pull/64751#discussion_r3066478875
##########
airflow-core/src/airflow/models/deadline_alert.py:
##########
@@ -50,11 +50,19 @@ class DeadlineAlert(Base):
name: Mapped[str | None] = mapped_column(String(250), nullable=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
reference: Mapped[dict] = mapped_column(JSON, nullable=False)
- interval: Mapped[float] = mapped_column(Float, nullable=False)
+ interval: Mapped[dict] = mapped_column(JSON, nullable=False)
callback_def: Mapped[dict] = mapped_column(JSON, nullable=False)
def __repr__(self):
- interval_seconds = int(self.interval)
+
+ if isinstance(self.interval, (int, float)):
+ interval_seconds = int(self.interval)
+
+ elif isinstance(self.interval, datetime.timedelta):
+ interval_seconds = int(self.interval.total_seconds())
+
+ else:
+ interval_display = "dynamic"
if interval_seconds >= 3600:
interval_display = f"{interval_seconds // 3600}h"
Review Comment:
`interval_seconds` is not set in the final `else` branch, but is used
unconditionally afterward (`if interval_seconds >= 3600`). For non-numeric /
non-`timedelta` intervals (e.g., serialized dicts or dynamic interval objects),
`__repr__` will raise `UnboundLocalError`. Restructure `__repr__` to either
return/format immediately for dynamic intervals or initialize
`interval_seconds` to a sentinel and branch safely.
##########
airflow-core/src/airflow/migrations/versions/0111_3_3_0_change_deadline_interval_to_json.py:
##########
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Change type of interval in Deadline Alerts table to JSON.
+
+Revision ID: 82f208dbbad5
+Revises: a4c2d171ae18
+Create Date: 2026-04-06 16:55:46.517409
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import context, op
+
+# revision identifiers, used by Alembic.
+revision = "82f208dbbad5"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Apply change deadline interval to JSON."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual conversion required:
+
+ PostgreSQL:
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number';
+
+ MySQL:
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ );
+
+ SQLite:
+ UPDATE deadline_alert
+ SET interval =
+ '{"__classname__":"datetime.timedelta","__version__":'
+ || '2' ||
+ ',"__data__":' || CAST(interval AS TEXT) || '}';
+ """
+ )
+ return
+
+ with op.batch_alter_table("deadline_alert") as batch_op:
+ if dialect == "postgresql":
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ postgresql_using="to_json(interval)",
+ existing_nullable=False,
+ )
+ else:
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ existing_nullable=False,
+ )
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number'
+ """)
+
+ elif dialect == "mysql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ )
+ """)
+
+ else:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ '{"__classname__":"datetime.timedelta","__version__":'
+ || '2' ||
+ ',"__data__":' || CAST(interval AS TEXT) || '}'
+ """)
+
+
+def downgrade():
+ """Revert deadline interval back to float."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual downgrade required:
+
+ PostgreSQL:
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN interval::jsonb ? '__data__'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE to_json((interval::text)::double precision)
+ END;
+
+ MySQL:
+ UPDATE deadline_alert
+ SET `interval` =
+ CASE
+ WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+            THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS DECIMAL(20,6))
+ ELSE CAST(`interval` AS DECIMAL(20,6))
+ END;
+
+ SQLite:
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN json_extract(interval, '$.__data__') IS NOT NULL
+ THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+ ELSE CAST(interval AS REAL)
+ END;
+ """
+ )
+ return
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN interval::jsonb ? '__data__'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE to_json((interval::text)::double precision)
+ END
+ """)
+
+ elif dialect == "mysql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET `interval` =
+ CASE
+ WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+                THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS DECIMAL(20,6))
+ ELSE CAST(`interval` AS DECIMAL(20,6))
+ END
+ """)
+
+    # Serialized VariableInterval objects do not contain a numeric "__data__" field
+    # and therefore cannot be converted back to a float representation.
+    # During downgrade, only timedelta-style serialized values are converted.
+    # Other serialized interval types (e.g. VariableInterval) are not explicitly
+    # handled here and will fall through to the generic casting branch, which may
+    # result in NULL or backend-specific casting behavior.
+ else:
+ # Detect availability of SQLite JSON functions (JSON1 extension).
+ # If available, use json_extract for robust parsing.
+ # Otherwise, fall back to string-based extraction.
+ json_functions_available = False
+ try:
+            conn.execute(sa.text("SELECT JSON_SET('{}', '$.test', 'value')")).fetchone()
+            json_functions_available = True
+            print("SQLite JSON functions detected, using optimized SQL approach")
+        except Exception:
+            print("SQLite JSON functions not available, using Python fallback for JSON processing")
+
+ if json_functions_available:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN json_extract(interval, '$.__data__') IS NOT NULL
+ THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+ ELSE CAST(interval AS REAL)
+ END
+ """)
+ else:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN instr(interval, '__data__') > 0
+ THEN CAST(
+ substr(
+ interval,
+ instr(interval, '__data__') +
+                            instr(substr(interval, instr(interval, '__data__')), ':')
+ ) AS FLOAT
+ )
+ ELSE CAST(interval AS FLOAT)
+ END
+ """)
+
+ with op.batch_alter_table("deadline_alert") as batch_op:
+ if dialect == "postgresql":
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.JSON(),
+ type_=sa.FLOAT(),
+ postgresql_using="(interval->>'__data__')::double precision",
+ existing_nullable=False,
+ )
Review Comment:
The PostgreSQL downgrade path will fail for serialized non-`timedelta` JSON
objects (e.g., `VariableInterval`) because the `ELSE
to_json((interval::text)::double precision)` branch attempts to cast a JSON
object’s text (like `{"__classname__": ...}`) to `double precision`, which
will error and abort the migration. Similarly,
`postgresql_using="(interval->>'__data__')::double precision"` will fail when
`__data__` is absent. Update the downgrade to only convert rows that are known
numeric or known `datetime.timedelta` serialized shapes (e.g., check
`interval->>'__classname__' = 'datetime.timedelta'` or
`jsonb_typeof(interval::jsonb) = 'number'`) and decide an explicit fallback for
unsupported dynamic objects (e.g., set to NULL / a safe default / raise with a
targeted message).
##########
airflow-core/src/airflow/migrations/versions/0111_3_3_0_change_deadline_interval_to_json.py:
##########
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Change type of interval in Deadline Alerts table to JSON.
+
+Revision ID: 82f208dbbad5
+Revises: a4c2d171ae18
+Create Date: 2026-04-06 16:55:46.517409
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import context, op
+
+# revision identifiers, used by Alembic.
+revision = "82f208dbbad5"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Apply change deadline interval to JSON."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual conversion required:
+
+ PostgreSQL:
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number';
+
+ MySQL:
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ );
+
+ SQLite:
+ UPDATE deadline_alert
+ SET interval =
+ '{"__classname__":"datetime.timedelta","__version__":'
+ || '2' ||
+ ',"__data__":' || CAST(interval AS TEXT) || '}';
+ """
+ )
+ return
+
+ with op.batch_alter_table("deadline_alert") as batch_op:
+ if dialect == "postgresql":
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ postgresql_using="to_json(interval)",
+ existing_nullable=False,
+ )
+ else:
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ existing_nullable=False,
+ )
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number'
+ """)
+
+ elif dialect == "mysql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ )
+ """)
+
+ else:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ '{"__classname__":"datetime.timedelta","__version__":'
+ || '2' ||
+ ',"__data__":' || CAST(interval AS TEXT) || '}'
+ """)
+
+
+def downgrade():
+ """Revert deadline interval back to float."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual downgrade required:
+
+ PostgreSQL:
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN interval::jsonb ? '__data__'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE to_json((interval::text)::double precision)
+ END;
+
+ MySQL:
+ UPDATE deadline_alert
+ SET `interval` =
+ CASE
+ WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+                THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS DECIMAL(20,6))
+ ELSE CAST(`interval` AS DECIMAL(20,6))
+ END;
+
+ SQLite:
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN json_extract(interval, '$.__data__') IS NOT NULL
+ THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+ ELSE CAST(interval AS REAL)
+ END;
+ """
+ )
+ return
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN interval::jsonb ? '__data__'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE to_json((interval::text)::double precision)
+ END
+ """)
Review Comment:
The PostgreSQL downgrade path will fail for serialized non-`timedelta` JSON
objects (e.g., `VariableInterval`) because the `ELSE
to_json((interval::text)::double precision)` branch attempts to cast a JSON
object’s text (like `{"__classname__": ...}`) to `double precision`, which
will error and abort the migration. Similarly,
`postgresql_using="(interval->>'__data__')::double precision"` will fail when
`__data__` is absent. Update the downgrade to only convert rows that are known
numeric or known `datetime.timedelta` serialized shapes (e.g., check
`interval->>'__classname__' = 'datetime.timedelta'` or
`jsonb_typeof(interval::jsonb) = 'number'`) and decide an explicit fallback for
unsupported dynamic objects (e.g., set to NULL / a safe default / raise with a
targeted message).
##########
airflow-core/newsfragments/64751.feature.rst:
##########
@@ -0,0 +1 @@
+Allow DeadlineAlert intervals to be dynamically resolved at DAG parse time using objects such as VariableInterval.
Review Comment:
The newsfragment says intervals are resolved at 'DAG parse time', but the PR
description and implementation resolve `VariableInterval` during DagRun
deadline evaluation/creation. Please update the fragment text to match the
actual behavior to avoid user confusion.
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -340,3 +347,58 @@ def decorator(
return reference_class
return decorator
+
+
[email protected]
+class VariableInterval:
+ """
+ Interval backed by an Airflow Variable.
+
+ This allows DeadlineAlert intervals to be configured dynamically using
+ Airflow Variables. The variable value is interpreted as minutes and
+ converted into a ``timedelta``.
+
+ ------
+ Usage:
+ ------
+
+ .. code-block:: python
+
+        from airflow.sdk import DAG, DeadlineAlert, DeadlineReference, AsyncCallback
+
+ DAG(
+ dag_id="dag_with_variable_interval",
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=VariableInterval("deadline_minutes"),
+ callback=AsyncCallback(my_callback),
+ ),
+ )
+
+ ------
+ Notes:
+ ------
+ * Resolution occurs when deadlines are evaluated (during DagRun creation).
+ * Changes to the Variable affect only newly parsed DAGs and future DagRuns.
+ * Existing deadlines are not retroactively updated.
+ """
+
+ key: str
+
Review Comment:
`DeadlineAlert.__hash__` hashes `self.interval`, but `@attrs.define`
defaults can make instances unhashable when `eq=True` (common default), which
will raise `TypeError: unhashable type: 'VariableInterval'` when a
`DeadlineAlert` is hashed. Make `VariableInterval` explicitly hashable (e.g.,
`@attrs.define(frozen=True)` or an explicit `__hash__`) or update
`DeadlineAlert.__hash__` to avoid hashing non-hashable intervals.
```suggestion
def __hash__(self) -> int:
return hash(self.key)
```
##########
airflow-core/src/airflow/serialization/decoders.py:
##########
@@ -162,9 +162,18 @@ def decode_deadline_alert(encoded_data: dict):
reference_data = data[DeadlineAlertFields.REFERENCE]
reference = decode_deadline_reference(reference_data)
+ raw_interval = data[DeadlineAlertFields.INTERVAL]
+
+    # Backward compatibility: previously interval was stored as total_seconds() (float/int).
+ # Handle numeric values by converting to timedelta.
+ if isinstance(raw_interval, (int, float)):
+ interval = datetime.timedelta(seconds=raw_interval)
+ else:
+ interval = cast("datetime.timedelta", deserialize(raw_interval))
Review Comment:
The `cast("datetime.timedelta", ...)` is misleading now that
`deserialize(raw_interval)` can return non-`timedelta` interval objects (e.g.,
`VariableInterval`). This doesn’t affect runtime behavior, but it hides type
mismatches from linters/type-checkers and makes it easier to accidentally treat
the result as always a `timedelta`. Prefer widening the type (e.g., `timedelta
| VariableInterval | ...`) and removing or adjusting the cast accordingly.
##########
airflow-core/src/airflow/migrations/versions/0111_3_3_0_change_deadline_interval_to_json.py:
##########
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Change type of interval in Deadline Alerts table to JSON.
+
+Revision ID: 82f208dbbad5
+Revises: a4c2d171ae18
+Create Date: 2026-04-06 16:55:46.517409
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import context, op
+
+# revision identifiers, used by Alembic.
+revision = "82f208dbbad5"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Apply change deadline interval to JSON."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual conversion required:
+
+ PostgreSQL:
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number';
Review Comment:
The offline-mode 'Manual conversion required' instructions appear
inconsistent with the actual upgrade sequence: they do not include the required
`ALTER TABLE`/type change step, and the PostgreSQL `WHERE
jsonb_typeof(interval::jsonb)` predicate implies `interval` is already JSON
(but offline mode returns before any conversion). Consider updating the offline
instructions to be a complete, executable sequence (type change +
transformation) and ensure predicates match the pre/post column type at each
step.
##########
task-sdk/src/airflow/sdk/definitions/deadline.py:
##########
@@ -340,3 +347,58 @@ def decorator(
return reference_class
return decorator
+
+
[email protected]
Review Comment:
`DeadlineAlert.__hash__` hashes `self.interval`, but `@attrs.define`
defaults can make instances unhashable when `eq=True` (common default), which
will raise `TypeError: unhashable type: 'VariableInterval'` when a
`DeadlineAlert` is hashed. Make `VariableInterval` explicitly hashable (e.g.,
`@attrs.define(frozen=True)` or an explicit `__hash__`) or update
`DeadlineAlert.__hash__` to avoid hashing non-hashable intervals.
```suggestion
@attrs.define(unsafe_hash=True)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]