holmuk commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3071476175


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
             }
         )
 
+    async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+        try:
+            return await self.hook.get_pod(name=pod_name, 
namespace=self.pod_namespace)
+        except Exception as err:
+            if self._is_not_found_error(err):
+                self.log.info("Pod '%s' no longer exists; skipping XCom 
extraction.", pod_name)
+                return None
+            raise
+
+    async def _wait_until_container_complete_or_job_done(
+        self, pod_name: str, job_task: asyncio.Task[V1Job]
+    ) -> WaitOutcome:
+        return await self._wait_until_container_state_or_job_done(
+            pod_name=pod_name,
+            container_name=self.base_container_name,
+            wait_method=self.hook.wait_until_container_complete,
+            job_task=job_task,
+            state_label="completed",
+        )
+
+    async def _wait_until_sidecar_started_or_job_done(
+        self, pod_name: str, job_task: asyncio.Task[V1Job]
+    ) -> WaitOutcome:
+        return await self._wait_until_container_state_or_job_done(
+            pod_name=pod_name,
+            container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+            wait_method=self.hook.wait_until_container_started,
+            job_task=job_task,
+            state_label="running",
+        )
+
+    async def _wait_until_container_state_or_job_done(
+        self,
+        pod_name: str,
+        container_name: str,
+        wait_method: Any,
+        job_task: asyncio.Task[V1Job],
+        state_label: str,
+    ) -> WaitOutcome:
+        poll_interval = max(self.poll_interval, 0.1)
+        wait_task = asyncio.create_task(
+            wait_method(
+                name=pod_name,
+                namespace=self.pod_namespace,
+                container_name=container_name,
+                poll_interval=poll_interval,
+            )
+        )
+
+        try:
+            while True:
+                done, _ = await asyncio.wait(
+                    {wait_task, job_task},
+                    timeout=poll_interval,
+                    return_when=asyncio.FIRST_COMPLETED,
+                )
+
+                if wait_task in done:
+                    try:
+                        await wait_task
+                        return "ready"
+                    except Exception as err:
+                        if self._is_not_found_error(err):
+                            self.log.info(
+                                "Pod '%s' no longer exists while waiting for 
container '%s' state '%s'; skipping.",
+                                pod_name,
+                                container_name,
+                                state_label,
+                            )
+                            return "pod_missing"
+                        raise
+
+                if job_task in done:
+                    self.log.info(
+                        "Job '%s' finished before pod '%s' container '%s' 
reached state '%s'; stopping XCom wait.",
+                        self.job_name,
+                        pod_name,
+                        container_name,
+                        state_label,
+                    )
+                    return "job_done"
+        finally:
+            if not wait_task.done():
+                wait_task.cancel()
+                with suppress(asyncio.CancelledError):
+                    await wait_task
+
+    async def _collect_xcom_results(
+        self,
+        job_task: asyncio.Task[V1Job],
+        poll_interval: float,
+    ) -> list[str]:
+        pre_job_results, post_job_pod_names = await 
self._collect_xcom_until_job_done(job_task=job_task)
+        if not post_job_pod_names:
+            return pre_job_results
+
+        attempts = await self._collect_xcom_after_job_done_best_effort(
+            pod_names=post_job_pod_names,
+            poll_interval=poll_interval,
+        )
+        summary = self._summarize_post_job_attempts(attempts)
+        self._log_post_job_summary(summary=summary)
+        pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+        return pre_job_results
+
+    async def _collect_xcom_until_job_done(
+        self,
+        job_task: asyncio.Task[V1Job],
+    ) -> tuple[list[str], list[str]]:
+        xcom_results: list[str] = []
+        post_job_pod_names: list[str] = []
+
+        for pod_index, pod_name in enumerate(self.pod_names):
+            pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+            if pod is None:
+                continue
+
+            completion_outcome = await 
self._wait_until_container_complete_or_job_done(
+                pod_name=pod_name,
+                job_task=job_task,
+            )
+            if completion_outcome == "job_done":
+                post_job_pod_names = self.pod_names[pod_index:]

Review Comment:
   `pod_name` is the pod for which we received the `job_done` outcome. We still need to extract XCom from this pod and from every pod that follows it in `self.pod_names`, which is why the code takes the slice `self.pod_names[pod_index:]`. Using `pod_name` alone would skip all of the remaining pods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to