akhilesharora commented on code in PR #63020:
URL: https://github.com/apache/airflow/pull/63020#discussion_r3078728429


##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -620,11 +619,11 @@ def generate_config_file(
         cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
         cluster_ep = cluster["cluster"]["endpoint"]
 
-        os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
-        try:
-            sts_url = 
f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
-        finally:
-            del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+        # Construct regional STS URL directly to avoid modifying 
process-global os.environ.
+        # EKS token generation requires a regional STS endpoint.
+        sts_url = (
+            f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
+        )

Review Comment:
   You're right, that change wasn't needed and broke partition support. 
Reverted `generate_config_file` to the original StsHook approach.



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable 
mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer 
token instead of
+        an exec credential plugin. This is necessary for deferrable mode 
because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't 
exist
+        3. By embedding the token directly, the config can be serialized and 
used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The 
triggerer should
+        complete its work within this window, or the trigger_reentry will 
fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate 
kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.
+        """
+        from botocore.exceptions import ClientError
+
+        from airflow.providers.amazon.aws.utils.eks_get_token import 
fetch_access_token_for_cluster
+
+        # Get cluster details
+        eks_client = self.conn
+        session = self.get_session()
+
+        try:
+            cluster = eks_client.describe_cluster(name=eks_cluster_name)
+        except ClientError as e:
+            raise ValueError(
+                f"Failed to describe EKS cluster '{eks_cluster_name}': 
{e.response['Error']['Message']}"
+            ) from e
+
+        cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+        cluster_ep = cluster["cluster"]["endpoint"]
+
+        # Generate the STS URL for token generation
+        os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+        try:
+            sts_url = 
f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+        finally:
+            del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+

Review Comment:
   Understood. Reverted `generate_config_file` entirely. For 
`generate_config_dict_for_deferral` I'm using the same StsHook pattern with 
save/restore of the original env var value.



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +677,90 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with an embedded bearer token for 
deferrable execution.
+
+        The token-based config avoids the exec credential plugin so it can be 
safely
+        serialized and used by the triggerer process.
+
+        :param eks_cluster_name: The name of the EKS cluster.
+        :param pod_namespace: The Kubernetes namespace.
+        :return: Kubeconfig dictionary with embedded bearer token.
+        """
+        from botocore.exceptions import BotoCoreError, ClientError
+
+        from airflow.providers.amazon.aws.utils.eks_get_token import 
fetch_access_token_for_cluster
+
+        # Get cluster details
+        eks_client = self.conn
+        session = self.get_session()
+
+        try:
+            cluster = eks_client.describe_cluster(name=eks_cluster_name)
+        except ClientError as e:
+            raise ValueError(
+                f"Failed to describe EKS cluster '{eks_cluster_name}': 
{e.response['Error']['Message']}"
+            ) from e
+
+        cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+        cluster_ep = cluster["cluster"]["endpoint"]
+
+        # Construct regional STS URL directly to avoid modifying 
process-global os.environ.
+        # EKS token generation requires a regional STS endpoint.
+        sts_url = (
+            f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
+        )
+
+        # Fetch the access token directly
+        try:
+            access_token = fetch_access_token_for_cluster(
+                eks_cluster_name=eks_cluster_name,
+                sts_url=sts_url,
+                region_name=session.region_name,
+            )
+        except (BotoCoreError, ClientError, ValueError) as e:
+            raise ValueError(f"Failed to fetch EKS access token for cluster 
'{eks_cluster_name}': {e}") from e
+
+        if not access_token:
+            raise ValueError(
+                f"Empty access token returned for EKS cluster 
'{eks_cluster_name}'. "
+                "Check AWS credentials and IAM permissions."
+            )
+
+        # Build kubeconfig with embedded token instead of exec plugin
+        return {
+            "apiVersion": "v1",
+            "kind": "Config",
+            "clusters": [
+                {
+                    "cluster": {"server": cluster_ep, 
"certificate-authority-data": cluster_cert},
+                    "name": eks_cluster_name,
+                }
+            ],
+            "contexts": [
+                {
+                    "context": {
+                        "cluster": eks_cluster_name,
+                        "namespace": pod_namespace,
+                        "user": _POD_USERNAME,
+                    },
+                    "name": _CONTEXT_NAME,
+                }
+            ],
+            "current-context": _CONTEXT_NAME,
+            "preferences": {},
+            "users": [
+                {
+                    "name": _POD_USERNAME,
+                    "user": {
+                        "token": access_token,

Review Comment:
   The token is valid for about 14 minutes. For pods that run longer, the 
trigger will get 401s, since `KubernetesPodTrigger` has no token refresh. 
Without this fix, though, deferrable mode is completely broken on remote 
triggerers. This covers the common case of shorter-lived pods. Proper token 
refresh would need changes in `KubernetesPodTrigger`.



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable 
mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer 
token instead of
+        an exec credential plugin. This is necessary for deferrable mode 
because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't 
exist
+        3. By embedding the token directly, the config can be serialized and 
used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The 
triggerer should
+        complete its work within this window, or the trigger_reentry will 
fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate 
kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.

Review Comment:
   Done



##########
providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py:
##########
@@ -678,3 +678,99 @@ def generate_config_file(
             config_file.write(config_text)
             config_file.flush()
             yield config_file.name
+
+    def generate_config_dict_for_deferral(
+        self,
+        eks_cluster_name: str,
+        pod_namespace: str | None,
+    ) -> dict:
+        """
+        Generate a kubeconfig dict with embedded token for use in deferrable 
mode.
+
+        This method generates a kubeconfig that uses a pre-fetched bearer 
token instead of
+        an exec credential plugin. This is necessary for deferrable mode 
because:
+        1. The exec plugin references temp files that only exist on the worker
+        2. The triggerer runs on a different host where those temp files don't 
exist
+        3. By embedding the token directly, the config can be serialized and 
used anywhere
+
+        Note: The token has a limited lifetime (typically 14 minutes). The 
triggerer should
+        complete its work within this window, or the trigger_reentry will 
fetch fresh credentials.
+
+        :param eks_cluster_name: The name of the cluster to generate 
kubeconfig for.
+        :param pod_namespace: The namespace to run within kubernetes.
+        :return: A kubeconfig dict with embedded bearer token.
+        """
+        from botocore.exceptions import ClientError
+
+        from airflow.providers.amazon.aws.utils.eks_get_token import 
fetch_access_token_for_cluster
+
+        # Get cluster details
+        eks_client = self.conn
+        session = self.get_session()
+
+        try:
+            cluster = eks_client.describe_cluster(name=eks_cluster_name)
+        except ClientError as e:
+            raise ValueError(
+                f"Failed to describe EKS cluster '{eks_cluster_name}': 
{e.response['Error']['Message']}"
+            ) from e
+
+        cluster_cert = cluster["cluster"]["certificateAuthority"]["data"]
+        cluster_ep = cluster["cluster"]["endpoint"]
+
+        # Generate the STS URL for token generation
+        os.environ["AWS_STS_REGIONAL_ENDPOINTS"] = "regional"
+        try:
+            sts_url = 
f"{StsHook(region_name=session.region_name).conn_client_meta.endpoint_url}/?Action=GetCallerIdentity&Version=2011-06-15"
+        finally:
+            del os.environ["AWS_STS_REGIONAL_ENDPOINTS"]
+
+        # Fetch the access token directly
+        try:
+            access_token = fetch_access_token_for_cluster(
+                eks_cluster_name=eks_cluster_name,
+                sts_url=sts_url,
+                region_name=session.region_name,
+            )
+        except Exception as e:

Review Comment:
   Narrowed to `(BotoCoreError, ClientError, ValueError)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to