This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 22805-3 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 8988982d53648c38e7937496575af978ed7b1bef Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Jan 12 14:17:13 2026 +0100 CAMEL-22840 - Camel-AWS components: Avoid duplicated code and add pagination to producer operation where it makes sense - AWS Eventbridge Signed-off-by: Andrea Cosentino <[email protected]> --- .../camel/catalog/components/aws2-eventbridge.json | 7 +- .../aws2/eventbridge/aws2-eventbridge.json | 7 +- .../aws2/eventbridge/EventbridgeConstants.java | 19 + .../aws2/eventbridge/EventbridgeProducer.java | 623 ++++++++++----------- .../dsl/EventbridgeEndpointBuilderFactory.java | 61 ++ 5 files changed, 381 insertions(+), 336 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eventbridge.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eventbridge.json index a658e25fef91..0281ae8efa7b 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eventbridge.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-eventbridge.json @@ -58,7 +58,12 @@ "CamelAwsEventbridgeTargetArn": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Amazon Resource Name (ARN) of the target resource.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#TARGET_ARN" }, "CamelAwsEventbridgeResourcesArn": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Comma separated list of Amazon Resource Names (ARN) of the resources related to Event", "constantName": 
"org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_RESOURCES_ARN" }, "CamelAwsEventbridgeSource": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The source related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_SOURCE" }, - "CamelAwsEventbridgeDetailType": { "index": 9, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The detail type related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_DETAIL_TYPE" } + "CamelAwsEventbridgeDetailType": { "index": 9, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The detail type related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_DETAIL_TYPE" }, + "CamelAwsEventbridgeNextToken": { "index": 10, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#NEXT_TOKEN" }, + "CamelAwsEventbridgeLimit": { "index": 11, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "Integer", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The maximum number of results to return.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#LIMIT" }, + "CamelAwsEventbridgeIsTruncated": { "index": 12, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#IS_TRUNCATED" }, + "CamelAwsEventbridgeRuleArn": { "index": 13, "kind": "header", "displayName": "", "group": "putRule describeRule", "label": "putRule describeRule", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Amazon Resource Name (ARN) of the rule.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#RULE_ARN" }, + "CamelAwsEventbridgeFailedEntryCount": { "index": 14, "kind": "header", "displayName": "", "group": "putEvent putTargets removeTargets", "label": "putEvent putTargets removeTargets", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of failed entries in the response.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#FAILED_ENTRY_COUNT" } }, "properties": { "eventbusNameOrArn": { "index": 0, "kind": "path", "displayName": "Eventbus Name Or Arn", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Event bus name or ARN" }, diff --git 
a/components/camel-aws/camel-aws2-eventbridge/src/generated/resources/META-INF/org/apache/camel/component/aws2/eventbridge/aws2-eventbridge.json b/components/camel-aws/camel-aws2-eventbridge/src/generated/resources/META-INF/org/apache/camel/component/aws2/eventbridge/aws2-eventbridge.json index a658e25fef91..0281ae8efa7b 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/generated/resources/META-INF/org/apache/camel/component/aws2/eventbridge/aws2-eventbridge.json +++ b/components/camel-aws/camel-aws2-eventbridge/src/generated/resources/META-INF/org/apache/camel/component/aws2/eventbridge/aws2-eventbridge.json @@ -58,7 +58,12 @@ "CamelAwsEventbridgeTargetArn": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Amazon Resource Name (ARN) of the target resource.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#TARGET_ARN" }, "CamelAwsEventbridgeResourcesArn": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Comma separated list of Amazon Resource Names (ARN) of the resources related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_RESOURCES_ARN" }, "CamelAwsEventbridgeSource": { "index": 8, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The source related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_SOURCE" }, - "CamelAwsEventbridgeDetailType": { "index": 9, "kind": "header", "displayName": "", "group": "producer", 
"label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The detail type related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_DETAIL_TYPE" } + "CamelAwsEventbridgeDetailType": { "index": 9, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The detail type related to Event", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#EVENT_DETAIL_TYPE" }, + "CamelAwsEventbridgeNextToken": { "index": 10, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The token for the next set of results.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#NEXT_TOKEN" }, + "CamelAwsEventbridgeLimit": { "index": 11, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The maximum number of results to return.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#LIMIT" }, + "CamelAwsEventbridgeIsTruncated": { "index": 12, "kind": "header", "displayName": "", "group": "listRules listTargetsByRule listRuleNamesByTarget", "label": "listRules listTargetsByRule listRuleNamesByTarget", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "Whether the response has more results (is truncated).", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#IS_TRUNCATED" }, + "CamelAwsEventbridgeRuleArn": { "index": 13, "kind": "header", "displayName": "", "group": "putRule describeRule", "label": "putRule describeRule", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The Amazon Resource Name (ARN) of the rule.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#RULE_ARN" }, + "CamelAwsEventbridgeFailedEntryCount": { "index": 14, "kind": "header", "displayName": "", "group": "putEvent putTargets removeTargets", "label": "putEvent putTargets removeTargets", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of failed entries in the response.", "constantName": "org.apache.camel.component.aws2.eventbridge.EventbridgeConstants#FAILED_ENTRY_COUNT" } }, "properties": { "eventbusNameOrArn": { "index": 0, "kind": "path", "displayName": "Eventbus Name Or Arn", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Event bus name or ARN" }, diff --git a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java index 65ab60d89528..47d231708c4b 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java +++ b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeConstants.java @@ -43,4 +43,23 @@ 
public interface EventbridgeConstants { String EVENT_SOURCE = "CamelAwsEventbridgeSource"; @Metadata(description = "The detail type related to Event", javaType = "String") String EVENT_DETAIL_TYPE = "CamelAwsEventbridgeDetailType"; + + // Pagination constants + @Metadata(label = "listRules listTargetsByRule listRuleNamesByTarget", + description = "The token for the next set of results.", javaType = "String") + String NEXT_TOKEN = "CamelAwsEventbridgeNextToken"; + @Metadata(label = "listRules listTargetsByRule listRuleNamesByTarget", + description = "The maximum number of results to return.", javaType = "Integer") + String LIMIT = "CamelAwsEventbridgeLimit"; + @Metadata(label = "listRules listTargetsByRule listRuleNamesByTarget", + description = "Whether the response has more results (is truncated).", javaType = "Boolean") + String IS_TRUNCATED = "CamelAwsEventbridgeIsTruncated"; + + // Response metadata + @Metadata(label = "putRule describeRule", + description = "The Amazon Resource Name (ARN) of the rule.", javaType = "String") + String RULE_ARN = "CamelAwsEventbridgeRuleArn"; + @Metadata(label = "putEvent putTargets removeTargets", + description = "The number of failed entries in the response.", javaType = "Integer") + String FAILED_ENTRY_COUNT = "CamelAwsEventbridgeFailedEntryCount"; } diff --git a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java index 8dd3dc39c5b5..9595cd159f43 100644 --- a/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java +++ b/components/camel-aws/camel-aws2-eventbridge/src/main/java/org/apache/camel/component/aws2/eventbridge/EventbridgeProducer.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import 
java.util.Collection; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Stream; import org.apache.camel.Endpoint; @@ -39,13 +42,10 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.eventbridge.EventBridgeClient; import software.amazon.awssdk.services.eventbridge.model.DeleteRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.DeleteRuleResponse; import software.amazon.awssdk.services.eventbridge.model.DescribeRuleRequest; import software.amazon.awssdk.services.eventbridge.model.DescribeRuleResponse; import software.amazon.awssdk.services.eventbridge.model.DisableRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.DisableRuleResponse; import software.amazon.awssdk.services.eventbridge.model.EnableRuleRequest; -import software.amazon.awssdk.services.eventbridge.model.EnableRuleResponse; import software.amazon.awssdk.services.eventbridge.model.ListRuleNamesByTargetRequest; import software.amazon.awssdk.services.eventbridge.model.ListRuleNamesByTargetResponse; import software.amazon.awssdk.services.eventbridge.model.ListRulesRequest; @@ -154,23 +154,22 @@ public class EventbridgeProducer extends DefaultProducer { } Message message = getMessageForResponse(exchange); message.setBody(result); + message.setHeader(EventbridgeConstants.RULE_ARN, result.ruleArn()); } } else { PutRuleRequest.Builder builder = PutRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { builder.name(ruleName); } - if (ObjectHelper.isEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_PATTERN))) { + String 
eventPattern = getOptionalHeader(exchange, EventbridgeConstants.EVENT_PATTERN, String.class); + if (eventPattern == null) { try (InputStream s = ResourceHelper.resolveMandatoryResourceAsInputStream(this.getEndpoint().getCamelContext(), getConfiguration().getEventPatternFile())) { - String eventPattern = IOUtils.toString(s, Charset.defaultCharset()); - builder.eventPattern(eventPattern); + eventPattern = IOUtils.toString(s, Charset.defaultCharset()); } - } else { - String eventPattern = exchange.getIn().getHeader(EventbridgeConstants.EVENT_PATTERN, String.class); - builder.eventPattern(eventPattern); } + builder.eventPattern(eventPattern); builder.eventBusName(getConfiguration().getEventbusName()); PutRuleResponse result; try { @@ -181,377 +180,333 @@ public class EventbridgeProducer extends DefaultProducer { } Message message = getMessageForResponse(exchange); message.setBody(result); + message.setHeader(EventbridgeConstants.RULE_ARN, result.ruleArn()); } } + @SuppressWarnings("unchecked") private void putTargets(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof PutTargetsRequest) { - PutTargetsResponse result; - try { - result = eventbridgeClient.putTargets((PutTargetsRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("PutTargets command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - PutTargetsRequest.Builder builder = PutTargetsRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.rule(ruleName); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.TARGETS))) { - 
Collection<Target> targets = exchange.getIn().getHeader(EventbridgeConstants.TARGETS, Collection.class); - builder.targets(targets); - } else { - throw new IllegalArgumentException("At least one targets must be specified"); - } - builder.eventBusName(getConfiguration().getEventbusName()); - PutTargetsResponse result; - try { - result = eventbridgeClient.putTargets(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Put Targets command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + PutTargetsRequest.class, + eventbridgeClient::putTargets, + () -> { + PutTargetsRequest.Builder builder = PutTargetsRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.rule(ruleName); + } + Collection<Target> targets = getOptionalHeader(exchange, EventbridgeConstants.TARGETS, Collection.class); + if (targets == null || targets.isEmpty()) { + throw new IllegalArgumentException("At least one targets must be specified"); + } + builder.targets(targets); + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.putTargets(builder.build()); + }, + "Put Targets", + (PutTargetsResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.FAILED_ENTRY_COUNT, response.failedEntryCount()); + }); } + @SuppressWarnings("unchecked") private void removeTargets(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof RemoveTargetsRequest) { - RemoveTargetsResponse result; - try { - result = eventbridgeClient.removeTargets((RemoveTargetsRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("RemoveTargets command returned 
the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - RemoveTargetsRequest.Builder builder = RemoveTargetsRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.rule(ruleName); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.TARGETS_IDS))) { - Collection<String> ids = exchange.getIn().getHeader(EventbridgeConstants.TARGETS_IDS, Collection.class); - builder.ids(ids); - } else { - throw new IllegalArgumentException("At least one targets must be specified"); - } - builder.eventBusName(getConfiguration().getEventbusName()); - RemoveTargetsResponse result; - try { - result = eventbridgeClient.removeTargets(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Remove Targets command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + RemoveTargetsRequest.class, + eventbridgeClient::removeTargets, + () -> { + RemoveTargetsRequest.Builder builder = RemoveTargetsRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.rule(ruleName); + } + Collection<String> ids = getOptionalHeader(exchange, EventbridgeConstants.TARGETS_IDS, Collection.class); + if (ids == null || ids.isEmpty()) { + throw new IllegalArgumentException("At least one target ID must be specified"); + } + builder.ids(ids); + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.removeTargets(builder.build()); + }, + "Remove Targets", + (RemoveTargetsResponse response, Message message) -> { + 
message.setHeader(EventbridgeConstants.FAILED_ENTRY_COUNT, response.failedEntryCount()); + }); } private void deleteRule(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DeleteRuleRequest) { - DeleteRuleResponse result; - try { - result = eventbridgeClient.deleteRule((DeleteRuleRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Delete Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DeleteRuleRequest.Builder builder = DeleteRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.name(ruleName); - } - builder.eventBusName(getConfiguration().getEventbusName()); - DeleteRuleResponse result; - try { - result = eventbridgeClient.deleteRule(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Delete Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DeleteRuleRequest.class, + eventbridgeClient::deleteRule, + () -> { + DeleteRuleRequest.Builder builder = DeleteRuleRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.name(ruleName); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.deleteRule(builder.build()); + }, + "Delete Rule"); } private void enableRule(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if 
(getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof EnableRuleRequest) { - EnableRuleResponse result; - try { - result = eventbridgeClient.enableRule((EnableRuleRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Enable Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - EnableRuleRequest.Builder builder = EnableRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.name(ruleName); - } - builder.eventBusName(getConfiguration().getEventbusName()); - EnableRuleResponse result; - try { - result = eventbridgeClient.enableRule(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Enable Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + EnableRuleRequest.class, + eventbridgeClient::enableRule, + () -> { + EnableRuleRequest.Builder builder = EnableRuleRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.name(ruleName); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.enableRule(builder.build()); + }, + "Enable Rule"); } private void disableRule(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof DisableRuleRequest) { - DisableRuleResponse result; - try { - result = eventbridgeClient.disableRule((DisableRuleRequest) 
payload); - } catch (AwsServiceException ase) { - LOG.trace("Disable Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DisableRuleRequest.Builder builder = DisableRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.name(ruleName); - } - builder.eventBusName(getConfiguration().getEventbusName()); - DisableRuleResponse result; - try { - result = eventbridgeClient.disableRule(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Disable Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DisableRuleRequest.class, + eventbridgeClient::disableRule, + () -> { + DisableRuleRequest.Builder builder = DisableRuleRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.name(ruleName); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.disableRule(builder.build()); + }, + "Disable Rule"); } private void listRules(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof ListRulesRequest) { - ListRulesResponse result; - try { - result = eventbridgeClient.listRules((ListRulesRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("List Rules command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - 
message.setBody(result); - } - } else { - ListRulesRequest.Builder builder = ListRulesRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME_PREFIX))) { - String ruleNamePrefix = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME_PREFIX, String.class); - builder.namePrefix(ruleNamePrefix); - } - builder.eventBusName(getConfiguration().getEventbusName()); - ListRulesResponse result; - try { - result = eventbridgeClient.listRules(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Disable Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + ListRulesRequest.class, + eventbridgeClient::listRules, + () -> { + ListRulesRequest.Builder builder = ListRulesRequest.builder(); + String ruleNamePrefix = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME_PREFIX, String.class); + if (ruleNamePrefix != null) { + builder.namePrefix(ruleNamePrefix); + } + String nextToken = getOptionalHeader(exchange, EventbridgeConstants.NEXT_TOKEN, String.class); + if (nextToken != null) { + builder.nextToken(nextToken); + } + Integer limit = getOptionalHeader(exchange, EventbridgeConstants.LIMIT, Integer.class); + if (limit != null) { + builder.limit(limit); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.listRules(builder.build()); + }, + "List Rules", + (ListRulesResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.NEXT_TOKEN, response.nextToken()); + message.setHeader(EventbridgeConstants.IS_TRUNCATED, response.nextToken() != null); + }); } private void describeRule(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof 
DescribeRuleRequest) { - DescribeRuleResponse result; - try { - result = eventbridgeClient.describeRule((DescribeRuleRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("Describe Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - DescribeRuleRequest.Builder builder = DescribeRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.name(ruleName); - } - builder.eventBusName(getConfiguration().getEventbusName()); - DescribeRuleResponse result; - try { - result = eventbridgeClient.describeRule(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("Describe Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + DescribeRuleRequest.class, + eventbridgeClient::describeRule, + () -> { + DescribeRuleRequest.Builder builder = DescribeRuleRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.name(ruleName); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.describeRule(builder.build()); + }, + "Describe Rule", + (DescribeRuleResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.RULE_ARN, response.arn()); + }); } private void listTargetsByRule(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof ListTargetsByRuleRequest) { - ListTargetsByRuleResponse result; - try { - 
result = eventbridgeClient.listTargetsByRule((ListTargetsByRuleRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("List Targets by Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - ListTargetsByRuleRequest.Builder builder = ListTargetsByRuleRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME))) { - String ruleName = exchange.getIn().getHeader(EventbridgeConstants.RULE_NAME, String.class); - builder.rule(ruleName); - } - builder.eventBusName(getConfiguration().getEventbusName()); - ListTargetsByRuleResponse result; - try { - result = eventbridgeClient.listTargetsByRule(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("List Targets by Rule command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + ListTargetsByRuleRequest.class, + eventbridgeClient::listTargetsByRule, + () -> { + ListTargetsByRuleRequest.Builder builder = ListTargetsByRuleRequest.builder(); + String ruleName = getOptionalHeader(exchange, EventbridgeConstants.RULE_NAME, String.class); + if (ruleName != null) { + builder.rule(ruleName); + } + String nextToken = getOptionalHeader(exchange, EventbridgeConstants.NEXT_TOKEN, String.class); + if (nextToken != null) { + builder.nextToken(nextToken); + } + Integer limit = getOptionalHeader(exchange, EventbridgeConstants.LIMIT, Integer.class); + if (limit != null) { + builder.limit(limit); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.listTargetsByRule(builder.build()); + }, + "List Targets by Rule", + (ListTargetsByRuleResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.NEXT_TOKEN, response.nextToken()); + 
message.setHeader(EventbridgeConstants.IS_TRUNCATED, response.nextToken() != null); + }); } private void listRuleNamesByTarget(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { - if (getConfiguration().isPojoRequest()) { - Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof ListRuleNamesByTargetRequest) { - ListRuleNamesByTargetResponse result; - try { - result = eventbridgeClient.listRuleNamesByTarget((ListRuleNamesByTargetRequest) payload); - } catch (AwsServiceException ase) { - LOG.trace("List Rule Name by Targets command returned the error code {}", - ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - ListRuleNamesByTargetRequest.Builder builder = ListRuleNamesByTargetRequest.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.TARGET_ARN))) { - String targetArn = exchange.getIn().getHeader(EventbridgeConstants.TARGET_ARN, String.class); - builder.targetArn(targetArn); - } - builder.eventBusName(getConfiguration().getEventbusName()); - ListRuleNamesByTargetResponse result; - try { - result = eventbridgeClient.listRuleNamesByTarget(builder.build()); - } catch (AwsServiceException ase) { - LOG.trace("List Rule by Target command returned the error code {}", ase.awsErrorDetails().errorCode()); - throw ase; - } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } + executeOperation( + exchange, + ListRuleNamesByTargetRequest.class, + eventbridgeClient::listRuleNamesByTarget, + () -> { + ListRuleNamesByTargetRequest.Builder builder = ListRuleNamesByTargetRequest.builder(); + String targetArn = getOptionalHeader(exchange, EventbridgeConstants.TARGET_ARN, String.class); + if (targetArn != null) { + builder.targetArn(targetArn); + } + String nextToken = getOptionalHeader(exchange, EventbridgeConstants.NEXT_TOKEN, String.class); + if 
(nextToken != null) { + builder.nextToken(nextToken); + } + Integer limit = getOptionalHeader(exchange, EventbridgeConstants.LIMIT, Integer.class); + if (limit != null) { + builder.limit(limit); + } + builder.eventBusName(getConfiguration().getEventbusName()); + return eventbridgeClient.listRuleNamesByTarget(builder.build()); + }, + "List Rule Names by Target", + (ListRuleNamesByTargetResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.NEXT_TOKEN, response.nextToken()); + message.setHeader(EventbridgeConstants.IS_TRUNCATED, response.nextToken() != null); + }); } private void putEvent(EventBridgeClient eventbridgeClient, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + PutEventsRequest.class, + eventbridgeClient::putEvents, + () -> { + PutEventsRequest.Builder builder = PutEventsRequest.builder(); + PutEventsRequestEntry.Builder entryBuilder = PutEventsRequestEntry.builder(); + String resourcesArn = getRequiredHeader(exchange, EventbridgeConstants.EVENT_RESOURCES_ARN, String.class, + "At least one resource ARN must be specified"); + entryBuilder.resources(Stream.of(resourcesArn.split(",")).toList()); + String detailType = getRequiredHeader(exchange, EventbridgeConstants.EVENT_DETAIL_TYPE, String.class, + "Detail Type must be specified"); + entryBuilder.detailType(detailType); + String source = getRequiredHeader(exchange, EventbridgeConstants.EVENT_SOURCE, String.class, + "Source must be specified"); + entryBuilder.source(source); + entryBuilder.eventBusName(getConfiguration().getEventbusName()); + try { + entryBuilder.detail(exchange.getMessage().getMandatoryBody(String.class)); + } catch (InvalidPayloadException e) { + throw new RuntimeException(e); + } + builder.entries(entryBuilder.build()); + return eventbridgeClient.putEvents(builder.build()); + }, + "Put Events", + (PutEventsResponse response, Message message) -> { + message.setHeader(EventbridgeConstants.FAILED_ENTRY_COUNT, 
response.failedEntryCount()); + }); + } + + @Override + public EventbridgeEndpoint getEndpoint() { + return (EventbridgeEndpoint) super.getEndpoint(); + } + + public static Message getMessageForResponse(final Exchange exchange) { + return exchange.getMessage(); + } + + /** + * Executes an EventBridge operation with POJO request support. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName) + throws InvalidPayloadException { + executeOperation(exchange, requestClass, pojoExecutor, headerExecutor, operationName, null); + } + + /** + * Executes an EventBridge operation with POJO request support and optional response post-processing. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName, + BiConsumer<RES, Message> responseProcessor) + throws InvalidPayloadException { + + RES result; if (getConfiguration().isPojoRequest()) { Object payload = exchange.getIn().getMandatoryBody(); - if (payload instanceof PutEventsRequest) { - PutEventsResponse result; + if (requestClass.isInstance(payload)) { try { - result = eventbridgeClient.putEvents((PutEventsRequest) payload); + result = pojoExecutor.apply(requestClass.cast(payload)); } catch (AwsServiceException ase) { - LOG.trace("PutEvents command returned the error code {}", ase.awsErrorDetails().errorCode()); + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); throw ase; } - Message message = getMessageForResponse(exchange); - message.setBody(result); - } - } else { - PutEventsRequest.Builder builder = PutEventsRequest.builder(); - PutEventsRequestEntry.Builder entryBuilder = PutEventsRequestEntry.builder(); - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_RESOURCES_ARN))) { - 
String resourcesArn = exchange.getIn().getHeader(EventbridgeConstants.EVENT_RESOURCES_ARN, String.class); - entryBuilder.resources(Stream.of(resourcesArn.split(",")).toList()); - } else { - throw new IllegalArgumentException("At least one resource ARN must be specified"); - } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_DETAIL_TYPE))) { - String detailType = exchange.getIn().getHeader(EventbridgeConstants.EVENT_DETAIL_TYPE, String.class); - entryBuilder.detailType(detailType); } else { - throw new IllegalArgumentException("Detail Type must be specified"); + throw new IllegalArgumentException( + String.format("Expected body of type %s but was %s", + requestClass.getName(), + payload != null ? payload.getClass().getName() : "null")); } - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(EventbridgeConstants.EVENT_SOURCE))) { - String source = exchange.getIn().getHeader(EventbridgeConstants.EVENT_SOURCE, String.class); - entryBuilder.source(source); - } else { - throw new IllegalArgumentException("Source must be specified"); - } - entryBuilder.eventBusName(getConfiguration().getEventbusName()); - entryBuilder.detail(exchange.getMessage().getMandatoryBody(String.class)); - - builder.entries(entryBuilder.build()); - PutEventsResponse result; + } else { try { - result = eventbridgeClient.putEvents(builder.build()); + result = headerExecutor.get(); } catch (AwsServiceException ase) { - LOG.trace("Put Events command returned the error code {}", ase.awsErrorDetails().errorCode()); + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); throw ase; } - Message message = getMessageForResponse(exchange); - message.setBody(result); + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + if (responseProcessor != null) { + responseProcessor.accept(result, message); } } - @Override - public EventbridgeEndpoint getEndpoint() { - return (EventbridgeEndpoint) 
super.getEndpoint(); + /** + * Gets a required header value or throws an IllegalArgumentException. + */ + private <T> T getRequiredHeader(Exchange exchange, String headerName, Class<T> headerType, String errorMessage) { + T value = exchange.getIn().getHeader(headerName, headerType); + if (ObjectHelper.isEmpty(value)) { + throw new IllegalArgumentException(errorMessage); + } + return value; } - public static Message getMessageForResponse(final Exchange exchange) { - return exchange.getMessage(); + /** + * Gets an optional header value. + */ + private <T> T getOptionalHeader(Exchange exchange, String headerName, Class<T> headerType) { + return exchange.getIn().getHeader(headerName, headerType); } @Override diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EventbridgeEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EventbridgeEndpointBuilderFactory.java index 8594f24b1565..d0c3512a6625 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EventbridgeEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/EventbridgeEndpointBuilderFactory.java @@ -731,6 +731,67 @@ public interface EventbridgeEndpointBuilderFactory { public String awsEventbridgeDetailType() { return "CamelAwsEventbridgeDetailType"; } + /** + * The token for the next set of results. + * + * The option is a: {@code String} type. + * + * Group: listRules listTargetsByRule listRuleNamesByTarget + * + * @return the name of the header {@code AwsEventbridgeNextToken}. + */ + public String awsEventbridgeNextToken() { + return "CamelAwsEventbridgeNextToken"; + } + /** + * The maximum number of results to return. + * + * The option is a: {@code Integer} type. + * + * Group: listRules listTargetsByRule listRuleNamesByTarget + * + * @return the name of the header {@code AwsEventbridgeLimit}. 
+ */ + public String awsEventbridgeLimit() { + return "CamelAwsEventbridgeLimit"; + } + /** + * Whether the response has more results (is truncated). + * + * The option is a: {@code Boolean} type. + * + * Group: listRules listTargetsByRule listRuleNamesByTarget + * + * @return the name of the header {@code AwsEventbridgeIsTruncated}. + */ + public String awsEventbridgeIsTruncated() { + return "CamelAwsEventbridgeIsTruncated"; + } + /** + * The Amazon Resource Name (ARN) of the rule. + * + * The option is a: {@code String} type. + * + * Group: putRule describeRule + * + * @return the name of the header {@code AwsEventbridgeRuleArn}. + */ + public String awsEventbridgeRuleArn() { + return "CamelAwsEventbridgeRuleArn"; + } + /** + * The number of failed entries in the response. + * + * The option is a: {@code Integer} type. + * + * Group: putEvent putTargets removeTargets + * + * @return the name of the header {@code + * AwsEventbridgeFailedEntryCount}. + */ + public String awsEventbridgeFailedEntryCount() { + return "CamelAwsEventbridgeFailedEntryCount"; + } } static EventbridgeEndpointBuilder endpointBuilder(String componentName, String path) { class EventbridgeEndpointBuilderImpl extends AbstractEndpointBuilder implements EventbridgeEndpointBuilder, AdvancedEventbridgeEndpointBuilder {
