Copilot commented on code in PR #64905: URL: https://github.com/apache/airflow/pull/64905#discussion_r3055070768
########## airflow-core/tests/unit/migrations/test_migration_utils.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Stairway test for Airflow DB migrations. + +Walks every individual migration step forward (upgrade) then backward (downgrade) +to verify that all migrations are fully reversible. This catches problems like: +- a migration that creates a constraint the downgrade forgets to drop +- a downgrade that references a column that was never added by the upgrade + +The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2 +snapshot) and iterates through each revision in topological order. +""" + +from __future__ import annotations + +import pytest +from alembic import command +from alembic.script import ScriptDirectory + +from airflow.utils.db import _get_alembic_config + +pytestmark = pytest.mark.db_test + +# The squashed "start-of-history" revision – stairway starts here. 
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc" + + +def _get_revisions_in_order() -> list[str]: + """Return revision IDs in upgrade order, starting after the squashed base.""" + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + # walk_revisions() yields from head → base; reverse for upgrade order + all_revisions = list(script.walk_revisions()) + all_revisions.reverse() + + revision_ids = [rev.revision for rev in all_revisions] + + # Find the squashed base and return everything *after* it (the incremental steps) + try: + base_index = revision_ids.index(_SQUASHED_BASE_REVISION) + except ValueError: + return revision_ids # squashed revision not present, return all + + return revision_ids[base_index + 1 :] + + +# Force these tests to run sequentially on the same worker to prevent DB state +# conflicts during parallel testing (e.g., when using pytest-xdist). +@pytest.mark.xdist_group(name="migration_stairway") +@pytest.mark.parametrize( + "revision_id", + [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()], +) +def test_migration_stairway(revision_id: str) -> None: + """ + For each migration step verify that upgrade followed by downgrade works. + + The test applies the migration under test (upgrade to *revision_id*), then + immediately rolls it back (downgrade to the previous revision), and finally + re-applies it so that subsequent parametrized steps start from the correct + state. + """ + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + revision_obj = script.get_revision(revision_id) + if revision_obj is None: + pytest.skip(f"Revision {revision_id!r} not found in migration scripts") + + prev_revision = revision_obj.down_revision + if isinstance(prev_revision, tuple): + # Merge point – pick the first parent.
+ prev_revision = prev_revision[0] + + # Step 1: upgrade to this revision + command.upgrade(config, revision=revision_id) + + # Determine the target revision for downgrade + target = prev_revision if prev_revision else "base" + + try: + # Step 2: downgrade back to the previous revision + command.downgrade(config, revision=target) + finally: + # Step 3: re-apply so subsequent steps see a consistent DB state. + command.upgrade(config, revision=revision_id) Review Comment: The test claims to start from the squashed baseline and then do single-step upgrade→downgrade per revision, but there is no setup that ensures the DB is at `_SQUASHED_BASE_REVISION` (or at the immediate parent) before the first parametrized case runs. In a typical DB-test run the DB is initialized at the current head, so the first `command.downgrade()` will traverse many revisions and won’t validate a single-step downgrade for just `revision_id`. Add an explicit module/session-scoped setup that brings the DB to the squashed baseline before running the stairway, and ensure teardown restores the DB to `heads` even on failure so other DB tests aren’t left on an intermediate schema. ########## airflow-core/tests/unit/migrations/test_migration_utils.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Stairway test for Airflow DB migrations. + +Walks every individual migration step forward (upgrade) then backward (downgrade) +to verify that all migrations are fully reversible. This catches problems like: +- a migration that creates a constraint the downgrade forgets to drop +- a downgrade that references a column that was never added by the upgrade + +The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2 +snapshot) and iterates through each revision in topological order. +""" + +from __future__ import annotations + +import pytest +from alembic import command +from alembic.script import ScriptDirectory + +from airflow.utils.db import _get_alembic_config + +pytestmark = pytest.mark.db_test + +# The squashed "start-of-history" revision – stairway starts here. +_SQUASHED_BASE_REVISION = "4bc4d934e2bc" + + +def _get_revisions_in_order() -> list[str]: + """Return revision IDs in upgrade order, starting after the squashed base.""" + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + # walk_revisions() yields from head → base; reverse for upgrade order + all_revisions = list(script.walk_revisions()) + all_revisions.reverse() + + revision_ids = [rev.revision for rev in all_revisions] + + # Find the squashed base and return everything *after* it (the incremental steps) + try: + base_index = revision_ids.index(_SQUASHED_BASE_REVISION) + except ValueError: + return revision_ids # squashed revision not present, return all + + return revision_ids[base_index + 1 :] + + +# Force these tests to run sequentially on the same worker to prevent DB state +# conflicts during parallel testing (e.g., when using pytest-xdist). 
+@pytest.mark.xdist_group(name="migration_stairway") +@pytest.mark.parametrize( + "revision_id", + [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()], +) Review Comment: This parametrized test implicitly depends on state being carried from one `revision_id` case to the next (so each case starts from the previous revision). With pytest-xdist, individual parametrized cases are scheduled independently and (with Breeze) xdist is invoked without `--dist=loadgroup`, so `xdist_group` will not actually keep all params on the same worker. Consider rewriting this as a single test that loops through revisions (so order/state is guaranteed), or otherwise make each case self-contained and not rely on cross-test DB state. ########## airflow-core/tests/unit/migrations/test_migration_utils.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Stairway test for Airflow DB migrations. + +Walks every individual migration step forward (upgrade) then backward (downgrade) +to verify that all migrations are fully reversible.
This catches problems like: +- a migration that creates a constraint the downgrade forgets to drop +- a downgrade that references a column that was never added by the upgrade + +The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2 +snapshot) and iterates through each revision in topological order. +""" + +from __future__ import annotations + +import pytest +from alembic import command +from alembic.script import ScriptDirectory + +from airflow.utils.db import _get_alembic_config + +pytestmark = pytest.mark.db_test + +# The squashed "start-of-history" revision – stairway starts here. +_SQUASHED_BASE_REVISION = "4bc4d934e2bc" + + +def _get_revisions_in_order() -> list[str]: + """Return revision IDs in upgrade order, starting after the squashed base.""" + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + # walk_revisions() yields from head → base; reverse for upgrade order + all_revisions = list(script.walk_revisions()) + all_revisions.reverse() + + revision_ids = [rev.revision for rev in all_revisions] + + # Find the squashed base and return everything *after* it (the incremental steps) + try: + base_index = revision_ids.index(_SQUASHED_BASE_REVISION) + except ValueError: + return revision_ids # squashed revision not present, return all + + return revision_ids[base_index + 1 :] + + +# Force these tests to run sequentially on the same worker to prevent DB state +# conflicts during parallel testing (e.g., when using pytest-xdist). +@pytest.mark.xdist_group(name="migration_stairway") +@pytest.mark.parametrize( Review Comment: There’s trailing whitespace on the comment line here (and also around the blank line before the `try:`). Please remove trailing spaces to keep diffs clean and satisfy common linters. ########## airflow-core/tests/unit/migrations/test_migration_utils.py: ########## @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Stairway test for Airflow DB migrations. + +Walks every individual migration step forward (upgrade) then backward (downgrade) +to verify that all migrations are fully reversible. This catches problems like: +- a migration that creates a constraint the downgrade forgets to drop +- a downgrade that references a column that was never added by the upgrade + +The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2 +snapshot) and iterates through each revision in topological order. +""" + +from __future__ import annotations + +import pytest +from alembic import command +from alembic.script import ScriptDirectory + +from airflow.utils.db import _get_alembic_config + +pytestmark = pytest.mark.db_test + +# The squashed "start-of-history" revision – stairway starts here. 
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc" + + +def _get_revisions_in_order() -> list[str]: + """Return revision IDs in upgrade order, starting after the squashed base.""" + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + # walk_revisions() yields from head → base; reverse for upgrade order + all_revisions = list(script.walk_revisions()) + all_revisions.reverse() + + revision_ids = [rev.revision for rev in all_revisions] + + # Find the squashed base and return everything *after* it (the incremental steps) + try: + base_index = revision_ids.index(_SQUASHED_BASE_REVISION) + except ValueError: + return revision_ids # squashed revision not present, return all + + return revision_ids[base_index + 1 :] + + +# Force these tests to run sequentially on the same worker to prevent DB state +# conflicts during parallel testing (e.g., when using pytest-xdist). +@pytest.mark.xdist_group(name="migration_stairway") +@pytest.mark.parametrize( + "revision_id", + [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()], +) +def test_migration_stairway(revision_id: str) -> None: + """ + For each migration step verify that upgrade followed by downgrade works. + + The test applies the migration under test (upgrade to *revision_id*), then + immediately rolls it back (downgrade to the previous revision), and finally + re-applies it so that subsequent parametrized steps start from the correct + state. + """ + config = _get_alembic_config() + script = ScriptDirectory.from_config(config) + + revision_obj = script.get_revision(revision_id) + if revision_obj is None: + pytest.skip(f"Revision {revision_id!r} not found in migration scripts") + + prev_revision = revision_obj.down_revision + if isinstance(prev_revision, tuple): + # Merge point – pick the first parent.
+ prev_revision = prev_revision[0] + Review Comment: Merge revisions aren’t handled correctly: when `down_revision` is a tuple, picking the first parent can send the downgrade to an arbitrary branch and won’t exercise the real “one step back” for the merge revision. Prefer using Alembic’s relative targets (e.g. downgrade one step from the current revision) or otherwise compute the correct downgrade target for merge points so the test validates reversibility of the merge migration itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
