pierrejeambrun commented on code in PR #63484:
URL: https://github.com/apache/airflow/pull/63484#discussion_r3015850923
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -281,7 +283,7 @@ def clear_dag_run(
body: DAGRunClearBody,
dag_bag: DagBagDep,
session: SessionDep,
-) -> TaskInstanceCollectionResponse | DAGRunResponse:
+) -> TaskInstanceCollectionResponse | DAGRunResponse |
NewTaskCollectionResponse:
Review Comment:
`NewTaskCollectionResponse`: I don't think we should add yet another return
type here. It makes things confusing.
Does it make sense to use `TaskInstanceCollectionResponse` and set all
other attributes to `None`?
That would keep things consistent.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##########
@@ -293,9 +295,38 @@ def clear_dag_run(
dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
+
+ if body.only_new:
+ if body.dry_run:
+ new_task_ids = dag.clear(
+ run_id=dag_run_id,
+ task_ids=None,
+ only_new=True,
+ dry_run=True,
+ session=session,
+ )
+ new_tasks = [
+ NewTaskResponse(task_id=task_id, task_display_name=task_id)
+ for task_id in sorted(new_task_ids)
+ ]
+ return NewTaskCollectionResponse(
+ new_tasks=new_tasks,
+ total_entries=len(new_tasks),
+ )
+ dag.clear(
+ run_id=dag_run_id,
+ task_ids=None,
+ only_new=True,
+ session=session,
+ )
+ dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id ==
dag_run.id))
+ if not dag_run_cleared:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "DAG run not found
after clearing")
+ return dag_run_cleared
Review Comment:
Also, using the same serializer for both would allow merging those two
paths. We only need to convert the tasks into a `TaskInstanceResponse` for the
`only_new` branch.
##########
airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx:
##########
@@ -54,18 +65,48 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
const refetchInterval = useAutoRefresh({ dagId });
- const { data: affectedTasks = { task_instances: [], total_entries: 0 } } =
useClearDagRunDryRun({
+ const { data: dryRunData } = useClearDagRunDryRun({
dagId,
dagRunId,
options: {
- refetchInterval: (query) =>
- query.state.data?.task_instances.some((ti: TaskInstanceResponse) =>
isStatePending(ti.state))
+ refetchInterval: (query) => {
+ const queryData = query.state.data;
+
+ if (!queryData || isNewTaskCollection(queryData)) {
+ return false;
+ }
+
+ return queryData.task_instances.some((ti: TaskInstanceResponse) =>
isStatePending(ti.state))
? refetchInterval
- : false,
+ : false;
+ },
+ },
Review Comment:
I wouldn't do this. It's overly complicated just to save some refreshes in
the modal, and only when `only_new` is activated.
Dropping this logic would make the code simpler.
##########
airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx:
##########
@@ -54,18 +65,48 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
const refetchInterval = useAutoRefresh({ dagId });
- const { data: affectedTasks = { task_instances: [], total_entries: 0 } } =
useClearDagRunDryRun({
+ const { data: dryRunData } = useClearDagRunDryRun({
dagId,
dagRunId,
options: {
- refetchInterval: (query) =>
- query.state.data?.task_instances.some((ti: TaskInstanceResponse) =>
isStatePending(ti.state))
+ refetchInterval: (query) => {
+ const queryData = query.state.data;
+
+ if (!queryData || isNewTaskCollection(queryData)) {
+ return false;
+ }
+
+ return queryData.task_instances.some((ti: TaskInstanceResponse) =>
isStatePending(ti.state))
? refetchInterval
- : false,
+ : false;
+ },
+ },
+ requestBody: {
+ only_failed: onlyFailed,
+ only_new: onlyNew,
+ run_on_latest_version: runOnLatestVersion,
},
- requestBody: { only_failed: onlyFailed, run_on_latest_version:
runOnLatestVersion },
});
+ // Normalise both response shapes into the format ActionAccordion expects.
+ const affectedTasks: TaskInstanceCollectionResponse = (() => {
+ const empty: TaskInstanceCollectionResponse = { task_instances: [],
total_entries: 0 };
+
+ if (!dryRunData) {
+ return empty;
+ }
+ if (isNewTaskCollection(dryRunData)) {
+ return {
+ task_instances: dryRunData.new_tasks.map(
+ (task) => ({ task_id: task.task_id }) as TaskInstanceResponse,
+ ),
+ total_entries: dryRunData.total_entries,
+ };
+ }
+
+ return dryRunData;
+ })();
Review Comment:
This should be done on the backend, so that you do not need this extra
processing here. The API should return a consistent response type.
##########
airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts:
##########
@@ -19,7 +19,13 @@
import { useQuery, type UseQueryOptions } from "@tanstack/react-query";
import { DagRunService } from "openapi/requests/services.gen";
-import type { DAGRunClearBody, TaskInstanceCollectionResponse } from
"openapi/requests/types.gen";
+import type {
+ DAGRunClearBody,
+ NewTaskCollectionResponse,
+ TaskInstanceCollectionResponse,
+} from "openapi/requests/types.gen";
+
+type ClearDagRunDryRunResponse = NewTaskCollectionResponse |
TaskInstanceCollectionResponse;
Review Comment:
Returning a single consistent response type would avoid such a union.
##########
airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx:
##########
@@ -78,12 +119,14 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
onSuccess: onClose,
});
- // Check if bundle versions are different
- const currentDagBundleVersion = dagDetails?.bundle_version;
- const dagRunBundleVersion = dagRun.bundle_version;
- const bundleVersionsDiffer = currentDagBundleVersion !== dagRunBundleVersion;
- const shouldShowBundleVersionOption =
- bundleVersionsDiffer && dagRunBundleVersion !== null &&
dagRunBundleVersion !== "";
+ // Check if DAG versions differ (works for both bundle-versioned and local
bundles)
+ const latestDagVersionNumber =
dagDetails?.latest_dag_version?.version_number;
+ const dagRunVersionNumber = dagRun.dag_versions.at(-1)?.version_number;
+ const versionsDiffer =
+ latestDagVersionNumber !== undefined &&
+ dagRunVersionNumber !== undefined &&
+ latestDagVersionNumber !== dagRunVersionNumber;
+ const shouldShowBundleVersionOption = versionsDiffer && !onlyNew;
Review Comment:
There was a `null` check before too. Should we verify that both values
are `!== undefined` and `!== null`?
What happens if `dagRunBundleVersion` is `null`? Would we show the bundle
version option even though this is an unversioned bundle?
##########
airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx:
##########
@@ -45,6 +55,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => {
const [note, setNote] = useState<string | null>(dagRun.note);
const [selectedOptions, setSelectedOptions] =
useState<Array<string>>(["existingTasks"]);
const onlyFailed = selectedOptions.includes("onlyFailed");
+ const onlyNew = selectedOptions.includes("new_tasks");
Review Comment:
Use camelCase here, like "onlyFailed".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]