This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 22805-3 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6523c2189ba96cfbf2a14f4143180c85f00da047 Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Jan 12 14:58:13 2026 +0100 CAMEL-22842 - Camel-AWS components: Avoid duplicated code and add pagination to producer operation where it makes sense - AWS Kinesis Signed-off-by: Andrea Cosentino <[email protected]> --- .../catalog/components/aws2-kinesis-firehose.json | 6 +- .../camel/catalog/components/aws2-kinesis.json | 4 +- .../aws2/firehose/aws2-kinesis-firehose.json | 6 +- .../camel/component/aws2/kinesis/aws2-kinesis.json | 4 +- .../aws2/firehose/KinesisFirehose2Constants.java | 14 ++ .../aws2/firehose/KinesisFirehose2Producer.java | 200 ++++++++++++++------- .../component/aws2/kinesis/Kinesis2Constants.java | 8 + .../component/aws2/kinesis/Kinesis2Producer.java | 7 + .../dsl/Kinesis2EndpointBuilderFactory.java | 24 +++ .../KinesisFirehose2EndpointBuilderFactory.java | 51 ++++++ 10 files changed, 251 insertions(+), 73 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json index cec21273e67b..86486b58dfc5 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json @@ -48,7 +48,11 @@ "headers": { "CamelAwsKinesisFirehoseRecordId": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record ID, as defined in http:\/\/docs.aws.amazon.com\/firehose\/latest\/APIReference\/API_PutRecord.html#API_PutRecord_ResponseSyntaxResponse Syntax", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2 [...] "CamelAwsKinesisFirehoseOperation": { "index": 1, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The operation we want to perform", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_OPERATION" }, - "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME" } + "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME" }, + "CamelAwsKinesisFirehoseDeliveryStreamArn": { "index": 3, "kind": "header", "displayName": "", "group": "createDeliveryStream", "label": "createDeliveryStream", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ARN of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_ARN" }, + "CamelAwsKinesisFirehoseFailedRecordCount": { "index": 4, "kind": "header", "displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of records that failed in a batch put operation.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#FAILED_RECORD_COUNT" }, + "CamelAwsKinesisFirehoseEncrypted": { "index": 5, "kind": "header", "displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the batch operation was encrypted.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#ENCRYPTED" }, + "CamelAwsKinesisFirehoseDeliveryStreamStatus": { "index": 6, "kind": "header", "displayName": "", "group": "describeDeliveryStream", "label": "describeDeliveryStream", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The status of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_STATUS" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json index 770eef58d151..def81dc36333 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json @@ -67,7 +67,9 @@ "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Identifies which shard in the stream the data record is assigned to.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" }, "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" }, "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" }, - "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } + "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }, + "CamelAwsKinesisFailedRecordCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of records that failed in a batch put operation.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#FAILED_RECORD_COUNT" }, + "CamelAwsKinesisRecordCount": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The total number of records in a batch put operation.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RECORD_COUNT" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json index cec21273e67b..86486b58dfc5 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json @@ -48,7 +48,11 @@ "headers": { "CamelAwsKinesisFirehoseRecordId": { "index": 0, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The record ID, as defined in http:\/\/docs.aws.amazon.com\/firehose\/latest\/APIReference\/API_PutRecord.html#API_PutRecord_ResponseSyntaxResponse Syntax", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2 [...] "CamelAwsKinesisFirehoseOperation": { "index": 1, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The operation we want to perform", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_OPERATION" }, - "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME" } + "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": "header", "displayName": "", "group": "producer", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The name of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME" }, + "CamelAwsKinesisFirehoseDeliveryStreamArn": { "index": 3, "kind": "header", "displayName": "", "group": "createDeliveryStream", "label": "createDeliveryStream", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The ARN of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_ARN" }, + "CamelAwsKinesisFirehoseFailedRecordCount": { "index": 4, "kind": "header", "displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of records that failed in a batch put operation.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#FAILED_RECORD_COUNT" }, + "CamelAwsKinesisFirehoseEncrypted": { "index": 5, "kind": "header", "displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", "required": false, "javaType": "Boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Whether the batch operation was encrypted.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#ENCRYPTED" }, + "CamelAwsKinesisFirehoseDeliveryStreamStatus": { "index": 6, "kind": "header", "displayName": "", "group": "describeDeliveryStream", "label": "describeDeliveryStream", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The status of the delivery stream.", "constantName": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_STATUS" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "producer", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json index 770eef58d151..def81dc36333 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json @@ -67,7 +67,9 @@ "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Identifies which shard in the stream the data record is assigned to.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" }, "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The timestamp of the message", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" }, "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The resume action to execute when resuming.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" }, - "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" } + "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The shard ID of the shard where the data record was placed.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }, + "CamelAwsKinesisFailedRecordCount": { "index": 6, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The number of records that failed in a batch put operation.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#FAILED_RECORD_COUNT" }, + "CamelAwsKinesisRecordCount": { "index": 7, "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The total number of records in a batch put operation.", "constantName": "org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RECORD_COUNT" } }, "properties": { "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", "group": "common", "label": "", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Name of the stream" }, diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java index bb64a316670c..83c91c92ffc2 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java @@ -28,4 +28,18 @@ public interface KinesisFirehose2Constants { String KINESIS_FIREHOSE_OPERATION = "CamelAwsKinesisFirehoseOperation"; @Metadata(description = "The name of the delivery stream.", javaType = "String") String KINESIS_FIREHOSE_STREAM_NAME = "CamelAwsKinesisFirehoseDeliveryStreamName"; + + // Response metadata + @Metadata(label = "createDeliveryStream", + description = "The ARN of the delivery stream.", javaType = "String") + String DELIVERY_STREAM_ARN = "CamelAwsKinesisFirehoseDeliveryStreamArn"; + @Metadata(label = "sendBatchRecord", + description = "The number of records that failed in a batch put operation.", javaType = "Integer") + String FAILED_RECORD_COUNT = "CamelAwsKinesisFirehoseFailedRecordCount"; + @Metadata(label = "sendBatchRecord", + description = "Whether the batch operation was encrypted.", javaType = "Boolean") + String ENCRYPTED = "CamelAwsKinesisFirehoseEncrypted"; + @Metadata(label = "describeDeliveryStream", + description = "The status of the delivery stream.", javaType = "String") + String DELIVERY_STREAM_STATUS = "CamelAwsKinesisFirehoseDeliveryStreamStatus"; } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java index 04a55dd7bee8..cfd55e00706e 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java @@ -18,19 +18,23 @@ package org.apache.camel.component.aws2.firehose; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; import org.apache.camel.Message; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.firehose.FirehoseClient; import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest; import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest; -import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest; import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest; @@ -39,7 +43,6 @@ import software.amazon.awssdk.services.firehose.model.PutRecordRequest; import software.amazon.awssdk.services.firehose.model.PutRecordResponse; import software.amazon.awssdk.services.firehose.model.Record; import software.amazon.awssdk.services.firehose.model.UpdateDestinationRequest; -import software.amazon.awssdk.services.firehose.model.UpdateDestinationResponse; public class KinesisFirehose2Producer extends DefaultProducer { @@ -82,89 +85,88 @@ public class KinesisFirehose2Producer extends DefaultProducer { } } - private void createDeliveryStream(FirehoseClient client, Exchange exchange) { - if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) { - CreateDeliveryStreamRequest req = exchange.getIn().getBody(CreateDeliveryStreamRequest.class); - CreateDeliveryStreamResponse result = client.createDeliveryStream(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - throw new IllegalArgumentException( - "The createDeliveryStream operation expects a CreateDeliveryStream instance as body"); - } + private void createDeliveryStream(FirehoseClient client, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + CreateDeliveryStreamRequest.class, + client::createDeliveryStream, + null, + "createDeliveryStream", + (CreateDeliveryStreamResponse response, Message message) -> { + message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_ARN, response.deliveryStreamARN()); + }); } - private void deleteDeliveryStream(FirehoseClient client, Exchange exchange) { - if (exchange.getIn().getBody() instanceof DeleteDeliveryStreamRequest) { - DeleteDeliveryStreamRequest req = exchange.getIn().getBody(DeleteDeliveryStreamRequest.class); - DeleteDeliveryStreamResponse result = client.deleteDeliveryStream(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { - DeleteDeliveryStreamRequest req = DeleteDeliveryStreamRequest.builder() - .deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, - String.class)) - .build(); - DeleteDeliveryStreamResponse result = client.deleteDeliveryStream(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - throw new IllegalArgumentException( - "The deleteDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); - } - } + private void deleteDeliveryStream(FirehoseClient client, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + DeleteDeliveryStreamRequest.class, + client::deleteDeliveryStream, + () -> { + String streamName = getOptionalHeader(exchange, KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, + String.class); + if (streamName == null) { + throw new IllegalArgumentException( + "The deleteDeliveryStream operation expects at least a delivery stream name header or a DeleteDeliveryStreamRequest instance"); + } + return client.deleteDeliveryStream( + DeleteDeliveryStreamRequest.builder().deliveryStreamName(streamName).build()); + }, + "deleteDeliveryStream"); } - private void updateDestination(FirehoseClient client, Exchange exchange) { - if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) { - UpdateDestinationRequest req = exchange.getIn().getBody(UpdateDestinationRequest.class); - UpdateDestinationResponse result = client.updateDestination(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - throw new IllegalArgumentException( - "The updateDestination operation expects an UpdateDestinationRequest instance as body"); - } + private void updateDestination(FirehoseClient client, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + UpdateDestinationRequest.class, + client::updateDestination, + null, + "updateDestination"); } - private void describeDeliveryStream(FirehoseClient client, Exchange exchange) { - if (exchange.getIn().getBody() instanceof DescribeDeliveryStreamRequest) { - DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class); - DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { - DescribeDeliveryStreamRequest req = DescribeDeliveryStreamRequest.builder() - .deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, - String.class)) - .build(); - DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); - } else { - throw new IllegalArgumentException( - "The describeDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); - } - } + private void describeDeliveryStream(FirehoseClient client, Exchange exchange) throws InvalidPayloadException { + executeOperation( + exchange, + DescribeDeliveryStreamRequest.class, + client::describeDeliveryStream, + () -> { + String streamName = getOptionalHeader(exchange, KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, + String.class); + if (streamName == null) { + throw new IllegalArgumentException( + "The describeDeliveryStream operation expects at least a delivery stream name header or a DescribeDeliveryStreamRequest instance"); + } + return client.describeDeliveryStream( + DescribeDeliveryStreamRequest.builder().deliveryStreamName(streamName).build()); + }, + "describeDeliveryStream", + (DescribeDeliveryStreamResponse response, Message message) -> { + if (response.deliveryStreamDescription() != null) { + message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_ARN, + response.deliveryStreamDescription().deliveryStreamARN()); + message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_STATUS, + response.deliveryStreamDescription().deliveryStreamStatusAsString()); + } + }); } + @SuppressWarnings("unchecked") private void sendBatchRecord(FirehoseClient client, Exchange exchange) { + PutRecordBatchResponse result; if (exchange.getIn().getBody() instanceof Iterable) { - Iterable c = exchange.getIn().getBody(Iterable.class); + Iterable<Record> c = exchange.getIn().getBody(Iterable.class); PutRecordBatchRequest.Builder batchRequest = PutRecordBatchRequest.builder(); batchRequest.deliveryStreamName(getEndpoint().getConfiguration().getStreamName()); batchRequest.records((Collection<Record>) c); - PutRecordBatchResponse result = client.putRecordBatch(batchRequest.build()); - Message message = getMessageForResponse(exchange); - message.setBody(result); + result = client.putRecordBatch(batchRequest.build()); } else { PutRecordBatchRequest req = exchange.getIn().getBody(PutRecordBatchRequest.class); - PutRecordBatchResponse result = client.putRecordBatch(req); - Message message = getMessageForResponse(exchange); - message.setBody(result); + result = client.putRecordBatch(req); } + Message message = getMessageForResponse(exchange); + message.setBody(result); + message.setHeader(KinesisFirehose2Constants.FAILED_RECORD_COUNT, result.failedPutCount()); + message.setHeader(KinesisFirehose2Constants.ENCRYPTED, result.encrypted()); } public void processSingleRecord(final Exchange exchange) { @@ -207,4 +209,64 @@ public class KinesisFirehose2Producer extends DefaultProducer { } return operation; } + + /** + * Executes a Firehose operation with POJO request support. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName) + throws InvalidPayloadException { + executeOperation(exchange, requestClass, pojoExecutor, headerExecutor, operationName, null); + } + + /** + * Executes a Firehose operation with POJO request support and optional response post-processing. + */ + private <REQ, RES> void executeOperation( + Exchange exchange, + Class<REQ> requestClass, + Function<REQ, RES> pojoExecutor, + Supplier<RES> headerExecutor, + String operationName, + BiConsumer<RES, Message> responseProcessor) + throws InvalidPayloadException { + + RES result; + Object payload = exchange.getIn().getBody(); + if (requestClass.isInstance(payload)) { + try { + result = pojoExecutor.apply(requestClass.cast(payload)); + } catch (AwsServiceException ase) { + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); + throw ase; + } + } else if (headerExecutor != null) { + try { + result = headerExecutor.get(); + } catch (AwsServiceException ase) { + LOG.trace("{} command returned the error code {}", operationName, ase.awsErrorDetails().errorCode()); + throw ase; + } + } else { + throw new IllegalArgumentException( + String.format("The %s operation expects a %s instance as body", + operationName, requestClass.getSimpleName())); + } + Message message = getMessageForResponse(exchange); + message.setBody(result); + if (responseProcessor != null) { + responseProcessor.accept(result, message); + } + } + + /** + * Gets an optional header value. + */ + private <T> T getOptionalHeader(Exchange exchange, String headerName, Class<T> headerType) { + return exchange.getIn().getHeader(headerName, headerType); + } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java index 78b6db63a1d3..5473cef5a58f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java @@ -40,4 +40,12 @@ public interface Kinesis2Constants { */ @Metadata(description = "The shard ID of the shard where the data record was placed.", javaType = "String") String SHARD_ID = "CamelAwsKinesisShardId"; + + // Batch operation response metadata + @Metadata(label = "producer", + description = "The number of records that failed in a batch put operation.", javaType = "Integer") + String FAILED_RECORD_COUNT = "CamelAwsKinesisFailedRecordCount"; + @Metadata(label = "producer", + description = "The total number of records in a batch put operation.", javaType = "Integer") + String RECORD_COUNT = "CamelAwsKinesisRecordCount"; } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java index c51fe777379c..45594f657a56 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java @@ -77,18 +77,25 @@ public class Kinesis2Producer extends DefaultProducer { Object partitionKey = exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY); ensurePartitionKeyNotNull(partitionKey); List<List<PutRecordsRequestEntry>> requestBatchList = createRequestBatchList(exchange, partitionKey); + int totalRecordCount = 0; + int totalFailedCount = 0; for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) { PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder() .streamName(getEndpoint().getConfiguration().getStreamName()) .records(requestBatch) .build(); PutRecordsResponse putRecordsResponse = connection.getClient(getEndpoint()).putRecords(putRecordsRequest); + totalRecordCount += putRecordsResponse.records().size(); + totalFailedCount += putRecordsResponse.failedRecordCount(); if (putRecordsResponse.failedRecordCount() > 0) { throw new RuntimeException( "Failed to send records " + putRecordsResponse.failedRecordCount() + " of " + putRecordsResponse.records().size()); } } + Message message = exchange.getMessage(); + message.setHeader(Kinesis2Constants.RECORD_COUNT, totalRecordCount); + message.setHeader(Kinesis2Constants.FAILED_RECORD_COUNT, totalFailedCount); } private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange exchange, Object partitionKey) { diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java index fe9b2528214a..d5953e1a03cc 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java @@ -2889,6 +2889,30 @@ public interface Kinesis2EndpointBuilderFactory { public String awsKinesisShardId() { return "CamelAwsKinesisShardId"; } + /** + * The number of records that failed in a batch put operation. + * + * The option is a: {@code Integer} type. + * + * Group: producer + * + * @return the name of the header {@code AwsKinesisFailedRecordCount}. + */ + public String awsKinesisFailedRecordCount() { + return "CamelAwsKinesisFailedRecordCount"; + } + /** + * The total number of records in a batch put operation. + * + * The option is a: {@code Integer} type. + * + * Group: producer + * + * @return the name of the header {@code AwsKinesisRecordCount}. + */ + public String awsKinesisRecordCount() { + return "CamelAwsKinesisRecordCount"; + } } static Kinesis2EndpointBuilder endpointBuilder(String componentName, String path) { class Kinesis2EndpointBuilderImpl extends AbstractEndpointBuilder implements Kinesis2EndpointBuilder, AdvancedKinesis2EndpointBuilder { diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java index 84d70512b601..4484a5c880ed 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java @@ -638,6 +638,57 @@ public interface KinesisFirehose2EndpointBuilderFactory { public String awsKinesisFirehoseDeliveryStreamName() { return "CamelAwsKinesisFirehoseDeliveryStreamName"; } + /** + * The ARN of the delivery stream. + * + * The option is a: {@code String} type. + * + * Group: createDeliveryStream + * + * @return the name of the header {@code + * AwsKinesisFirehoseDeliveryStreamArn}. + */ + public String awsKinesisFirehoseDeliveryStreamArn() { + return "CamelAwsKinesisFirehoseDeliveryStreamArn"; + } + /** + * The number of records that failed in a batch put operation. + * + * The option is a: {@code Integer} type. + * + * Group: sendBatchRecord + * + * @return the name of the header {@code + * AwsKinesisFirehoseFailedRecordCount}. + */ + public String awsKinesisFirehoseFailedRecordCount() { + return "CamelAwsKinesisFirehoseFailedRecordCount"; + } + /** + * Whether the batch operation was encrypted. + * + * The option is a: {@code Boolean} type. + * + * Group: sendBatchRecord + * + * @return the name of the header {@code AwsKinesisFirehoseEncrypted}. + */ + public String awsKinesisFirehoseEncrypted() { + return "CamelAwsKinesisFirehoseEncrypted"; + } + /** + * The status of the delivery stream. + * + * The option is a: {@code String} type. + * + * Group: describeDeliveryStream + * + * @return the name of the header {@code + * AwsKinesisFirehoseDeliveryStreamStatus}. + */ + public String awsKinesisFirehoseDeliveryStreamStatus() { + return "CamelAwsKinesisFirehoseDeliveryStreamStatus"; + } } static KinesisFirehose2EndpointBuilder endpointBuilder(String componentName, String path) { class KinesisFirehose2EndpointBuilderImpl extends AbstractEndpointBuilder implements KinesisFirehose2EndpointBuilder, AdvancedKinesisFirehose2EndpointBuilder {
