Copilot commented on code in PR #63185:
URL: https://github.com/apache/airflow/pull/63185#discussion_r3066493622
##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) ->
tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name ==
name))
self.log.info("Deleted import errors for bundle %s which is no
longer configured", name)
+ session.flush()
+
+ @provide_session
+ def reassign_dags_with_unconfigured_bundles(self, *, session: Session =
NEW_SESSION) -> int:
+ """
+ Reassign Dags that reference unconfigured bundles (None or incorrect)
to the first configured bundle as a fallback.
+
+ This addresses Dags that reference bundles incorrectly (i.e. the Dag's
`bundle_name` matches a value in the database, but that database value does not
correspond to a user-configured bundle) as a side effect of the
+ `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see:
https://github.com/apache/airflow/issues/63323).
+
+ Instead of attempting to infer the correct bundle for each Dag during
the migration, we reassign all Dags with unconfigured bundles to the first
configured bundle at DagFileProcessorManager startup. This relaxes the
+ "Requested bundle '{name}' is not configured."
+ error that would otherwise occur when triggering a DagRun immediately
after the migration.
+
+ This fallback is not always semantically correct in environments using
multiple bundles, but it is a safe, temporary measure that allows users to
successfully trigger DagRuns right after the migration.
+
+ The correct Dag-to-bundle assignments will be restored by the Dag
processor on the next parsing cycle.
+
+ :param session: ORM Session
+ :return: Number of Dags reassigned.
+ """
+ from airflow.models.dag import DagModel
+ # lazy import to avoid circular import issues
+
+ configured_names = self.bundle_names
+ if not configured_names:
+ # This should not happen because we already have validation at
parse_config in constructor.
+ raise AirflowConfigException(
+ "No Dag bundles are currently configured. Cannot reassign Dags
with unconfigured bundles to a valid bundle. Please add at least one bundle
configuration to your config."
+ )
Review Comment:
`reassign_dags_with_unconfigured_bundles()` currently raises when
`bundle_names` is empty, but `DagBundlesManager.parse_config()` explicitly
allows an empty `dag_bundle_config_list` (see tests covering `"[]"`). Since
`DagFileProcessorManager.sync_bundles()` now calls this unconditionally at
startup, an intentionally empty bundle config will crash the Dag processor.
Consider making this a no-op (return 0 + log) when no bundles are configured,
or enforce non-empty bundles at config-parse time consistently across the
codebase.
```suggestion
self.log.info(
"No Dag bundles are configured; skipping reassignment of
Dags with unconfigured bundles."
)
return 0
```
##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) ->
tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name ==
name))
self.log.info("Deleted import errors for bundle %s which is no
longer configured", name)
+ session.flush()
+
+ @provide_session
+ def reassign_dags_with_unconfigured_bundles(self, *, session: Session =
NEW_SESSION) -> int:
+ """
+ Reassign Dags that reference unconfigured bundles (None or incorrect)
to the first configured bundle as a fallback.
+
+ This addresses Dags that reference bundles incorrectly (i.e. the Dag's
`bundle_name` matches a value in the database, but that database value does not
correspond to a user-configured bundle) as a side effect of the
+ `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see:
https://github.com/apache/airflow/issues/63323).
+
+ Instead of attempting to infer the correct bundle for each Dag during
the migration, we reassign all Dags with unconfigured bundles to the first
configured bundle at DagFileProcessorManager startup. This relaxes the
+ "Requested bundle '{name}' is not configured."
+ error that would otherwise occur when triggering a DagRun immediately
after the migration.
+
+ This fallback is not always semantically correct in environments using
multiple bundles, but it is a safe, temporary measure that allows users to
successfully trigger DagRuns right after the migration.
+
+ The correct Dag-to-bundle assignments will be restored by the Dag
processor on the next parsing cycle.
+
+ :param session: ORM Session
+ :return: Number of Dags reassigned.
+ """
+ from airflow.models.dag import DagModel
+ # lazy import to avoid circular import issues
+
Review Comment:
This `DagModel` import is inside the method body, but unlike
`ParseImportError` in `sync_bundles_to_db`, it doesn't appear to be required to
avoid a circular import (`airflow.models.dag` does not import
`DagBundlesManager`). Please move this import to module scope to match
Airflow’s import conventions and avoid repeated imports on every call.
##########
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -449,3 +457,161 @@ def test_multiple_bundles_one_fails(clear_db, session):
def test_get_all_bundle_names():
assert DagBundlesManager().get_all_bundle_names() == ["dags-folder",
"example_dags"]
+
+
+@pytest.fixture
+def clear_dags_and_bundles():
+ clear_db_dags()
+ clear_db_dag_bundles()
+ yield
+ clear_db_dags()
+ clear_db_dag_bundles()
+
+
+def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel:
+ dag = DagModel(dag_id=dag_id, bundle_name=bundle_name,
fileloc=f"/tmp/{dag_id}.py")
+ session.add(dag)
+ session.flush()
+ return dag
+
+
+@pytest.mark.db_test
+class TestReassignDagsWithUnconfiguredBundles:
+ """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles."""
+
+ def _manager_with_bundle_names(self, names: list[str]) ->
DagBundlesManager:
+ """Return a DagBundlesManager whose ``bundle_names`` property returns
*names*.
+
+ :param names: Bundle names to expose via the ``bundle_names`` property.
+ :return: A patched ``DagBundlesManager`` instance.
+ """
+ with patch.dict(
+ os.environ,
+ {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST":
json.dumps(BASIC_BUNDLE_CONFIG)},
+ ):
+ manager = DagBundlesManager()
+ # Override bundle_names so the method uses the names we want without
+ # requiring a full bundle config for each name variant.
+ manager.__class__ = type(
+ "PatchedManager", (DagBundlesManager,), {"bundle_names":
property(lambda self: names)}
+ )
+ return manager
Review Comment:
Reassigning `manager.__class__` at runtime to override `bundle_names` is a
brittle pattern and makes the test harder to understand. Prefer patching the
property via `monkeypatch`/`patch.object` (or injecting configured names via
env/config) so the instance type remains stable.
##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) ->
tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name ==
name))
self.log.info("Deleted import errors for bundle %s which is no
longer configured", name)
+ session.flush()
+
+ @provide_session
+ def reassign_dags_with_unconfigured_bundles(self, *, session: Session =
NEW_SESSION) -> int:
+ """
+ Reassign Dags that reference unconfigured bundles (None or incorrect)
to the first configured bundle as a fallback.
+
+ This addresses Dags that reference bundles incorrectly (i.e. the Dag's
`bundle_name` matches a value in the database, but that database value does not
correspond to a user-configured bundle) as a side effect of the
+ `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see:
https://github.com/apache/airflow/issues/63323).
+
+ Instead of attempting to infer the correct bundle for each Dag during
the migration, we reassign all Dags with unconfigured bundles to the first
configured bundle at DagFileProcessorManager startup. This relaxes the
+ "Requested bundle '{name}' is not configured."
+ error that would otherwise occur when triggering a DagRun immediately
after the migration.
+
+ This fallback is not always semantically correct in environments using
multiple bundles, but it is a safe, temporary measure that allows users to
successfully trigger DagRuns right after the migration.
+
+ The correct Dag-to-bundle assignments will be restored by the Dag
processor on the next parsing cycle.
Review Comment:
Several lines in this new docstring exceed the repo’s configured
110-character line length (ruff/black). Please wrap the long lines (especially
the migration reference + issue link paragraph) so CI doesn’t fail with E501.
```suggestion
Reassign Dags that reference unconfigured bundles (None or
incorrect) to the first
configured bundle as a fallback.
This addresses Dags that reference bundles incorrectly (i.e. the
Dag's `bundle_name`
matches a value in the database, but that database value does not
correspond to a
user-configured bundle) as a side effect of the
`0082_3_1_0_make_bundle_name_not_nullable.py` migration
(see: https://github.com/apache/airflow/issues/63323).
Instead of attempting to infer the correct bundle for each Dag
during the migration, we
reassign all Dags with unconfigured bundles to the first configured
bundle at
DagFileProcessorManager startup. This relaxes the
"Requested bundle '{name}' is not configured."
error that would otherwise occur when triggering a DagRun
immediately after the migration.
This fallback is not always semantically correct in environments
using multiple bundles,
but it is a safe, temporary measure that allows users to
successfully trigger DagRuns
right after the migration.
The correct Dag-to-bundle assignments will be restored by the Dag
processor on the next
parsing cycle.
```
##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) ->
tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name ==
name))
self.log.info("Deleted import errors for bundle %s which is no
longer configured", name)
+ session.flush()
+
Review Comment:
`session.flush()` here looks redundant: `session.execute(delete(...))` has
already hit the DB, and `@provide_session` will flush/commit on exit when it
owns the session. If this is only meant to support the subsequent startup
reassignment, it shouldn’t be necessary; consider removing it (or add a short
comment explaining why an explicit flush is required).
```suggestion
```
##########
airflow-core/src/airflow/dag_processing/bundles/manager.py:
##########
@@ -293,6 +294,66 @@ def _extract_and_sign_template(bundle_name: str) ->
tuple[str | None, dict]:
session.execute(delete(ParseImportError).where(ParseImportError.bundle_name ==
name))
self.log.info("Deleted import errors for bundle %s which is no
longer configured", name)
+ session.flush()
+
+ @provide_session
+ def reassign_dags_with_unconfigured_bundles(self, *, session: Session =
NEW_SESSION) -> int:
+ """
+ Reassign Dags that reference unconfigured bundles (None or incorrect)
to the first configured bundle as a fallback.
+
+ This addresses Dags that reference bundles incorrectly (i.e. the Dag's
`bundle_name` matches a value in the database, but that database value does not
correspond to a user-configured bundle) as a side effect of the
+ `0082_3_1_0_make_bundle_name_not_nullable.py` migration (see:
https://github.com/apache/airflow/issues/63323).
+
+ Instead of attempting to infer the correct bundle for each Dag during
the migration, we reassign all Dags with unconfigured bundles to the first
configured bundle at DagFileProcessorManager startup. This relaxes the
+ "Requested bundle '{name}' is not configured."
+ error that would otherwise occur when triggering a DagRun immediately
after the migration.
+
+ This fallback is not always semantically correct in environments using
multiple bundles, but it is a safe, temporary measure that allows users to
successfully trigger DagRuns right after the migration.
+
+ The correct Dag-to-bundle assignments will be restored by the Dag
processor on the next parsing cycle.
+
+ :param session: ORM Session
+ :return: Number of Dags reassigned.
+ """
+ from airflow.models.dag import DagModel
+ # lazy import to avoid circular import issues
+
+ configured_names = self.bundle_names
+ if not configured_names:
+ # This should not happen because we already have validation at
parse_config in constructor.
+ raise AirflowConfigException(
+ "No Dag bundles are currently configured. Cannot reassign Dags
with unconfigured bundles to a valid bundle. Please add at least one bundle
configuration to your config."
+ )
+ default_bundle = configured_names[0]
+
+ count = cast(
+ "CursorResult",
+ session.execute(
+ update(DagModel)
+ .where(
+ or_(
+ DagModel.bundle_name.notin_(configured_names),
+ DagModel.bundle_name.is_(None),
+ )
+ )
+ .values(bundle_name=default_bundle)
+ ),
Review Comment:
For consistency with other bulk updates on `DagModel` in the codebase (e.g.
`airflow/dag_processing/collection.py` and core API routes), add
`.execution_options(synchronize_session="fetch")` to this `update(DagModel)`
statement so callers that pass an existing session don’t end up with stale
in-memory `DagModel` objects.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -1249,6 +1249,18 @@ def test_add_callback_skips_when_bundle_init_fails(self,
mock_bundle_manager):
bundle.initialize.assert_called_once()
assert len(manager._callback_to_execute) == 0
+ @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+ def test_reassign_called_once_at_startup_not_on_refresh(self,
mock_bundle_manager):
+ """reassign_dags_with_unconfigured_bundles is called exactly once by
sync_bundles, not by _refresh_dag_bundles."""
Review Comment:
This new test docstring line is likely over the repo’s 110-character line
length limit (ruff E501). Please wrap it across multiple lines to avoid lint
failures.
```suggestion
"""
reassign_dags_with_unconfigured_bundles is called exactly once by
sync_bundles,
not by _refresh_dag_bundles.
"""
```
##########
airflow-core/tests/unit/dag_processing/bundles/test_dag_bundle_manager.py:
##########
@@ -449,3 +457,161 @@ def test_multiple_bundles_one_fails(clear_db, session):
def test_get_all_bundle_names():
assert DagBundlesManager().get_all_bundle_names() == ["dags-folder",
"example_dags"]
+
+
+@pytest.fixture
+def clear_dags_and_bundles():
+ clear_db_dags()
+ clear_db_dag_bundles()
+ yield
+ clear_db_dags()
+ clear_db_dag_bundles()
+
+
+def _add_dag(session, dag_id: str, bundle_name: str) -> DagModel:
+ dag = DagModel(dag_id=dag_id, bundle_name=bundle_name,
fileloc=f"/tmp/{dag_id}.py")
+ session.add(dag)
+ session.flush()
+ return dag
+
+
+@pytest.mark.db_test
+class TestReassignDagsWithUnconfiguredBundles:
+ """Tests for DagBundlesManager.reassign_dags_with_unconfigured_bundles."""
+
+ def _manager_with_bundle_names(self, names: list[str]) ->
DagBundlesManager:
+ """Return a DagBundlesManager whose ``bundle_names`` property returns
*names*.
+
+ :param names: Bundle names to expose via the ``bundle_names`` property.
+ :return: A patched ``DagBundlesManager`` instance.
+ """
+ with patch.dict(
+ os.environ,
+ {"AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST":
json.dumps(BASIC_BUNDLE_CONFIG)},
+ ):
+ manager = DagBundlesManager()
+ # Override bundle_names so the method uses the names we want without
+ # requiring a full bundle config for each name variant.
+ manager.__class__ = type(
+ "PatchedManager", (DagBundlesManager,), {"bundle_names":
property(lambda self: names)}
+ )
+ return manager
+
+ def test_no_configured_bundles_raises(self, clear_dags_and_bundles,
session):
+ """Raise AirflowConfigException when no bundles are configured."""
+ manager = self._manager_with_bundle_names([])
+ with pytest.raises(AirflowConfigException, match="No Dag bundles are
currently configured"):
+ manager.reassign_dags_with_unconfigured_bundles(session=session)
Review Comment:
This test asserts that `reassign_dags_with_unconfigured_bundles()` should
raise when no bundles are configured, but `DagBundlesManager` currently permits
an explicitly empty `dag_bundle_config_list` (and `DagFileProcessorManager` can
run with no bundles). Since `sync_bundles()` now calls reassignment
unconditionally at startup, raising here will crash the Dag processor in that
configuration. Consider changing the behavior (and this test) to a no-op that
returns 0 when no bundles are configured.
```suggestion
def test_no_configured_bundles_is_noop(self, clear_dags_and_bundles,
session):
"""Return 0 without raising when no bundles are configured."""
manager = self._manager_with_bundle_names([])
assert
manager.reassign_dags_with_unconfigured_bundles(session=session) == 0
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org