Nataneljpwd commented on code in PR #65058:
URL: https://github.com/apache/airflow/pull/65058#discussion_r3069697768
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -121,24 +147,30 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
async def run(self) -> AsyncIterator[TriggerEvent]:
"""Get current job status and yield a TriggerEvent."""
- if self.do_xcom_push:
- xcom_results = []
- for pod_name in self.pod_names:
- pod = await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
- await self.hook.wait_until_container_complete(
- name=pod_name, namespace=self.pod_namespace,
container_name=self.base_container_name
- )
- self.log.info("Checking if xcom sidecar container is started.")
- await self.hook.wait_until_container_started(
- name=pod_name,
- namespace=self.pod_namespace,
- container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ xcom_results: list[str] | None = None
+ poll_interval = max(self.poll_interval, 0.1)
+
+ job_task = asyncio.create_task(
+ self.hook.wait_until_job_complete(
+ name=self.job_name,
+ namespace=self.job_namespace,
+ poll_interval=poll_interval,
+ )
+ )
+ try:
+ if self.do_xcom_push:
+ xcom_results = await self._collect_xcom_results(
+ job_task=job_task,
+ poll_interval=poll_interval,
)
- self.log.info("Extracting result from xcom sidecar container.")
- loop = asyncio.get_running_loop()
- xcom_result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
- xcom_results.append(xcom_result)
- job: V1Job = await
self.hook.wait_until_job_complete(name=self.job_name,
namespace=self.job_namespace)
+
+ job: V1Job = await job_task
+ finally:
+ if not job_task.done():
+ job_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await job_task
Review Comment:
This looks a little weird: we retry in the `finally` no matter what. We retry
even if the `job_task` threw an exception or hasn't finished yet. But once an
API request has been sent, you cannot cancel it. So I see a case where the
request was sent, the job was created, the task was cancelled, and then you
retry creating the job. That either fails because of a unique-name constraint
or runs the job twice.
In general it looks weird to retry in the `finally` block. I would suggest
either handling the exception as intended or putting the try/except only
around the XCom collection.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
Review Comment:
Why is this in a `while True` loop? We wait until the first task completes, and
then either the first or the second one is done, so there is no need for the
loop here, IMO.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
+ max_concurrency = min(5, len(pod_names))
+ semaphore = asyncio.Semaphore(max_concurrency)
+
+ self.log.info(
+ "Job is done; collecting XCom best-effort for %d pod(s) with
per-pod timeout %.2f seconds and max concurrency %d.",
+ len(pod_names),
+ per_pod_timeout,
+ max_concurrency,
+ )
+
+ async def extract_one_pod(pod_name: str) -> PodXComAttempt:
+ async with semaphore:
+ try:
+ return await asyncio.wait_for(
+
self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
+ timeout=per_pod_timeout,
+ )
+ except asyncio.TimeoutError:
+ self.log.warning(
+ "Timed out extracting XCom from pod '%s' after job
completion; skipping.",
+ pod_name,
+ )
+ return PodXComAttempt(pod_name=pod_name, outcome="timeout")
+
+ return await asyncio.gather(*(extract_one_pod(pod_name) for pod_name
in pod_names))
+
+ async def _extract_xcom_for_pod_best_effort(self, pod_name: str) ->
PodXComAttempt:
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+
+ self.log.info("Extracting result from xcom sidecar container
(best-effort).")
+ loop = asyncio.get_running_loop()
+ try:
+ result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
Review Comment:
And here you do not use the method you created above; why is this run directly
with `loop.run_in_executor`?
Is there a reason?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
Review Comment:
Catch only the specific Kubernetes exception here, not every `Exception`.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
Review Comment:
This could just return `None`, or not catch the exception at all and let the
caller check for a 404; that would be less confusing. IMO this logic is not
complex enough to be worth extracting into a method. WDYT?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
Review Comment:
I have seen this check (`max(..., 0.1)`) a couple of times already; can't it be
done once at the top level?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
Review Comment:
Why do we await a task that is already done?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
Review Comment:
Why is this needed? Isn't it simpler to just call the methods directly? These
wrappers seem unnecessary and add code for no reason.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
Review Comment:
Again, this is weird. Why not wrap only the relevant part in a try/except and
do this cleanup either way?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
+ max_concurrency = min(5, len(pod_names))
+ semaphore = asyncio.Semaphore(max_concurrency)
+
+ self.log.info(
+ "Job is done; collecting XCom best-effort for %d pod(s) with
per-pod timeout %.2f seconds and max concurrency %d.",
+ len(pod_names),
+ per_pod_timeout,
+ max_concurrency,
+ )
+
+ async def extract_one_pod(pod_name: str) -> PodXComAttempt:
+ async with semaphore:
+ try:
+ return await asyncio.wait_for(
+
self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
+ timeout=per_pod_timeout,
+ )
+ except asyncio.TimeoutError:
+ self.log.warning(
+ "Timed out extracting XCom from pod '%s' after job
completion; skipping.",
+ pod_name,
+ )
+ return PodXComAttempt(pod_name=pod_name, outcome="timeout")
+
+ return await asyncio.gather(*(extract_one_pod(pod_name) for pod_name
in pod_names))
+
+ async def _extract_xcom_for_pod_best_effort(self, pod_name: str) ->
PodXComAttempt:
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+
+ self.log.info("Extracting result from xcom sidecar container
(best-effort).")
+ loop = asyncio.get_running_loop()
+ try:
+ result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
+ return PodXComAttempt(pod_name=pod_name, outcome="success",
result=result)
+ except Exception as err:
Review Comment:
Catch only the specific, relevant exceptions, not all of them.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
+ max_concurrency = min(5, len(pod_names))
+ semaphore = asyncio.Semaphore(max_concurrency)
+
+ self.log.info(
+ "Job is done; collecting XCom best-effort for %d pod(s) with
per-pod timeout %.2f seconds and max concurrency %d.",
+ len(pod_names),
+ per_pod_timeout,
+ max_concurrency,
+ )
+
+ async def extract_one_pod(pod_name: str) -> PodXComAttempt:
+ async with semaphore:
+ try:
+ return await asyncio.wait_for(
+
self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
+ timeout=per_pod_timeout,
+ )
+ except asyncio.TimeoutError:
+ self.log.warning(
+ "Timed out extracting XCom from pod '%s' after job
completion; skipping.",
+ pod_name,
+ )
+ return PodXComAttempt(pod_name=pod_name, outcome="timeout")
+
+ return await asyncio.gather(*(extract_one_pod(pod_name) for pod_name
in pod_names))
+
+ async def _extract_xcom_for_pod_best_effort(self, pod_name: str) ->
PodXComAttempt:
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+
+ self.log.info("Extracting result from xcom sidecar container
(best-effort).")
+ loop = asyncio.get_running_loop()
+ try:
+ result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
+ return PodXComAttempt(pod_name=pod_name, outcome="success",
result=result)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists during XCom
extraction; skipping.", pod_name)
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+ self.log.warning("Unable to extract XCom from pod '%s': %r.
Skipping.", pod_name, err)
+ return PodXComAttempt(pod_name=pod_name, outcome="error")
+
+ @staticmethod
+ def _summarize_post_job_attempts(attempts: list[PodXComAttempt]) ->
PostJobXComSummary:
+ return PostJobXComSummary(
+ total=len(attempts),
+ succeeded=sum(1 for attempt in attempts if attempt.outcome ==
"success"),
+ skipped_missing=sum(1 for attempt in attempts if attempt.outcome
== "missing"),
+ timed_out=sum(1 for attempt in attempts if attempt.outcome ==
"timeout"),
+ failed_other=sum(1 for attempt in attempts if attempt.outcome ==
"error"),
+ )
+
+ @staticmethod
+ def _extract_successful_xcom_values(attempts: list[PodXComAttempt]) ->
list[str]:
+ return [attempt.result for attempt in attempts if attempt.result is
not None]
Review Comment:
Is this method needed here?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
+ max_concurrency = min(5, len(pod_names))
+ semaphore = asyncio.Semaphore(max_concurrency)
+
+ self.log.info(
+ "Job is done; collecting XCom best-effort for %d pod(s) with
per-pod timeout %.2f seconds and max concurrency %d.",
+ len(pod_names),
+ per_pod_timeout,
+ max_concurrency,
+ )
+
+ async def extract_one_pod(pod_name: str) -> PodXComAttempt:
+ async with semaphore:
+ try:
+ return await asyncio.wait_for(
+
self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
+ timeout=per_pod_timeout,
+ )
+ except asyncio.TimeoutError:
+ self.log.warning(
+ "Timed out extracting XCom from pod '%s' after job
completion; skipping.",
+ pod_name,
+ )
+ return PodXComAttempt(pod_name=pod_name, outcome="timeout")
+
+ return await asyncio.gather(*(extract_one_pod(pod_name) for pod_name
in pod_names))
+
+ async def _extract_xcom_for_pod_best_effort(self, pod_name: str) ->
PodXComAttempt:
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+
+ self.log.info("Extracting result from xcom sidecar container
(best-effort).")
+ loop = asyncio.get_running_loop()
+ try:
+ result = await loop.run_in_executor(None,
self.pod_manager.extract_xcom, pod)
+ return PodXComAttempt(pod_name=pod_name, outcome="success",
result=result)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists during XCom
extraction; skipping.", pod_name)
+ return PodXComAttempt(pod_name=pod_name, outcome="missing")
+ self.log.warning("Unable to extract XCom from pod '%s': %r.
Skipping.", pod_name, err)
+ return PodXComAttempt(pod_name=pod_name, outcome="error")
+
+ @staticmethod
+ def _summarize_post_job_attempts(attempts: list[PodXComAttempt]) ->
PostJobXComSummary:
+ return PostJobXComSummary(
+ total=len(attempts),
+ succeeded=sum(1 for attempt in attempts if attempt.outcome ==
"success"),
+ skipped_missing=sum(1 for attempt in attempts if attempt.outcome
== "missing"),
+ timed_out=sum(1 for attempt in attempts if attempt.outcome ==
"timeout"),
+ failed_other=sum(1 for attempt in attempts if attempt.outcome ==
"error"),
+ )
+
+ @staticmethod
+ def _extract_successful_xcom_values(attempts: list[PodXComAttempt]) ->
list[str]:
+ return [attempt.result for attempt in attempts if attempt.result is
not None]
+
+ def _log_post_job_summary(self, summary: PostJobXComSummary) -> None:
+ self.log.info(
+ "Best-effort XCom collection summary after job completion:
total=%d, succeeded=%d, skipped_missing=%d, timed_out=%d, failed_other=%d",
+ summary.total,
+ summary.succeeded,
+ summary.skipped_missing,
+ summary.timed_out,
+ summary.failed_other,
+ )
Review Comment:
Why do we need a method for logging? Just use the log line itself; if it's
used multiple times, use a string template and format it.
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
Review Comment:
There is no real logic here; should this be a separate method?
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
Review Comment:
Just use the pod_name variable
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/job.py:
##########
@@ -156,6 +188,233 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
}
)
+ async def _get_pod_or_none_for_xcom(self, pod_name: str) -> V1Pod | None:
+ try:
+ return await self.hook.get_pod(name=pod_name,
namespace=self.pod_namespace)
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info("Pod '%s' no longer exists; skipping XCom
extraction.", pod_name)
+ return None
+ raise
+
+ async def _wait_until_container_complete_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=self.base_container_name,
+ wait_method=self.hook.wait_until_container_complete,
+ job_task=job_task,
+ state_label="completed",
+ )
+
+ async def _wait_until_sidecar_started_or_job_done(
+ self, pod_name: str, job_task: asyncio.Task[V1Job]
+ ) -> WaitOutcome:
+ return await self._wait_until_container_state_or_job_done(
+ pod_name=pod_name,
+ container_name=PodDefaults.SIDECAR_CONTAINER_NAME,
+ wait_method=self.hook.wait_until_container_started,
+ job_task=job_task,
+ state_label="running",
+ )
+
+ async def _wait_until_container_state_or_job_done(
+ self,
+ pod_name: str,
+ container_name: str,
+ wait_method: Any,
+ job_task: asyncio.Task[V1Job],
+ state_label: str,
+ ) -> WaitOutcome:
+ poll_interval = max(self.poll_interval, 0.1)
+ wait_task = asyncio.create_task(
+ wait_method(
+ name=pod_name,
+ namespace=self.pod_namespace,
+ container_name=container_name,
+ poll_interval=poll_interval,
+ )
+ )
+
+ try:
+ while True:
+ done, _ = await asyncio.wait(
+ {wait_task, job_task},
+ timeout=poll_interval,
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_task in done:
+ try:
+ await wait_task
+ return "ready"
+ except Exception as err:
+ if self._is_not_found_error(err):
+ self.log.info(
+ "Pod '%s' no longer exists while waiting for
container '%s' state '%s'; skipping.",
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "pod_missing"
+ raise
+
+ if job_task in done:
+ self.log.info(
+ "Job '%s' finished before pod '%s' container '%s'
reached state '%s'; stopping XCom wait.",
+ self.job_name,
+ pod_name,
+ container_name,
+ state_label,
+ )
+ return "job_done"
+ finally:
+ if not wait_task.done():
+ wait_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await wait_task
+
+ async def _collect_xcom_results(
+ self,
+ job_task: asyncio.Task[V1Job],
+ poll_interval: float,
+ ) -> list[str]:
+ pre_job_results, post_job_pod_names = await
self._collect_xcom_until_job_done(job_task=job_task)
+ if not post_job_pod_names:
+ return pre_job_results
+
+ attempts = await self._collect_xcom_after_job_done_best_effort(
+ pod_names=post_job_pod_names,
+ poll_interval=poll_interval,
+ )
+ summary = self._summarize_post_job_attempts(attempts)
+ self._log_post_job_summary(summary=summary)
+ pre_job_results.extend(self._extract_successful_xcom_values(attempts))
+ return pre_job_results
+
+ async def _collect_xcom_until_job_done(
+ self,
+ job_task: asyncio.Task[V1Job],
+ ) -> tuple[list[str], list[str]]:
+ xcom_results: list[str] = []
+ post_job_pod_names: list[str] = []
+
+ for pod_index, pod_name in enumerate(self.pod_names):
+ pod = await self._get_pod_or_none_for_xcom(pod_name=pod_name)
+ if pod is None:
+ continue
+
+ completion_outcome = await
self._wait_until_container_complete_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if completion_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if completion_outcome == "pod_missing":
+ continue
+
+ self.log.info("Checking if xcom sidecar container is started.")
+ sidecar_outcome = await
self._wait_until_sidecar_started_or_job_done(
+ pod_name=pod_name,
+ job_task=job_task,
+ )
+ if sidecar_outcome == "job_done":
+ post_job_pod_names = self.pod_names[pod_index:]
+ break
+ if sidecar_outcome == "pod_missing":
+ continue
+
+ xcom_results.append(await
self._extract_xcom_for_ready_pod(pod=pod))
+
+ return xcom_results, post_job_pod_names
+
+ async def _extract_xcom_for_ready_pod(self, pod: V1Pod) -> str:
+ self.log.info("Extracting result from xcom sidecar container.")
+ loop = asyncio.get_running_loop()
+ return await loop.run_in_executor(None, self.pod_manager.extract_xcom,
pod)
+
+ async def _collect_xcom_after_job_done_best_effort(
+ self,
+ pod_names: list[str],
+ poll_interval: float,
+ ) -> list[PodXComAttempt]:
+ if not pod_names:
+ return []
+
+ per_pod_timeout = max(poll_interval, 0.1)
+ max_concurrency = min(5, len(pod_names))
+ semaphore = asyncio.Semaphore(max_concurrency)
+
+ self.log.info(
+ "Job is done; collecting XCom best-effort for %d pod(s) with
per-pod timeout %.2f seconds and max concurrency %d.",
+ len(pod_names),
+ per_pod_timeout,
+ max_concurrency,
+ )
+
+ async def extract_one_pod(pod_name: str) -> PodXComAttempt:
+ async with semaphore:
+ try:
+ return await asyncio.wait_for(
+
self._extract_xcom_for_pod_best_effort(pod_name=pod_name),
+ timeout=per_pod_timeout,
+ )
+ except asyncio.TimeoutError:
+ self.log.warning(
+ "Timed out extracting XCom from pod '%s' after job
completion; skipping.",
+ pod_name,
+ )
+ return PodXComAttempt(pod_name=pod_name, outcome="timeout")
Review Comment:
Why is a semaphore used here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]