holmuk commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3071582831


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -121,24 +147,30 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
         """Get current job status and yield a TriggerEvent."""
-        if self.do_xcom_push:
-            xcom_results = []
-            for pod_name in self.pod_names:
-                pod = await self.hook.get_pod(name=pod_name, 
namespace=self.pod_namespace)
-                await self.hook.wait_until_container_complete(
-                    name=pod_name, namespace=self.pod_namespace, 
container_name=self.base_container_name
-                )
-                self.log.info("Checking if xcom sidecar container is started.")
-                await self.hook.wait_until_container_started(
-                    name=pod_name,
-                    namespace=self.pod_namespace,
-                    container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+        xcom_results: list[str] | None = None
+        poll_interval = max(self.poll_interval, 0.1)
+
+        job_task = asyncio.create_task(
+            self.hook.wait_until_job_complete(
+                name=self.job_name,
+                namespace=self.job_namespace,
+                poll_interval=poll_interval,
+            )
+        )
+        try:
+            if self.do_xcom_push:
+                xcom_results = await self._collect_xcom_results(
+                    job_task=job_task,
+                    poll_interval=poll_interval,
                 )
-                self.log.info("Extracting result from xcom sidecar container.")
-                loop = asyncio.get_running_loop()
-                xcom_result = await loop.run_in_executor(None, 
self.pod_manager.extract_xcom, pod)
-                xcom_results.append(xcom_result)
-        job: V1Job = await 
self.hook.wait_until_job_complete(name=self.job_name, 
namespace=self.job_namespace)
+
+            job: V1Job = await job_task
+        finally:
+            if not job_task.done():
+                job_task.cancel()
+                with suppress(asyncio.CancelledError):
+                    await job_task

Review Comment:
   `job_task` doesn't create a new job in k8s; it only checks the status of an 
existing job. In `finally`, if `job_task` is still running, we cancel the task 
to avoid leaving a background coroutine alive. The second `await job_task` only 
waits for the cancellation to complete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to