Copilot commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3068263736
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +187,85 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = self.poll_interval if self.poll_interval > 0 else 0.1
+
+ while True:
+ try:
+ await asyncio.wait_for(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ ),
+ timeout=poll_interval,
+ )
Review Comment:
_wait_until_container_state_or_job_done uses asyncio.wait_for(...,
timeout=poll_interval) around hook.wait_until_container_* calls. Since those
hook methods themselves poll/sleep for poll_interval, this pattern repeatedly
cancels and restarts the underlying coroutine, and can prevent it from ever
returning "ready" if a single get_pod call takes longer than poll_interval
(common on slow/loaded clusters). Consider running the wait_method in its own
task and using asyncio.wait to race it against job_task (or use a timeout that
comfortably exceeds the worst-case API call + sleep), so you don’t cancel the
hook call on every poll tick.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -121,24 +125,51 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Get current job status and yield a TriggerEvent."""
- if self.do_xcom_push:
- xcom_results = []
- for pod_name in self.pod_names:
- pod = await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
- await self.hook.wait_until_container_complete(
- name=pod_name, namespace=self.pod_namespace,
container_name=self.base_container_name
- )
- self.log.info("Checking if xcom sidecar container is started.")
- await self.hook.wait_until_container_started(
- name=pod_name,
- namespace=self.pod_namespace,
- container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
- )
- self.log.info("Extracting result from xcom sidecar container.")
- loop = asyncio.get_running_loop()
- xcom_result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
- xcom_results.append(xcom_result)
- job: V1Job = await
self.hook.wait_until_job_complete(name=self.job_name,
namespace=self.job_namespace)
+ xcom_results: list[str] | None = None
+ job_task = asyncio.create_task(
+ self.hook.wait_until_job_complete(
+ name=self.job_name,
+ namespace=self.job_namespace,
+ poll_interval=self.poll_interval,
Review Comment:
run() now forwards self.poll_interval directly into
hook.wait_until_job_complete. If poll_interval is configured as 0 or a negative
value, AsyncKubernetesHook.wait_until_job_complete will effectively busy-loop
(sleep(0)) and spam the API. Consider clamping to a small positive minimum
(similar to _wait_until_container_state_or_job_done’s 0.1 fallback) before
passing it to wait_until_job_complete. Note that the max(...) form suggested
below also raises positive values smaller than 0.1 up to 0.1, whereas the
"> 0 ... else 0.1" fallback leaves such values unchanged.
```suggestion
poll_interval = max(self.poll_interval, 0.1)
job_task = asyncio.create_task(
self.hook.wait_until_job_complete(
name=self.job_name,
namespace=self.job_namespace,
poll_interval=poll_interval,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]