Prab-27 commented on code in PR #64997:
URL: https://github.com/apache/airflow/pull/64997#discussion_r3066045570
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -225,12 +225,41 @@ def _send_workloads(self, workload_tuples_to_send:
Sequence[WorkloadInCelery]):
result.backend = cached_celery_backend
self.running.add(key)
self.workloads[key] = result
+ # Persist the Celery task_id before scheduler event handling
so a replacement
+ # scheduler can still adopt the task after a crash/restart.
+ self._persist_task_external_executor_id(key, result.task_id)
# Store the Celery task_id (workload execution ID) in the
event buffer. This will get "overwritten" if the task
# has another event, but that is fine, because the only other
events are success/failed at
# which point we don't need the ID anymore anyway.
self.event_buffer[key] = (TaskInstanceState.QUEUED,
result.task_id)
+ def _persist_task_external_executor_id(self, key: WorkloadKey, task_id:
str) -> None:
+ """Persist Celery task ids for task workloads so they survive
scheduler restarts."""
+ from sqlalchemy import update
+
+ from airflow.models.taskinstance import TaskInstance as TI
+ from airflow.models.taskinstancekey import TaskInstanceKey
Review Comment:
@DaveT1991 These import statements are already defined at the top of the file. Could we reuse them here instead of re-importing?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]