This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f2c9a65dda Edge worker maintenance state is remembered if worker 
crashes (#50338)
8f2c9a65dda is described below

commit 8f2c9a65dda0898c61fd21b235cab7054b882a18
Author: majorosdonat <[email protected]>
AuthorDate: Thu May 8 15:11:05 2025 +0200

    Edge worker maintenance state is remembered if worker crashes (#50338)
    
    * Edge worker maintenance state is remembered if worker crashes
    
    * Add pytests
    
    * Consolidate code
    
    ---------
    
    Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]>
---
 .../airflow/providers/edge3/executors/edge_executor.py | 18 ++++++++++++++++--
 .../providers/edge3/worker_api/routes/worker.py        |  7 ++++++-
 .../tests/unit/edge3/worker_api/routes/test_worker.py  | 18 ++++++++++++++++++
 3 files changed, 40 insertions(+), 3 deletions(-)

diff --git 
a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py 
b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index 55d5414cd89..a2ebe435652 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -177,7 +177,11 @@ class EdgeExecutor(BaseExecutor):
             .with_for_update(skip_locked=True)
             .filter(
                 EdgeWorkerModel.state.not_in(
-                    [EdgeWorkerState.UNKNOWN, EdgeWorkerState.OFFLINE, 
EdgeWorkerState.OFFLINE_MAINTENANCE]
+                    [
+                        EdgeWorkerState.UNKNOWN,
+                        EdgeWorkerState.OFFLINE,
+                        EdgeWorkerState.OFFLINE_MAINTENANCE,
+                    ]
                 ),
                 EdgeWorkerModel.last_update < (timezone.utcnow() - 
timedelta(seconds=heartbeat_interval * 5)),
             )
@@ -186,7 +190,17 @@ class EdgeExecutor(BaseExecutor):
 
         for worker in lifeless_workers:
             changed = True
-            worker.state = EdgeWorkerState.UNKNOWN
+            #  If the worker dies in maintenance mode we want to remember it, 
so it can start in maintenance mode
+            worker.state = (
+                EdgeWorkerState.OFFLINE_MAINTENANCE
+                if worker.state
+                in (
+                    EdgeWorkerState.MAINTENANCE_MODE,
+                    EdgeWorkerState.MAINTENANCE_PENDING,
+                    EdgeWorkerState.MAINTENANCE_REQUEST,
+                )
+                else EdgeWorkerState.UNKNOWN
+            )
             reset_metrics(worker.worker_name)
 
         return changed
diff --git 
a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py 
b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
index f251d32acf9..b24330733c8 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/worker.py
@@ -122,7 +122,12 @@ def redefine_state(worker_state: EdgeWorkerState, 
body_state: EdgeWorkerState) -
             EdgeWorkerState.MAINTENANCE_PENDING,
             EdgeWorkerState.MAINTENANCE_MODE,
         )
-        or worker_state == EdgeWorkerState.OFFLINE_MAINTENANCE
+        or worker_state
+        in (
+            EdgeWorkerState.OFFLINE_MAINTENANCE,
+            EdgeWorkerState.MAINTENANCE_MODE,
+            EdgeWorkerState.MAINTENANCE_PENDING,
+        )
         and body_state == EdgeWorkerState.STARTING
     ):
         return EdgeWorkerState.MAINTENANCE_REQUEST
diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py 
b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
index 4cf5ffc8003..3553ec0e083 100644
--- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
+++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_worker.py
@@ -143,6 +143,24 @@ class TestWorkerApiRoutes:
                 EdgeWorkerState.MAINTENANCE_REQUEST,
                 id="maintenance_starting",
             ),
+            pytest.param(
+                EdgeWorkerState.MAINTENANCE_MODE,
+                EdgeWorkerState.STARTING,
+                EdgeWorkerState.MAINTENANCE_REQUEST,
+                id="maintenance_crash",
+            ),
+            pytest.param(
+                EdgeWorkerState.MAINTENANCE_PENDING,
+                EdgeWorkerState.STARTING,
+                EdgeWorkerState.MAINTENANCE_REQUEST,
+                id="maintenance_crash_2",
+            ),
+            pytest.param(
+                EdgeWorkerState.MAINTENANCE_REQUEST,
+                EdgeWorkerState.STARTING,
+                EdgeWorkerState.MAINTENANCE_REQUEST,
+                id="maintenance_crash_3",
+            ),
         ],
     )
     def test_redefine_state(

Reply via email to