Copilot commented on code in PR #63728:
URL: https://github.com/apache/airflow/pull/63728#discussion_r3066477950


##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -422,6 +422,65 @@ def test_cleanup_with_dag_id_filtering(self, dag_ids, 
exclude_dag_ids, expected_
                 f"Expected {expected_remaining_dag_ids} to remain, but got 
{remaining_dag_ids}"
             )
 
+    def test_dag_version_cleanup_skips_referenced_rows(self):
+        """
+        Verify that dag_version rows still referenced by task_instance are not 
deleted,
+        preventing ForeignKeyViolation (issue #63703).
+
+        Scenario: dag_version created before the cutoff, but a task_instance 
created
+        after the cutoff still references it via dag_version_id (RESTRICT FK).
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-version-fk"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)

Review Comment:
   This test intends `dag_version.created_at` to be before `clean_before_date`, but `created_at` is never set or frozen — so it will typically be "now" and therefore *after* the cutoff, meaning the row would not be eligible for deletion in the first place. To ensure the test actually reproduces the FK-violation scenario, freeze time during the dag_version creation or explicitly set `dag_version.created_at` to `base_date` (and assert the precondition `dag_version.created_at < clean_before_date`).



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -422,6 +422,65 @@ def test_cleanup_with_dag_id_filtering(self, dag_ids, 
exclude_dag_ids, expected_
                 f"Expected {expected_remaining_dag_ids} to remain, but got 
{remaining_dag_ids}"
             )
 
+    def test_dag_version_cleanup_skips_referenced_rows(self):
+        """
+        Verify that dag_version rows still referenced by task_instance are not 
deleted,
+        preventing ForeignKeyViolation (issue #63703).
+
+        Scenario: dag_version created before the cutoff, but a task_instance 
created
+        after the cutoff still references it via dag_version_id (RESTRICT FK).
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))

Review Comment:
   This test intends `dag_version.created_at` to be before `clean_before_date`, but `created_at` is never set or frozen — so it will typically be "now" and therefore *after* the cutoff, meaning the row would not be eligible for deletion in the first place. To ensure the test actually reproduces the FK-violation scenario, freeze time during the dag_version creation or explicitly set `dag_version.created_at` to `base_date` (and assert the precondition `dag_version.created_at < clean_before_date`).



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -422,6 +422,65 @@ def test_cleanup_with_dag_id_filtering(self, dag_ids, 
exclude_dag_ids, expected_
                 f"Expected {expected_remaining_dag_ids} to remain, but got 
{remaining_dag_ids}"
             )
 
+    def test_dag_version_cleanup_skips_referenced_rows(self):
+        """
+        Verify that dag_version rows still referenced by task_instance are not 
deleted,
+        preventing ForeignKeyViolation (issue #63703).
+
+        Scenario: dag_version created before the cutoff, but a task_instance 
created
+        after the cutoff still references it via dag_version_id (RESTRICT FK).
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-version-fk"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+            # Create a dag_run and task_instance with start_date AFTER the 
cutoff,
+            # but referencing the old dag_version (created before cutoff).
+            run_start = base_date.add(days=20)
+            dag_run = DagRun(
+                dag.dag_id,
+                run_id="run_after_cutoff",
+                run_type=DagRunType.MANUAL,
+                start_date=run_start,
+            )
+            ti = create_task_instance(
+                PythonOperator(task_id="dummy-task", python_callable=print),
+                run_id=dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            ti.dag_id = dag.dag_id
+            ti.start_date = run_start
+            session.add(dag_run)
+            session.add(ti)
+            session.commit()
+
+            # Clean with a cutoff that is after the dag_version creation but 
before
+            # the task_instance start_date.
+            clean_before_date = base_date.add(days=10)

Review Comment:
   This test intends `dag_version.created_at` to be before `clean_before_date`, but `created_at` is never set or frozen — so it will typically be "now" and therefore *after* the cutoff, meaning the row would not be eligible for deletion in the first place. To ensure the test actually reproduces the FK-violation scenario, freeze time during the dag_version creation or explicitly set `dag_version.created_at` to `base_date` (and assert the precondition `dag_version.created_at < clean_before_date`).



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -373,6 +382,21 @@ def _build_query(
         )
         conditions.append(column(max_date_col_name).is_(None))
     query = query.where(and_(*conditions))
+
+    # Exclude rows still referenced by child tables (FK with RESTRICT).
+    fk_check_columns = kwargs.get("fk_check_columns")
+    if fk_check_columns:
+        for child_table_name, child_fk_col, parent_pk_col in fk_check_columns:
+            child_table = table(child_table_name, column(child_fk_col))
+            parent_pk = getattr(base_table, parent_pk_col)
+            query = query.where(
+                ~exists(
+                    select(literal(1))
+                    .select_from(child_table)
+                    .where(child_table.c[child_fk_col] == parent_pk)
+                )
+            )

Review Comment:
   `base_table` is an aliased selectable, and columns are accessed via `.c[...]` elsewhere in this function. Using `getattr(base_table, parent_pk_col)` is likely to fail (or produce the wrong SQL expression) because `id`/PK columns generally aren't exposed as attributes on these alias objects. Use `base_table.c[parent_pk_col]` instead (and consider raising a clear error if the PK column isn't present in the selectable) to ensure the FK check is actually correlated correctly.



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -373,6 +382,21 @@ def _build_query(
         )
         conditions.append(column(max_date_col_name).is_(None))
     query = query.where(and_(*conditions))
+
+    # Exclude rows still referenced by child tables (FK with RESTRICT).
+    fk_check_columns = kwargs.get("fk_check_columns")
+    if fk_check_columns:

Review Comment:
   `fk_check_columns` is now a first-class behavior change in `_build_query`, but it's threaded through via `**kwargs`, which makes the API harder to understand and type-check. Consider adding an explicit, typed keyword parameter (e.g. `fk_check_columns: list[tuple[str, str, str]] | None = None`) to `_build_query` (and similarly to `_cleanup_table` if needed) rather than relying on `kwargs.get`.
   ```suggestion
       if fk_check_columns is not None:
   ```



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -88,6 +88,11 @@ class _TableConfig:
     # because the relationships are unlikely to change and the number of 
tables is small.
     # Relying on automation here would increase complexity and reduce 
maintainability.
     dependent_tables: list[str] | None = None
+    # Columns in other tables that hold FK references to this table's primary 
key.
+    # Used to add NOT EXISTS guards so we don't delete rows still referenced by
+    # surviving child rows (e.g. task_instance.dag_version_id → 
dag_version.id).
+    # Each entry is (child_table_name, child_fk_column_name, 
parent_pk_column_name).
+    fk_check_columns: list[tuple[str, str, str]] | None = None

Review Comment:
   Since `fk_check_columns` changes which rows are eligible for deletion, it would be helpful for the `dry_run`/config output to surface it (similar to the `dependent_tables` and `keep_last*` outputs). Consider including `fk_check_columns` in the human-readable config that gets printed, so operators can understand why some parent rows are being skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to