wolfdn commented on code in PR #64962:
URL: https://github.com/apache/airflow/pull/64962#discussion_r3079115384
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -741,21 +741,33 @@ def await_container_completion(self, pod: V1Pod, container_name: str, polling_ti
time.sleep(polling_time)
def await_pod_completion(
- self, pod: V1Pod, istio_enabled: bool = False, container_name: str = "base"
+ self,
+ pod: V1Pod,
+ istio_enabled: bool = False,
+ container_name: str = "base",
+ do_xcom_push: bool = False,
) -> V1Pod:
"""
Monitor a pod and return the final state.
:param istio_enabled: whether istio is enabled in the namespace
:param pod: pod spec that will be monitored
:param container_name: name of the container within the pod
+ :param do_xcom_push: whether to push XComs
:return: tuple[State, str | None]
"""
Review Comment:
Done
##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py:
##########
@@ -1055,6 +1055,72 @@ def test_pod_with_istio_delete_after_await_container_error(
else:
delete_pod_mock.assert_not_called()
+ @pytest.mark.parametrize(
+ ("base_container_exit_code", "expect_failure"),
+ [
+ pytest.param(0, False, id="base-succeeded"),
+ pytest.param(1, True, id="base-failed"),
+ ],
+ )
+ @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+ @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+ @patch(f"{POD_MANAGER_CLASS}.delete_pod")
+ @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+ @patch(f"{KPO_MODULE}.KubernetesPodOperator.find_pod")
+ def test_cleanup_with_xcom_sidecar_uses_base_container_status(
+ self,
+ find_pod_mock,
+ await_pod_completion_mock,
+ delete_pod_mock,
+ mock_await_xcom_sidecar,
+ mock_extract_xcom,
+ base_container_exit_code,
+ expect_failure,
+ ):
+ """
+ When do_xcom_push=True, cleanup should determine success/failure based on
+ the base container's exit status, not the pod phase. The xcom sidecar may
+ keep the pod in Running phase after the base container completes.
+ """
+ mock_extract_xcom.return_value = "{}"
+ mock_await_xcom_sidecar.return_value = None
+
+ base_status = MagicMock()
+ base_status.name = "base"
+ base_status.state.terminated.exit_code = base_container_exit_code
+ base_status.state.terminated.message = "task failed" if base_container_exit_code else None
+
+ xcom_sidecar_status = MagicMock()
+ xcom_sidecar_status.name = "airflow-xcom-sidecar"
+ xcom_sidecar_status.state.running = True
+ xcom_sidecar_status.state.terminated = None
+
+ # Pod is still Running because xcom sidecar is alive
+ remote_pod = MagicMock()
+ remote_pod.status.phase = "Running"
+ remote_pod.status.container_statuses = [base_status, xcom_sidecar_status]
+ remote_pod.metadata.name = "pod-with-xcom-sidecar"
+ remote_pod.metadata.namespace = "default"
+ remote_pod.spec.containers = [MagicMock(), MagicMock()]
+
+ await_pod_completion_mock.return_value = remote_pod
+ find_pod_mock.return_value = remote_pod
+
+ k = KubernetesPodOperator(
+ task_id="task",
+ do_xcom_push=True,
+ )
+
+ context = create_context(k)
+ context["ti"].xcom_push = MagicMock()
+
+ if expect_failure:
+ self.await_pod_mock.side_effect = AirflowException("fake failure")
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]