Copilot commented on code in PR #64022:
URL: https://github.com/apache/airflow/pull/64022#discussion_r3066486849
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -202,33 +202,46 @@ def _exit_gracefully(self, signum, frame) -> None:
sys.exit(os.EX_SOFTWARE)
def _execute(self) -> int | None:
- self.log.info("Starting the triggerer")
- self.register_signals()
- stats_factory = stats_utils.get_stats_factory(Stats)
- Stats.initialize(factory=stats_factory)
+ # Mark as server context for secrets backend detection when handling GetConnection
+ # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+ # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to prevent
+ # inheriting server privileges (runs user trigger/callback code).
+ # Similar to DagProcessorManager / DagProcessor child pattern.
+ _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
try:
- # Kick off runner sub-process without DB access
- self.trigger_runner = TriggerRunnerSupervisor.start(
- job=self.job,
- capacity=self.capacity,
- logger=log,
- queues=self.queues,
- )
+ self.log.info("Starting the triggerer")
+ self.register_signals()
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
+ try:
+ # Kick off runner sub-process without DB access
+ self.trigger_runner = TriggerRunnerSupervisor.start(
+ job=self.job,
+ capacity=self.capacity,
+ logger=log,
+ queues=self.queues,
+ )
- # Run the main DB comms loop in this process
- self.trigger_runner.run()
- return self.trigger_runner._exit_code
- except Exception:
- self.log.exception("Exception when executing TriggerRunnerSupervisor.run")
- raise
+ # Run the main DB comms loop in this process
+ self.trigger_runner.run()
+ return self.trigger_runner._exit_code
+ except Exception:
+ self.log.exception("Exception when executing TriggerRunnerSupervisor.run")
+ raise
+ finally:
+ self.log.info("Waiting for triggers to clean up")
+ # Tell the subtproc to stop and then wait for it.
+ # If the user interrupts/terms again, _graceful_exit will allow them
+ # to force-kill here.
+ self.trigger_runner.kill(escalation_delay=10, force=True)
+ self.log.info("Exited trigger loop")
Review Comment:
In `_execute()`, the `finally` block unconditionally calls
`self.trigger_runner.kill(...)`. If `TriggerRunnerSupervisor.start(...)` raises
(e.g. subprocess spawn failure) before assigning `self.trigger_runner`, this
`finally` will raise `AttributeError` and mask the original exception. Guard
the cleanup by initializing `self.trigger_runner = None` before the inner
`try`, and only calling `kill` when a runner instance exists (or use
`getattr(self, "trigger_runner", None)`).
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -202,33 +202,46 @@ def _exit_gracefully(self, signum, frame) -> None:
sys.exit(os.EX_SOFTWARE)
def _execute(self) -> int | None:
- self.log.info("Starting the triggerer")
- self.register_signals()
- stats_factory = stats_utils.get_stats_factory(Stats)
- Stats.initialize(factory=stats_factory)
+ # Mark as server context for secrets backend detection when handling GetConnection
+ # requests from the TriggerRunner subprocess (needs MetastoreBackend).
+ # The subprocess explicitly sets _AIRFLOW_PROCESS_CONTEXT=client to prevent
+ # inheriting server privileges (runs user trigger/callback code).
+ # Similar to DagProcessorManager / DagProcessor child pattern.
+ _prev_ctx = os.environ.get("_AIRFLOW_PROCESS_CONTEXT")
+ os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
try:
- # Kick off runner sub-process without DB access
- self.trigger_runner = TriggerRunnerSupervisor.start(
- job=self.job,
- capacity=self.capacity,
- logger=log,
- queues=self.queues,
- )
+ self.log.info("Starting the triggerer")
+ self.register_signals()
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
+ try:
+ # Kick off runner sub-process without DB access
+ self.trigger_runner = TriggerRunnerSupervisor.start(
+ job=self.job,
+ capacity=self.capacity,
+ logger=log,
+ queues=self.queues,
+ )
- # Run the main DB comms loop in this process
- self.trigger_runner.run()
- return self.trigger_runner._exit_code
- except Exception:
- self.log.exception("Exception when executing TriggerRunnerSupervisor.run")
- raise
+ # Run the main DB comms loop in this process
+ self.trigger_runner.run()
+ return self.trigger_runner._exit_code
+ except Exception:
+ self.log.exception("Exception when executing TriggerRunnerSupervisor.run")
+ raise
+ finally:
+ self.log.info("Waiting for triggers to clean up")
+ # Tell the subtproc to stop and then wait for it.
+ # If the user interrupts/terms again, _graceful_exit will allow them
+ # to force-kill here.
+ self.trigger_runner.kill(escalation_delay=10, force=True)
+ self.log.info("Exited trigger loop")
+ return None
Review Comment:
`return None` after the inner `try/except/finally` appears unreachable
because the inner `try` always either returns (`return
self.trigger_runner._exit_code`) or raises. Removing the dead return will make
the control flow clearer and avoid confusion during future edits.
##########
airflow-core/tests/unit/jobs/test_triggerer_job.py:
##########
@@ -1284,6 +1284,36 @@ def test_stats_initialize_called_on_execute(self, mock_supervisor_start, stats_i
call_kwargs = stats_init_mock.call_args.kwargs
assert "factory" in call_kwargs
+ @patch.object(TriggerRunnerSupervisor, "start")
+ def test_execute_sets_server_process_context(self, mock_supervisor_start, session, monkeypatch):
+ """_execute marks triggerer as server context for secrets backend detection."""
+ captured_context = {}
+
Review Comment:
The PR description says `_AIRFLOW_PROCESS_CONTEXT=server` is set in
`triggerer_run()` and mentions a test named
`test_triggerer_server_context_includes_metastore_backend`, but the actual
change sets the env var in `TriggererJobRunner._execute()` and this test only
asserts the env var value (it doesn’t validate that
`ensure_secrets_backend_loaded()` includes `MetastoreBackend`). Please update
the PR description to match the implementation/test, or adjust the code/test to
align with what’s described.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]