Copilot commented on code in PR #64022:
URL: https://github.com/apache/airflow/pull/64022#discussion_r3066486849


##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -202,33 +202,46 @@ def _exit_gracefully(self, signum, frame) -> None:
             sys.exit(os.EX_SOFTWARE)
 
     def _execute(self) -> int | None:
-        self.log.info("Starting the triggerer")
-        self.register_signals()
-        stats_factory = stats_utils.get_stats_factory(Stats)
-        Stats.initialize(factory=stats_factory)
+        # Mark as server context for secrets backend detection when handling 
GetConnection
+        # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+        # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to 
prevent
+        # inheriting server privileges (runs user trigger/callback code).
+        # Similar to DagProcessorManager / DagProcessor child pattern.
+        _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
         try:
-            # Kick off runner sub-process without DB access
-            self.trigger_runner = TriggerRunnerSupervisor.start(
-                job=self.job,
-                capacity=self.capacity,
-                logger=log,
-                queues=self.queues,
-            )
+            self.log.info("Starting the triggerer")
+            self.register_signals()
+            stats_factory = stats_utils.get_stats_factory(Stats)
+            Stats.initialize(factory=stats_factory)
+            try:
+                # Kick off runner sub-process without DB access
+                self.trigger_runner = TriggerRunnerSupervisor.start(
+                    job=self.job,
+                    capacity=self.capacity,
+                    logger=log,
+                    queues=self.queues,
+                )
 
-            # Run the main DB comms loop in this process
-            self.trigger_runner.run()
-            return self.trigger_runner._exit_code
-        except Exception:
-            self.log.exception("Exception when executing 
TriggerRunnerSupervisor.run")
-            raise
+                # Run the main DB comms loop in this process
+                self.trigger_runner.run()
+                return self.trigger_runner._exit_code
+            except Exception:
+                self.log.exception("Exception when executing 
TriggerRunnerSupervisor.run")
+                raise
+            finally:
+                self.log.info("Waiting for triggers to clean up")
+                # Tell the subtproc to stop and then wait for it.
+                # If the user interrupts/terms again, _graceful_exit will 
allow them
+                # to force-kill here.
+                self.trigger_runner.kill(escalation_delay=10, force=True)
+                self.log.info("Exited trigger loop")

Review Comment:
   In `_execute()`, the `finally` block unconditionally calls 
`self.trigger_runner.kill(...)`. If `TriggerRunnerSupervisor.start(...)` raises 
(e.g. subprocess spawn failure) before assigning `self.trigger_runner`, this 
`finally` will raise `AttributeError` and mask the original exception. Guard 
the cleanup by initializing `self.trigger_runner = None` before the inner 
`try`, and only calling `kill` when a runner instance exists (or use 
`getattr(self, "trigger_runner", None)`).



##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -202,33 +202,46 @@ def _exit_gracefully(self, signum, frame) -> None:
             sys.exit(os.EX_SOFTWARE)
 
     def _execute(self) -> int | None:
-        self.log.info("Starting the triggerer")
-        self.register_signals()
-        stats_factory = stats_utils.get_stats_factory(Stats)
-        Stats.initialize(factory=stats_factory)
+        # Mark as server context for secrets backend detection when handling 
GetConnection
+        # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+        # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to 
prevent
+        # inheriting server privileges (runs user trigger/callback code).
+        # Similar to DagProcessorManager / DagProcessor child pattern.
+        _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+        os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
         try:
-            # Kick off runner sub-process without DB access
-            self.trigger_runner = TriggerRunnerSupervisor.start(
-                job=self.job,
-                capacity=self.capacity,
-                logger=log,
-                queues=self.queues,
-            )
+            self.log.info("Starting the triggerer")
+            self.register_signals()
+            stats_factory = stats_utils.get_stats_factory(Stats)
+            Stats.initialize(factory=stats_factory)
+            try:
+                # Kick off runner sub-process without DB access
+                self.trigger_runner = TriggerRunnerSupervisor.start(
+                    job=self.job,
+                    capacity=self.capacity,
+                    logger=log,
+                    queues=self.queues,
+                )
 
-            # Run the main DB comms loop in this process
-            self.trigger_runner.run()
-            return self.trigger_runner._exit_code
-        except Exception:
-            self.log.exception("Exception when executing 
TriggerRunnerSupervisor.run")
-            raise
+                # Run the main DB comms loop in this process
+                self.trigger_runner.run()
+                return self.trigger_runner._exit_code
+            except Exception:
+                self.log.exception("Exception when executing 
TriggerRunnerSupervisor.run")
+                raise
+            finally:
+                self.log.info("Waiting for triggers to clean up")
+                # Tell the subtproc to stop and then wait for it.
+                # If the user interrupts/terms again, _graceful_exit will 
allow them
+                # to force-kill here.
+                self.trigger_runner.kill(escalation_delay=10, force=True)
+                self.log.info("Exited trigger loop")
+            return None

Review Comment:
   `return None` after the inner `try/except/finally` appears unreachable 
because the inner `try` always either returns (`return 
self.trigger_runner._exit_code`) or raises. Removing the dead return will make 
the control flow clearer and avoid confusion during future edits.
   



##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -1284,6 +1284,36 @@ def test_stats_initialize_called_on_execute(self, 
mock_supervisor_start, stats_i
         call_kwargs = stats_init_mock.call_args.kwargs
         assert "factory" in call_kwargs
 
+    @patch.object(TriggerRunnerSupervisor, "start")
+    def test_execute_sets_server_process_context(self, mock_supervisor_start, 
session, monkeypatch):
+        """_execute marks triggerer as server context for secrets backend 
detection."""
+        captured_context = {}
+

Review Comment:
   The PR description says `_AIRFLOW_PROCESS_CONTEXT=server` is set in 
`triggerer_run()` and mentions a test named 
`test_triggerer_server_context_includes_metastore_backend`, but the actual 
change sets the env var in `TriggererJobRunner._execute()` and this test only 
asserts the env var value (it doesn’t validate that 
`ensure_secrets_backend_loaded()` includes `MetastoreBackend`). Please update 
the PR description to match the implementation/test, or adjust the code/test to 
align with what’s described.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to