DaveT1991 commented on code in PR #64997:
URL: https://github.com/apache/airflow/pull/64997#discussion_r3066098434


##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -225,12 +225,41 @@ def _send_workloads(self, workload_tuples_to_send: Sequence[WorkloadInCelery]):
                 result.backend = cached_celery_backend
                 self.running.add(key)
                 self.workloads[key] = result
+                # Persist the Celery task_id before scheduler event handling 
so a replacement
+                # scheduler can still adopt the task after a crash/restart.
+                self._persist_task_external_executor_id(key, result.task_id)
 
                 # Store the Celery task_id (workload execution ID) in the 
event buffer. This will get "overwritten" if the task
                 # has another event, but that is fine, because the only other 
events are success/failed at
                 # which point we don't need the ID anymore anyway.
                 self.event_buffer[key] = (TaskInstanceState.QUEUED, 
result.task_id)
 
+    def _persist_task_external_executor_id(self, key: WorkloadKey, task_id: 
str) -> None:
+        """Persist Celery task ids for task workloads so they survive 
scheduler restarts."""
+        from sqlalchemy import update
+
+        from airflow.models.taskinstance import TaskInstance as TI
+        from airflow.models.taskinstancekey import TaskInstanceKey

Review Comment:
   Of course, can you check now and let me know? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to