This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 22805-2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit ac06158009734f3755974ad18bb09eae6d00ea1e Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Jan 12 11:56:03 2026 +0100 CAMEL-22838 - Camel-AWS components: Avoid duplicated code and add pagination to producer operation where it makes sense - AWS EKS Signed-off-by: Andrea Cosentino <[email protected]> --- .../apache/camel/catalog/components/aws2-eks.json | 5 +- .../apache/camel/component/aws2/eks/aws2-eks.json | 5 +- .../camel/component/aws2/eks/EKS2Constants.java | 13 + .../camel/component/aws2/eks/EKS2Producer.java | 261 ++++++++++++--------- .../endpoint/dsl/EKS2EndpointBuilderFactory.java | 36 +++ 5 files changed, 201 insertions(+), 119 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eks.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eks.json index bac6a8cfac64..1ad2e0e2beed 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eks.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eks.json @@ -53,7 +53,10 @@ "CamelAwsEKSDescription": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A key description to use while performing a createKey operation", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#DESCRIPTION" }, "CamelAwsEKSClusterName": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The cluster name", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#CLUSTER_NAME" }, "CamelAwsEKSRoleARN": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The role ARN to use while creating the cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#ROLE_ARN" }, - "CamelAwsEKSVPCConfig": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.eks.model.VpcConfigRequest", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The VPC config for the creations of an EKS cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#VPC_CONFIG" } + "CamelAwsEKSVPCConfig": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.eks.model.VpcConfigRequest", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The VPC config for the creations of an EKS cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#VPC_CONFIG" }, + "CamelAwsEKSNextToken": { "index": 6, "kind": "header", "displayName": "", "group": "listClusters", "label": "listClusters", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#NEXT_TOKEN" }, + "CamelAwsEKSIsTruncated": { "index": 7, "kind": "header", "displayName": "", "group": "listClusters", "label": "listClusters", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#IS_TRUNCATED" }, + "CamelAwsEKSClusterArn": { "index": 8, "kind": "header", "displayName": "", "group": "createCluster describeCluster deleteCluster", "label": "createCluster describeCluster deleteCluster", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ARN of the cluster.", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#CLUSTER_ARN" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.eks.EKS2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-eks/src/generated/resources/META-INF/org/apache/camel/component/aws2/eks/aws2-eks.json b/components/camel-aws/camel-aws2-eks/src/generated/resources/META-INF/org/apache/camel/component/aws2/eks/aws2-eks.json index bac6a8cfac64..1ad2e0e2beed 100644 --- a/components/camel-aws/camel-aws2-eks/src/generated/resources/META-INF/org/apache/camel/component/aws2/eks/aws2-eks.json +++ b/components/camel-aws/camel-aws2-eks/src/generated/resources/META-INF/org/apache/camel/component/aws2/eks/aws2-eks.json @@ -53,7 +53,10 @@ "CamelAwsEKSDescription": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "A key description to use while performing a createKey operation", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#DESCRIPTION" }, "CamelAwsEKSClusterName": { "index": 3, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The cluster name", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#CLUSTER_NAME" }, "CamelAwsEKSRoleARN": { "index": 4, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The role ARN to use while creating the cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#ROLE_ARN" }, - "CamelAwsEKSVPCConfig": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.eks.model.VpcConfigRequest", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The VPC config for the creations of an EKS cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#VPC_CONFIG" } + "CamelAwsEKSVPCConfig": { "index": 5, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "software.amazon.awssdk.services.eks.model.VpcConfigRequest", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The VPC config for the creations of an EKS cluster", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#VPC_CONFIG" }, + "CamelAwsEKSNextToken": { "index": 6, "kind": "header", "displayName": "", "group": "listClusters", "label": "listClusters", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#NEXT_TOKEN" }, + "CamelAwsEKSIsTruncated": { "index": 7, "kind": "header", "displayName": "", "group": "listClusters", "label": "listClusters", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#IS_TRUNCATED" }, + "CamelAwsEKSClusterArn": { "index": 8, "kind": "header", "displayName": "", "group": "createCluster describeCluster deleteCluster", "label": "createCluster describeCluster deleteCluster", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ARN of the cluster.", "constantName": "org.apache.camel.component.aws2.eks.EKS2Constants#CLUSTER_ARN" } }, "properties": { "label": { "index": 0, "kind": "path", "displayName": "Label", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.eks.EKS2Configuration", "configurationField": "configuration", "description": "Logical name" }, diff --git a/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Constants.java b/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Constants.java index f673f41ee0f5..f1ffaea2059f 100644 --- a/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Constants.java +++ b/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Constants.java @@ -35,4 +35,17 @@ public interface EKS2Constants { @Metadata(description = "The VPC config for the creations of an EKS cluster", javaType = "software.amazon.awssdk.services.eks.model.VpcConfigRequest") String VPC_CONFIG = "CamelAwsEKSVPCConfig"; + + // Pagination constants + @Metadata(label = "listClusters", + description = "The token for the next set of results.", javaType = "String") + String NEXT_TOKEN = "CamelAwsEKSNextToken"; + @Metadata(label = "listClusters", + description = "Whether the response has more results (is truncated).", javaType = "Boolean") + String IS_TRUNCATED = "CamelAwsEKSIsTruncated"; + + // Response metadata + @Metadata(label = "createCluster describeCluster deleteCluster", + description = "The ARN of the cluster.", javaType = "String") + String CLUSTER_ARN = "CamelAwsEKSClusterArn"; } diff --git a/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Producer.java b/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Producer.java index e2eeaa0fa82a..d3efc22ec543 100644 --- a/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Producer.java +++ b/components/camel-aws/camel-aws2-eks/src/main/java/org/apache/camel/component/aws2/eks/EKS2Producer.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.aws2.eks; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; + import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadException; @@ -100,147 +104,170 @@ public class EKS2Producer extends DefaultProducer { } private void listClusters(EksClient eksClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof ListClustersRequest) { - ListClustersResponse result; - try { - result = eksClient.listClusters((ListClustersRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - ListClustersRequest.Builder builder = ListClustersRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.MAX_RESULTS))) { - int maxRes = exchange.getIn().getHeader(EKS2Constants.MAX_RESULTS, Integer.class); - builder.maxResults(maxRes); - } - ListClustersResponse result; - try { - result = eksClient.listClusters(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("List Clusters command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + ListClustersRequest.class, + eksClient::listClusters, + () -> { + ListClustersRequest.Builder builder = ListClustersRequest.builder(); + Integer maxResults = getOptionalHeader(exchange, EKS2Constants.MAX_RESULTS, Integer.class); + if (maxResults != null) { + builder.maxResults(maxResults); + } + String nextToken = getOptionalHeader(exchange, EKS2Constants.NEXT_TOKEN, String.class); + if (nextToken != null) { + builder.nextToken(nextToken); + } + return eksClient.listClusters(builder.build()); + }, + "List Clusters", + (ListClustersResponse response, Message message) -> { + message.setHeader(EKS2Constants.NEXT_TOKEN, response.nextToken()); + message.setHeader(EKS2Constants.IS_TRUNCATED, response.nextToken() != null); + }); } private void createCluster(EksClient eksClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof CreateClusterRequest) { - CreateClusterResponse result; - try { - result = eksClient.createCluster((CreateClusterRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Create Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - CreateClusterRequest.Builder builder = CreateClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME))) { - String name = exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME, String.class); - builder.name(name); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.ROLE_ARN))) { - String roleArn = exchange.getIn().getHeader(EKS2Constants.ROLE_ARN, String.class); - builder.roleArn(roleArn); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.VPC_CONFIG))) { - VpcConfigRequest vpcConfig = exchange.getIn().getHeader(EKS2Constants.VPC_CONFIG, VpcConfigRequest.class); - builder.resourcesVpcConfig(vpcConfig); - } - CreateClusterResponse result; - try { - result = eksClient.createCluster(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Create Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + CreateClusterRequest.class, + eksClient::createCluster, + () -> { + CreateClusterRequest.Builder builder = CreateClusterRequest.builder(); + String clusterName = getOptionalHeader(exchange, EKS2Constants.CLUSTER_NAME, String.class); + if (clusterName != null) { + builder.name(clusterName); + } + String roleArn = getOptionalHeader(exchange, EKS2Constants.ROLE_ARN, String.class); + if (roleArn != null) { + builder.roleArn(roleArn); + } + VpcConfigRequest vpcConfig = getOptionalHeader(exchange, EKS2Constants.VPC_CONFIG, VpcConfigRequest.class); + if (vpcConfig != null) { + builder.resourcesVpcConfig(vpcConfig); + } + return eksClient.createCluster(builder.build()); + }, + "Create Cluster", + (CreateClusterResponse response, Message message) -> { + if (response.cluster() != null) { + message.setHeader(EKS2Constants.CLUSTER_ARN, response.cluster().arn()); + } + }); } private void describeCluster(EksClient eksClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DescribeClusterRequest) { - DescribeClusterResponse result; - try { - result = eksClient.describeCluster((DescribeClusterRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Describe Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DescribeClusterRequest.Builder builder = DescribeClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME))) { - String name = exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME, String.class); - builder.name(name); - } else { - throw new IllegalArgumentException("Cluster name must be specified"); - } - DescribeClusterResponse result; - try { - result = eksClient.describeCluster(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Describe Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DescribeClusterRequest.class, + eksClient::describeCluster, + () -> { + String clusterName = getRequiredHeader(exchange, EKS2Constants.CLUSTER_NAME, String.class, + "Cluster name must be specified"); + return eksClient.describeCluster(DescribeClusterRequest.builder().name(clusterName).build()); + }, + "Describe Cluster", + (DescribeClusterResponse response, Message message) -> { + if (response.cluster() != null) { + message.setHeader(EKS2Constants.CLUSTER_ARN, response.cluster().arn()); + } + }); } private void deleteCluster(EksClient eksClient, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + DeleteClusterRequest.class, + eksClient::deleteCluster, + () -> { + String clusterName = getRequiredHeader(exchange, EKS2Constants.CLUSTER_NAME, String.class, + "Cluster name must be specified"); + return eksClient.deleteCluster(DeleteClusterRequest.builder().name(clusterName).build()); + }, + "Delete Cluster", + (DeleteClusterResponse response, Message message) -> { + if (response.cluster() != null) { + message.setHeader(EKS2Constants.CLUSTER_ARN, response.cluster().arn()); + } + }); + } + + public static Message getMessageForResponse(final Exchange exchange) { + return exchange.getMessage(); + } + + /** + * Executes an EKS operation with POJO request support. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName) + throws InvalidPayloadException { + executeOperation(exchange, requestClass, pojoExecutor, headerExecutor, operationName, null); + } + + /** + * Executes an EKS operation with POJO request support and optional response post-processing. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName, + BiConsumer<RES, Message> responseProcessor) + throws InvalidPayloadException { + + RES result; if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DeleteClusterRequest) { - DeleteClusterResponse result; + if (requestClass.isInstance(payload)) { try { - result = eksClient.deleteCluster((DeleteClusterRequest) payload); + result = pojoExecutor.apply(requestClass.cast(payload)); } catch (AwsServiceException ase) { - LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); throw ase; } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DeleteClusterRequest.Builder builder = DeleteClusterRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME))) { - String name = exchange.getIn().getHeader(EKS2Constants.CLUSTER_NAME, String.class); - builder.name(name); } else { - throw new IllegalArgumentException("Cluster name must be specified"); + throw new IllegalArgumentException( + String.format("Expected body of type %s but was %s", + requestClass.getName(), + payload != null ? payload.getClass().getName() : "null")); } - DeleteClusterResponse result; + } else { try { - result = eksClient.deleteCluster(builder.build()); + result = headerExecutor.get(); } catch (AwsServiceException ase) { - LOG.trace("Delete Cluster command returned the error code {}", ase.awsErrorDetails().errorCode()); + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); throw ase; } - Message message = getMessageForResponse(exchange); - message.setBody(result); + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + if (responseProcessor != null) { + responseProcessor.accept(result, message); } } - public static Message getMessageForResponse(final Exchange exchange) { - return exchange.getMessage(); + /** + * Gets a required header value or throws an IllegalArgumentException. + */ + private <T> T getRequiredHeader(Exchange exchange, String headerName, Class<T> headerType, String errorMessage) { + T value = exchange.getIn().getHeader(headerName, headerType); + if (ObjectHelper.isEmpty(value)) { + throw new IllegalArgumentException(errorMessage); + } + return value; + } + + /** + * Gets an optional header value. + */ + private <T> T getOptionalHeader(Exchange exchange, String headerName, Class<T> headerType) { + return exchange.getIn().getHeader(headerName, headerType); } @Override diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EKS2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EKS2EndpointBuilderFactory.java index 9fe35407aec7..883938e82614 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EKS2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EKS2EndpointBuilderFactory.java @@ -665,6 +665,42 @@ public interface EKS2EndpointBuilderFactory { public String awsEKSVPCConfig() { return "CamelAwsEKSVPCConfig"; } + /** + * The token for the next set of results. + * + * The option is a: {@code String} type. + * + * Group: listClusters + * + * @return the name of the header {@code AwsEKSNextToken}. + */ + public String awsEKSNextToken() { + return "CamelAwsEKSNextToken"; + } + /** + * Whether the response has more results (is truncated). + * + * The option is a: {@code Boolean} type. + * + * Group: listClusters + * + * @return the name of the header {@code AwsEKSIsTruncated}. + */ + public String awsEKSIsTruncated() { + return "CamelAwsEKSIsTruncated"; + } + /** + * The ARN of the cluster. + * + * The option is a: {@code String} type. + * + * Group: createCluster describeCluster deleteCluster + * + * @return the name of the header {@code AwsEKSClusterArn}. + */ + public String awsEKSClusterArn() { + return "CamelAwsEKSClusterArn"; + } } static EKS2EndpointBuilder endpointBuilder(String componentName, String path) { class EKS2EndpointBuilderImpl extends AbstractEndpointBuilder implements EKS2EndpointBuilder, AdvancedEKS2EndpointBuilder {
