Copilot commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3068263736


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +187,85 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             }
         )
 
+    async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+        try:
+            return await self.hook.get_pod(name=pod_name, 
namespace=self.pod_namespace)
+        except Exception as err:
+            if self._is_not_found_error(err):
+                self.log.info("Pod '%s' no longer exists; skipping XCom 
extraction.", pod_name)
+                return None
+            raise
+
+    async def _wait_until_container_complete_or_job_done(
+        self, pod_name: str, job_task: asyncio.Task[V1Job]
+    ) -> WaitOutcome:
+        return await self._wait_until_container_state_or_job_done(
+            pod_name=pod_name,
+            container_name=self.base_container_name,
+            wait_method=self.hook.wait_until_container_complete,
+            job_task=job_task,
+            state_label="completed",
+        )
+
+    async def _wait_until_sidecar_started_or_job_done(
+        self, pod_name: str, job_task: asyncio.Task[V1Job]
+    ) -> WaitOutcome:
+        return await self._wait_until_container_state_or_job_done(
+            pod_name=pod_name,
+            container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+            wait_method=self.hook.wait_until_container_started,
+            job_task=job_task,
+            state_label="running",
+        )
+
+    async def _wait_until_container_state_or_job_done(
+        self,
+        pod_name: str,
+        container_name: str,
+        wait_method: Any,
+        job_task: asyncio.Task[V1Job],
+        state_label: str,
+    ) -> WaitOutcome:
+        poll_interval = self.poll_interval if self.poll_interval > 0 else 0.1
+
+        while True:
+            try:
+                await asyncio.wait_for(
+                    wait_method(
+                        name=pod_name,
+                        namespace=self.pod_namespace,
+                        container_name=container_name,
+                        poll_interval=poll_interval,
+                    ),
+                    timeout=poll_interval,
+                )

Review Comment:
   _wait_until_container_state_or_job_done wraps the 
hook.wait_until_container_* calls in asyncio.wait_for(..., 
timeout=poll_interval). Those hook methods already poll and sleep for 
poll_interval internally, so this pattern cancels and restarts the underlying 
coroutine on every tick. Worse, if a single get_pod call takes longer than 
poll_interval (common on slow or loaded clusters), the wait may never report 
"ready" at all. Consider running wait_method in its own task and using 
asyncio.wait(..., return_when=FIRST_COMPLETED) to race it against job_task. 
Alternatively, use a timeout that comfortably exceeds the worst-case API call 
plus sleep. Either way, the hook call is no longer cancelled on every poll 
tick.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -121,24 +125,51 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Get current job status and yield a TriggerEvent."""
-        if self.do_xcom_push:
-            xcom_results = []
-            for pod_name in self.pod_names:
-                pod = await self.hook.get_pod(name=pod_name, 
namespace=self.pod_namespace)
-                await self.hook.wait_until_container_complete(
-                    name=pod_name, namespace=self.pod_namespace, 
container_name=self.base_container_name
-                )
-                self.log.info("Checking if xcom sidecar container is started.")
-                await self.hook.wait_until_container_started(
-                    name=pod_name,
-                    namespace=self.pod_namespace,
-                    container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
-                )
-                self.log.info("Extracting result from xcom sidecar container.")
-                loop = asyncio.get_running_loop()
-                xcom_result = await loop.run_in_executor(None, 
self.pod_manager.extract_xcom, pod)
-                xcom_results.append(xcom_result)
-        job: V1Job = await 
self.hook.wait_until_job_complete(name=self.job_name, 
namespace=self.job_namespace)
+        xcom_results: list[str] | None = None
+        job_task = asyncio.create_task(
+            self.hook.wait_until_job_complete(
+                name=self.job_name,
+                namespace=self.job_namespace,
+                poll_interval=self.poll_interval,

Review Comment:
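   run() now passes self.poll_interval straight into 
hook.wait_until_job_complete. If poll_interval is configured as 0 or a negative 
value, AsyncKubernetesHook.wait_until_job_complete will effectively busy-loop 
(sleep(0)) and flood the Kubernetes API with requests. Consider clamping it to 
a small positive minimum before passing it to wait_until_job_complete, in line 
with the 0.1 fallback in _wait_until_container_state_or_job_done. Note that the 
suggestion below uses max(..., 0.1), which also raises small positive values 
such as 0.05. The existing fallback only replaces non-positive values, so pick 
one rule and use it in both places.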
   ```suggestion
           poll_interval = max(self.poll_interval, 0.1)
           job_task = asyncio.create_task(
               self.hook.wait_until_job_complete(
                   name=self.job_name,
                   namespace=self.job_namespace,
                   poll_interval=poll_interval,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to