Copilot commented on code in PR #64905:
URL: https://github.com/apache/airflow/pull/64905#discussion_r3055070768


##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward (downgrade)
+to verify that all migrations are fully reversible.  This catches problems like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2
+snapshot) and iterates through each revision in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _get_alembic_config
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc"
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    # Find the squashed base and return everything *after* it (the incremental steps)
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1 :]
+
+
+# Force these tests to run sequentially on the same worker to prevent DB state 
+# conflicts during parallel testing (e.g., when using pytest-xdist).
+@pytest.mark.xdist_group(name="migration_stairway")
+@pytest.mark.parametrize(
+    "revision_id",
+    [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()],
+)
+def test_migration_stairway(revision_id: str) -> None:
+    """
+    For each migration step verify that upgrade followed by downgrade works.
+
+    The test applies the migration under test (upgrade to *revision_id*), then
+    immediately rolls it back (downgrade to the previous revision), and finally
+    re-applies it so that subsequent parametrized steps start from the correct
+    state.
+    """
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    revision_obj = script.get_revision(revision_id)
+    if revision_obj is None:
+        pytest.skip(f"Revision {revision_id!r} not found in migration scripts")
+
+    prev_revision = revision_obj.down_revision
+    if isinstance(prev_revision, tuple):
+        # Merge point – pick the first parent.
+        prev_revision = prev_revision[0]
+
+    # Step 1: upgrade to this revision
+    command.upgrade(config, revision=revision_id)
+
+    # Determine the target revision for downgrade
+    target = prev_revision if prev_revision else "base"
+    
+    try:
+        # Step 2: downgrade back to the previous revision
+        command.downgrade(config, revision=target)
+    finally:
+        # Step 3: re-apply so subsequent steps see a consistent DB state.
+        command.upgrade(config, revision=revision_id)

Review Comment:
   The test claims to start from the squashed baseline and then do single-step 
upgrade→downgrade per revision, but there is no setup that ensures the DB is at 
`_SQUASHED_BASE_REVISION` (or at the immediate parent) before the first 
parametrized case runs. In a typical DB-test run the DB is initialized at the 
current head, so the first `command.downgrade()` will traverse many revisions 
and won’t validate a single-step downgrade for just `revision_id`. Add an 
explicit module/session-scoped setup that brings the DB to the squashed 
baseline before running the stairway, and ensure teardown restores the DB to 
`heads` even on failure so other DB tests aren’t left on an intermediate schema.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward (downgrade)
+to verify that all migrations are fully reversible.  This catches problems like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2
+snapshot) and iterates through each revision in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _get_alembic_config
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc"
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    # Find the squashed base and return everything *after* it (the incremental steps)
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1 :]
+
+
+# Force these tests to run sequentially on the same worker to prevent DB state 
+# conflicts during parallel testing (e.g., when using pytest-xdist).
+@pytest.mark.xdist_group(name="migration_stairway")
+@pytest.mark.parametrize(
+    "revision_id",
+    [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()],
+)

Review Comment:
   This parametrized test implicitly depends on state being carried from one 
`revision_id` case to the next (so each case starts from the previous 
revision). With pytest-xdist, individual parametrized cases are scheduled 
independently and (with Breeze) xdist is invoked without `--dist=loadgroup`, so 
`xdist_group` will not actually keep all params on the same worker. Consider 
rewriting this as a single test that loops through revisions (so order/state is 
guaranteed), or otherwise make each case self-contained and not rely on 
cross-test DB state.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward (downgrade)
+to verify that all migrations are fully reversible.  This catches problems like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2
+snapshot) and iterates through each revision in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _get_alembic_config
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc"
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    # Find the squashed base and return everything *after* it (the incremental steps)
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1 :]
+
+
+# Force these tests to run sequentially on the same worker to prevent DB state 
+# conflicts during parallel testing (e.g., when using pytest-xdist).
+@pytest.mark.xdist_group(name="migration_stairway")
+@pytest.mark.parametrize(

Review Comment:
   There’s trailing whitespace on the comment line here (and also around the 
blank line before the `try:`). Please remove trailing spaces to keep diffs 
clean and satisfy common linters.



##########
airflow-core/tests/unit/migrations/test_migration_utils.py:
##########
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Stairway test for Airflow DB migrations.
+
+Walks every individual migration step forward (upgrade) then backward (downgrade)
+to verify that all migrations are fully reversible.  This catches problems like:
+- a migration that creates a constraint the downgrade forgets to drop
+- a downgrade that references a column that was never added by the upgrade
+
+The test starts from the squashed baseline (revision 4bc4d934e2bc, the 2.6.2
+snapshot) and iterates through each revision in topological order.
+"""
+
+from __future__ import annotations
+
+import pytest
+from alembic import command
+from alembic.script import ScriptDirectory
+
+from airflow.utils.db import _get_alembic_config
+
+pytestmark = pytest.mark.db_test
+
+# The squashed "start-of-history" revision – stairway starts here.
+_SQUASHED_BASE_REVISION = "4bc4d934e2bc"
+
+
+def _get_revisions_in_order() -> list[str]:
+    """Return revision IDs in upgrade order, starting after the squashed base."""
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    # walk_revisions() yields from head → base; reverse for upgrade order
+    all_revisions = list(script.walk_revisions())
+    all_revisions.reverse()
+
+    revision_ids = [rev.revision for rev in all_revisions]
+
+    # Find the squashed base and return everything *after* it (the incremental steps)
+    try:
+        base_index = revision_ids.index(_SQUASHED_BASE_REVISION)
+    except ValueError:
+        return revision_ids  # squashed revision not present, return all
+
+    return revision_ids[base_index + 1 :]
+
+
+# Force these tests to run sequentially on the same worker to prevent DB state 
+# conflicts during parallel testing (e.g., when using pytest-xdist).
+@pytest.mark.xdist_group(name="migration_stairway")
+@pytest.mark.parametrize(
+    "revision_id",
+    [pytest.param(rev, id=rev) for rev in _get_revisions_in_order()],
+)
+def test_migration_stairway(revision_id: str) -> None:
+    """
+    For each migration step verify that upgrade followed by downgrade works.
+
+    The test applies the migration under test (upgrade to *revision_id*), then
+    immediately rolls it back (downgrade to the previous revision), and finally
+    re-applies it so that subsequent parametrized steps start from the correct
+    state.
+    """
+    config = _get_alembic_config()
+    script = ScriptDirectory.from_config(config)
+
+    revision_obj = script.get_revision(revision_id)
+    if revision_obj is None:
+        pytest.skip(f"Revision {revision_id!r} not found in migration scripts")
+
+    prev_revision = revision_obj.down_revision
+    if isinstance(prev_revision, tuple):
+        # Merge point – pick the first parent.
+        prev_revision = prev_revision[0]
+

Review Comment:
   Merge revisions aren’t handled correctly: when `down_revision` is a tuple, 
picking the first parent can send the downgrade to an arbitrary branch and 
won’t exercise the real “one step back” for the merge revision. Prefer using 
Alembic’s relative targets (e.g. downgrade one step from the current revision) 
or otherwise compute the correct downgrade target for merge points so the test 
validates reversibility of the merge migration itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to