OscarLigthart commented on code in PR #63484:
URL: https://github.com/apache/airflow/pull/63484#discussion_r3018700616


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -293,9 +295,38 @@ def clear_dag_run(
 
     dag = dag_bag.get_dag_for_run(dag_run, session=session)
 
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
+    if body.only_new:
+        if body.dry_run:
+            new_task_ids = dag.clear(
+                run_id=dag_run_id,
+                task_ids=None,
+                only_new=True,
+                dry_run=True,
+                session=session,
+            )
+            new_tasks = [
+                NewTaskResponse(task_id=task_id, task_display_name=task_id)
+                for task_id in sorted(new_task_ids)
+            ]
+            return NewTaskCollectionResponse(
+                new_tasks=new_tasks,
+                total_entries=len(new_tasks),
+            )
+        dag.clear(
+            run_id=dag_run_id,
+            task_ids=None,
+            only_new=True,
+            session=session,
+        )
+        dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id == dag_run.id))
+        if not dag_run_cleared:
+            raise HTTPException(status.HTTP_404_NOT_FOUND, "DAG run not found after clearing")
+        return dag_run_cleared

Review Comment:
   I managed to merge these two paths, including the handling for the
   different serializers; having them split was an oversight on my part to
   begin with. I can still merge this into `TaskInstanceCollectionResponse`,
   but whichever way we decide to go, this part is now simplified.
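
As a rough illustration of what merging the two paths could look like (a sketch only, not the code in this PR): `dry_run` is forwarded to a single `clear` call, and only the response shape differs afterwards. `StubDag`, `clear_only_new`, and the dict-shaped responses are hypothetical stand-ins, and the sketch assumes `clear` returns the new task ids in both modes.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class StubDag:
    """Hypothetical stand-in for the DAG object; not Airflow's real class."""

    new_task_ids: set[str]
    cleared: list[str] = field(default_factory=list)

    def clear(self, *, run_id: str, only_new: bool, dry_run: bool = False) -> set[str]:
        # Assumption: the ids of new tasks are returned in both modes,
        # and state is only mutated when this is not a dry run.
        if not dry_run:
            self.cleared.extend(sorted(self.new_task_ids))
        return set(self.new_task_ids)


def clear_only_new(dag: StubDag, run_id: str, dry_run: bool) -> dict:
    """Single call path: dry_run is forwarded instead of branched on."""
    new_task_ids = dag.clear(run_id=run_id, only_new=True, dry_run=dry_run)

    # Only the response shape depends on dry_run (plain dicts stand in
    # for the real response models here).
    if dry_run:
        tasks = sorted(new_task_ids)
        return {"new_tasks": tasks, "total_entries": len(tasks)}
    return {"dag_run_id": run_id}
```

With this shape, the dry-run and real-run cases share the same `clear` invocation, so any change to its arguments only has to be made once.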



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

