akhilesharora commented on code in PR #63020:
URL: https://github.com/apache/airflow/pull/63020#discussion_r3078729456
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
) as self.config_file:
return super().trigger_reentry(context, event)
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
+ """
+ Override to generate a token-based kubeconfig for the triggerer.
+
+ The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+ which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+ block that references a temp file with AWS credentials. This temp file only exists on
+ the worker and is deleted when the context managers exit.
+
+ When the trigger is serialized and sent to the triggerer (which runs on a different host),
+ the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+ This override generates a kubeconfig with an embedded bearer token instead of an exec
+ block, allowing the config to work on the triggerer without requiring local temp files.
+ """
Review Comment:
Done
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py:
##########
@@ -1171,6 +1173,100 @@ def trigger_reentry(self, context: Context, event:
dict[str, Any]) -> Any:
) as self.config_file:
return super().trigger_reentry(context, event)
+ def invoke_defer_method(
+ self, last_log_time: DateTime | None = None, context: Context | None =
None
+ ) -> None:
+ """
+ Override to generate a token-based kubeconfig for the triggerer.
+
+ The base KubernetesPodOperator.invoke_defer_method() calls convert_config_file_to_dict()
+ which reads the kubeconfig file into a dict. For EKS, this kubeconfig contains an exec
+ block that references a temp file with AWS credentials. This temp file only exists on
+ the worker and is deleted when the context managers exit.
+
+ When the trigger is serialized and sent to the triggerer (which runs on a different host),
+ the exec block tries to source a file that doesn't exist, causing 401 Unauthorized errors.
+
+ This override generates a kubeconfig with an embedded bearer token instead of an exec
+ block, allowing the config to work on the triggerer without requiring local temp files.
+ """
+ eks_hook = EksHook(
+ aws_conn_id=self.aws_conn_id,
+ region_name=self.region,
+ )
+
+ # Generate a kubeconfig dict with an embedded token (no exec block)
+ self._config_dict = eks_hook.generate_config_dict_for_deferral(
+ eks_cluster_name=self.cluster_name,
+ pod_namespace=self.namespace,
+ )
+
+ # Now call the parent's invoke_defer_method, but skip convert_config_file_to_dict
+ # since we've already set self._config_dict
+ # We need to replicate the parent logic but use our config_dict
+ import datetime
+
+ from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
+ from airflow.providers.common.compat.sdk import AirflowNotFoundException, BaseHook
Review Comment:
Moved to module level, matches the parent class.
##########
providers/amazon/tests/unit/amazon/aws/hooks/test_eks.py:
##########
@@ -1273,6 +1273,126 @@ def test_generate_config_file(self, mock_conn,
aws_conn_id, region_name, expecte
if expected_region_args:
assert expected_region_args in command_arg
+ @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn")
+ @mock.patch("airflow.providers.amazon.aws.utils.eks_get_token.fetch_access_token_for_cluster")
+ def test_generate_config_dict_for_deferral(self, mock_fetch_token, mock_conn):
+ """Test that generate_config_dict_for_deferral creates a config with embedded token.
+
+ This test verifies that the method generates a kubeconfig dict with a bearer token
+ embedded directly (instead of an exec block that references temp files), allowing
+ the config to be serialized and used on the triggerer.
Review Comment:
Already single line, no change needed.
##########
providers/amazon/tests/unit/amazon/aws/operators/test_eks.py:
##########
@@ -1116,3 +1116,92 @@ def
test_refresh_cached_properties_raises_when_no_credentials(
# Verify super()._refresh_cached_properties() was NOT called since we
raised
mock_super_refresh.assert_not_called()
+
+ @mock.patch("airflow.providers.amazon.aws.operators.eks.EksPodOperator.defer")
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.generate_config_dict_for_deferral")
+ @mock.patch("airflow.providers.amazon.aws.hooks.eks.EksHook.__init__", return_value=None)
+ def test_invoke_defer_method_generates_token_based_config(
+ self,
+ mock_eks_hook,
+ mock_generate_config_dict,
+ mock_defer,
+ ):
+ """Test that invoke_defer_method generates a token-based config dict for the triggerer.
+
+ This test verifies that EksPodOperator.invoke_defer_method() generates a kubeconfig
+ with an embedded bearer token (instead of an exec block with temp file references)
+ so that the triggerer can authenticate without requiring files that only exist on the worker.
+ """
Review Comment:
Already single line, no change needed.
##########
providers/amazon/newsfragments/63020.bugfix.rst:
##########
@@ -0,0 +1 @@
+Fix EksPodOperator deferrable mode failing on remote triggerers with 401 Unauthorized by embedding bearer token in kubeconfig instead of using exec block with temp file references
Review Comment:
Removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]