This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch 22805-3
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6523c2189ba96cfbf2a14f4143180c85f00da047
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Jan 12 14:58:13 2026 +0100

    CAMEL-22842 - Camel-AWS components: Avoid duplicated code and add 
pagination to producer operation where it makes sense - AWS Kinesis
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../catalog/components/aws2-kinesis-firehose.json  |   6 +-
 .../camel/catalog/components/aws2-kinesis.json     |   4 +-
 .../aws2/firehose/aws2-kinesis-firehose.json       |   6 +-
 .../camel/component/aws2/kinesis/aws2-kinesis.json |   4 +-
 .../aws2/firehose/KinesisFirehose2Constants.java   |  14 ++
 .../aws2/firehose/KinesisFirehose2Producer.java    | 200 ++++++++++++++-------
 .../component/aws2/kinesis/Kinesis2Constants.java  |   8 +
 .../component/aws2/kinesis/Kinesis2Producer.java   |   7 +
 .../dsl/Kinesis2EndpointBuilderFactory.java        |  24 +++
 .../KinesisFirehose2EndpointBuilderFactory.java    |  51 ++++++
 10 files changed, 251 insertions(+), 73 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json
index cec21273e67b..86486b58dfc5 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis-firehose.json
@@ -48,7 +48,11 @@
   "headers": {
     "CamelAwsKinesisFirehoseRecordId": { "index": 0, "kind": "header", 
"displayName": "", "group": "producer", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The record ID, as defined in 
http:\/\/docs.aws.amazon.com\/firehose\/latest\/APIReference\/API_PutRecord.html#API_PutRecord_ResponseSyntaxResponse
 Syntax", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2 [...]
     "CamelAwsKinesisFirehoseOperation": { "index": 1, "kind": "header", 
"displayName": "", "group": "producer", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The operation we want to perform", 
"constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_OPERATION"
 },
-    "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": 
"header", "displayName": "", "group": "producer", "label": "", "required": 
false, "javaType": "String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The name of the delivery 
stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME"
 }
+    "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": 
"header", "displayName": "", "group": "producer", "label": "", "required": 
false, "javaType": "String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The name of the delivery 
stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME"
 },
+    "CamelAwsKinesisFirehoseDeliveryStreamArn": { "index": 3, "kind": 
"header", "displayName": "", "group": "createDeliveryStream", "label": 
"createDeliveryStream", "required": false, "javaType": "String", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "The ARN of the delivery stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_ARN"
 },
+    "CamelAwsKinesisFirehoseFailedRecordCount": { "index": 4, "kind": 
"header", "displayName": "", "group": "sendBatchRecord", "label": 
"sendBatchRecord", "required": false, "javaType": "Integer", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "The number of records that failed in a batch put operation.", 
"constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#FAILED_RECORD_COUNT"
 },
+    "CamelAwsKinesisFirehoseEncrypted": { "index": 5, "kind": "header", 
"displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", 
"required": false, "javaType": "Boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": 
"Whether the batch operation was encrypted.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#ENCRYPTED" 
},
+    "CamelAwsKinesisFirehoseDeliveryStreamStatus": { "index": 6, "kind": 
"header", "displayName": "", "group": "describeDeliveryStream", "label": 
"describeDeliveryStream", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The status of the delivery stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_STATUS"
 }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "producer", "label": "", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
index 770eef58d151..def81dc36333 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json
@@ -67,7 +67,9 @@
     "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Identifies which shard in the stream 
the data record is assigned to.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
     "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
     "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The resume action to execute when 
resuming.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
-    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" },
+    "CamelAwsKinesisFailedRecordCount": { "index": 6, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The number of records that failed in a 
batch put operation.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#FAILED_RECORD_COUNT" 
},
+    "CamelAwsKinesisRecordCount": { "index": 7, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The total number of records in a batch 
put operation.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RECORD_COUNT" }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json
index cec21273e67b..86486b58dfc5 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/firehose/aws2-kinesis-firehose.json
@@ -48,7 +48,11 @@
   "headers": {
     "CamelAwsKinesisFirehoseRecordId": { "index": 0, "kind": "header", 
"displayName": "", "group": "producer", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The record ID, as defined in 
http:\/\/docs.aws.amazon.com\/firehose\/latest\/APIReference\/API_PutRecord.html#API_PutRecord_ResponseSyntaxResponse
 Syntax", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2 [...]
     "CamelAwsKinesisFirehoseOperation": { "index": 1, "kind": "header", 
"displayName": "", "group": "producer", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The operation we want to perform", 
"constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_OPERATION"
 },
-    "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": 
"header", "displayName": "", "group": "producer", "label": "", "required": 
false, "javaType": "String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The name of the delivery 
stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME"
 }
+    "CamelAwsKinesisFirehoseDeliveryStreamName": { "index": 2, "kind": 
"header", "displayName": "", "group": "producer", "label": "", "required": 
false, "javaType": "String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "description": "The name of the delivery 
stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#KINESIS_FIREHOSE_STREAM_NAME"
 },
+    "CamelAwsKinesisFirehoseDeliveryStreamArn": { "index": 3, "kind": 
"header", "displayName": "", "group": "createDeliveryStream", "label": 
"createDeliveryStream", "required": false, "javaType": "String", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "The ARN of the delivery stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_ARN"
 },
+    "CamelAwsKinesisFirehoseFailedRecordCount": { "index": 4, "kind": 
"header", "displayName": "", "group": "sendBatchRecord", "label": 
"sendBatchRecord", "required": false, "javaType": "Integer", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"description": "The number of records that failed in a batch put operation.", 
"constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#FAILED_RECORD_COUNT"
 },
+    "CamelAwsKinesisFirehoseEncrypted": { "index": 5, "kind": "header", 
"displayName": "", "group": "sendBatchRecord", "label": "sendBatchRecord", 
"required": false, "javaType": "Boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "description": 
"Whether the batch operation was encrypted.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#ENCRYPTED" 
},
+    "CamelAwsKinesisFirehoseDeliveryStreamStatus": { "index": 6, "kind": 
"header", "displayName": "", "group": "describeDeliveryStream", "label": 
"describeDeliveryStream", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The status of the delivery stream.", "constantName": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Constants#DELIVERY_STREAM_STATUS"
 }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "producer", "label": "", "required": true, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.firehose.KinesisFirehose2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index 770eef58d151..def81dc36333 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/META-INF/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -67,7 +67,9 @@
     "CamelAwsKinesisPartitionKey": { "index": 2, "kind": "header", 
"displayName": "", "group": "common", "label": "", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "Identifies which shard in the stream 
the data record is assigned to.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#PARTITION_KEY" },
     "CamelMessageTimestamp": { "index": 3, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "long", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The timestamp of the message", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#MESSAGE_TIMESTAMP" },
     "CamelKinesisDbResumeAction": { "index": 4, "kind": "header", 
"displayName": "", "group": "consumer", "label": "consumer", "required": false, 
"javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The resume action to execute when 
resuming.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RESUME_ACTION" },
-    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" }
+    "CamelAwsKinesisShardId": { "index": 5, "kind": "header", "displayName": 
"", "group": "common", "label": "", "required": false, "javaType": "String", 
"deprecated": false, "deprecationNote": "", "autowired": false, "secret": 
false, "description": "The shard ID of the shard where the data record was 
placed.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#SHARD_ID" },
+    "CamelAwsKinesisFailedRecordCount": { "index": 6, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The number of records that failed in a 
batch put operation.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#FAILED_RECORD_COUNT" 
},
+    "CamelAwsKinesisRecordCount": { "index": 7, "kind": "header", 
"displayName": "", "group": "producer", "label": "producer", "required": false, 
"javaType": "Integer", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "description": "The total number of records in a batch 
put operation.", "constantName": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Constants#RECORD_COUNT" }
   },
   "properties": {
     "streamName": { "index": 0, "kind": "path", "displayName": "Stream Name", 
"group": "common", "label": "", "required": true, "type": "string", "javaType": 
"java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Name of the stream" },
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
index bb64a316670c..83c91c92ffc2 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
@@ -28,4 +28,18 @@ public interface KinesisFirehose2Constants {
     String KINESIS_FIREHOSE_OPERATION = "CamelAwsKinesisFirehoseOperation";
     @Metadata(description = "The name of the delivery stream.", javaType = 
"String")
     String KINESIS_FIREHOSE_STREAM_NAME = 
"CamelAwsKinesisFirehoseDeliveryStreamName";
+
+    // Response metadata
+    @Metadata(label = "createDeliveryStream",
+              description = "The ARN of the delivery stream.", javaType = 
"String")
+    String DELIVERY_STREAM_ARN = "CamelAwsKinesisFirehoseDeliveryStreamArn";
+    @Metadata(label = "sendBatchRecord",
+              description = "The number of records that failed in a batch put 
operation.", javaType = "Integer")
+    String FAILED_RECORD_COUNT = "CamelAwsKinesisFirehoseFailedRecordCount";
+    @Metadata(label = "sendBatchRecord",
+              description = "Whether the batch operation was encrypted.", 
javaType = "Boolean")
+    String ENCRYPTED = "CamelAwsKinesisFirehoseEncrypted";
+    @Metadata(label = "describeDeliveryStream",
+              description = "The status of the delivery stream.", javaType = 
"String")
+    String DELIVERY_STREAM_STATUS = 
"CamelAwsKinesisFirehoseDeliveryStreamStatus";
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 04a55dd7bee8..cfd55e00706e 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -18,19 +18,23 @@ package org.apache.camel.component.aws2.firehose;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.firehose.FirehoseClient;
 import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
 import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
 import 
software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
-import 
software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse;
 import 
software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
 import 
software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
@@ -39,7 +43,6 @@ import 
software.amazon.awssdk.services.firehose.model.PutRecordRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordResponse;
 import software.amazon.awssdk.services.firehose.model.Record;
 import software.amazon.awssdk.services.firehose.model.UpdateDestinationRequest;
-import 
software.amazon.awssdk.services.firehose.model.UpdateDestinationResponse;
 
 public class KinesisFirehose2Producer extends DefaultProducer {
 
@@ -82,89 +85,88 @@ public class KinesisFirehose2Producer extends 
DefaultProducer {
         }
     }
 
-    private void createDeliveryStream(FirehoseClient client, Exchange 
exchange) {
-        if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) 
{
-            CreateDeliveryStreamRequest req = 
exchange.getIn().getBody(CreateDeliveryStreamRequest.class);
-            CreateDeliveryStreamResponse result = 
client.createDeliveryStream(req);
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
-        } else {
-            throw new IllegalArgumentException(
-                    "The createDeliveryStream operation expects a 
CreateDeliveryStream instance as body");
-        }
+    private void createDeliveryStream(FirehoseClient client, Exchange 
exchange) throws InvalidPayloadException {
+        executeOperation(
+                exchange,
+                CreateDeliveryStreamRequest.class,
+                client::createDeliveryStream,
+                null,
+                "createDeliveryStream",
+                (CreateDeliveryStreamResponse response, Message message) -> {
+                    
message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_ARN, 
response.deliveryStreamARN());
+                });
     }
 
-    private void deleteDeliveryStream(FirehoseClient client, Exchange 
exchange) {
-        if (exchange.getIn().getBody() instanceof DeleteDeliveryStreamRequest) 
{
-            DeleteDeliveryStreamRequest req = 
exchange.getIn().getBody(DeleteDeliveryStreamRequest.class);
-            DeleteDeliveryStreamResponse result = 
client.deleteDeliveryStream(req);
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
-        } else {
-            if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME)))
 {
-                DeleteDeliveryStreamRequest req = 
DeleteDeliveryStreamRequest.builder()
-                        
.deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME,
-                                String.class))
-                        .build();
-                DeleteDeliveryStreamResponse result = 
client.deleteDeliveryStream(req);
-                Message message = getMessageForResponse(exchange);
-                message.setBody(result);
-            } else {
-                throw new IllegalArgumentException(
-                        "The deleteDeliveryStream operation expects at least 
an delivery stream name header or a DeleteDeliveryStreamRequest instance");
-            }
-        }
+    private void deleteDeliveryStream(FirehoseClient client, Exchange 
exchange) throws InvalidPayloadException {
+        executeOperation(
+                exchange,
+                DeleteDeliveryStreamRequest.class,
+                client::deleteDeliveryStream,
+                () -> {
+                    String streamName = getOptionalHeader(exchange, 
KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME,
+                            String.class);
+                    if (streamName == null) {
+                        throw new IllegalArgumentException(
+                                "The deleteDeliveryStream operation expects at 
least a delivery stream name header or a DeleteDeliveryStreamRequest instance");
+                    }
+                    return client.deleteDeliveryStream(
+                            
DeleteDeliveryStreamRequest.builder().deliveryStreamName(streamName).build());
+                },
+                "deleteDeliveryStream");
     }
 
-    private void updateDestination(FirehoseClient client, Exchange exchange) {
-        if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) 
{
-            UpdateDestinationRequest req = 
exchange.getIn().getBody(UpdateDestinationRequest.class);
-            UpdateDestinationResponse result = client.updateDestination(req);
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
-        } else {
-            throw new IllegalArgumentException(
-                    "The updateDestination operation expects an 
UpdateDestinationRequest instance as body");
-        }
+    private void updateDestination(FirehoseClient client, Exchange exchange) 
throws InvalidPayloadException {
+        executeOperation(
+                exchange,
+                UpdateDestinationRequest.class,
+                client::updateDestination,
+                null,
+                "updateDestination");
     }
 
-    private void describeDeliveryStream(FirehoseClient client, Exchange 
exchange) {
-        if (exchange.getIn().getBody() instanceof 
DescribeDeliveryStreamRequest) {
-            DescribeDeliveryStreamRequest req = 
exchange.getIn().getBody(DescribeDeliveryStreamRequest.class);
-            DescribeDeliveryStreamResponse result = 
client.describeDeliveryStream(req);
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
-        } else {
-            if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME)))
 {
-                DescribeDeliveryStreamRequest req = 
DescribeDeliveryStreamRequest.builder()
-                        
.deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME,
-                                String.class))
-                        .build();
-                DescribeDeliveryStreamResponse result = 
client.describeDeliveryStream(req);
-                Message message = getMessageForResponse(exchange);
-                message.setBody(result);
-            } else {
-                throw new IllegalArgumentException(
-                        "The describeDeliveryStream operation expects at least 
an delivery stream name header or a DeleteDeliveryStreamRequest instance");
-            }
-        }
+    private void describeDeliveryStream(FirehoseClient client, Exchange 
exchange) throws InvalidPayloadException {
+        executeOperation(
+                exchange,
+                DescribeDeliveryStreamRequest.class,
+                client::describeDeliveryStream,
+                () -> {
+                    String streamName = getOptionalHeader(exchange, 
KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME,
+                            String.class);
+                    if (streamName == null) {
+                        throw new IllegalArgumentException(
+                                "The describeDeliveryStream operation expects 
at least a delivery stream name header or a DescribeDeliveryStreamRequest 
instance");
+                    }
+                    return client.describeDeliveryStream(
+                            
DescribeDeliveryStreamRequest.builder().deliveryStreamName(streamName).build());
+                },
+                "describeDeliveryStream",
+                (DescribeDeliveryStreamResponse response, Message message) -> {
+                    if (response.deliveryStreamDescription() != null) {
+                        
message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_ARN,
+                                
response.deliveryStreamDescription().deliveryStreamARN());
+                        
message.setHeader(KinesisFirehose2Constants.DELIVERY_STREAM_STATUS,
+                                
response.deliveryStreamDescription().deliveryStreamStatusAsString());
+                    }
+                });
     }
 
+    @SuppressWarnings("unchecked")
     private void sendBatchRecord(FirehoseClient client, Exchange exchange) {
+        PutRecordBatchResponse result;
         if (exchange.getIn().getBody() instanceof Iterable) {
-            Iterable c = exchange.getIn().getBody(Iterable.class);
+            Iterable<Record> c = exchange.getIn().getBody(Iterable.class);
             PutRecordBatchRequest.Builder batchRequest = 
PutRecordBatchRequest.builder();
             
batchRequest.deliveryStreamName(getEndpoint().getConfiguration().getStreamName());
             batchRequest.records((Collection<Record>) c);
-            PutRecordBatchResponse result = 
client.putRecordBatch(batchRequest.build());
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
+            result = client.putRecordBatch(batchRequest.build());
         } else {
             PutRecordBatchRequest req = 
exchange.getIn().getBody(PutRecordBatchRequest.class);
-            PutRecordBatchResponse result = client.putRecordBatch(req);
-            Message message = getMessageForResponse(exchange);
-            message.setBody(result);
+            result = client.putRecordBatch(req);
         }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+        message.setHeader(KinesisFirehose2Constants.FAILED_RECORD_COUNT, 
result.failedPutCount());
+        message.setHeader(KinesisFirehose2Constants.ENCRYPTED, 
result.encrypted());
     }
 
     public void processSingleRecord(final Exchange exchange) {
@@ -207,4 +209,64 @@ public class KinesisFirehose2Producer extends 
DefaultProducer {
         }
         return operation;
     }
+
+    /**
+     * Executes a Firehose operation with POJO request support.
+     */
+    private <REQ, RES> void executeOperation(
+            Exchange exchange,
+            Class<REQ> requestClass,
+            Function<REQ, RES> pojoExecutor,
+            Supplier<RES> headerExecutor,
+            String operationName)
+            throws InvalidPayloadException {
+        executeOperation(exchange, requestClass, pojoExecutor, headerExecutor, 
operationName, null);
+    }
+
+    /**
+     * Executes a Firehose operation with POJO request support and optional 
response post-processing.
+     */
+    private <REQ, RES> void executeOperation(
+            Exchange exchange,
+            Class<REQ> requestClass,
+            Function<REQ, RES> pojoExecutor,
+            Supplier<RES> headerExecutor,
+            String operationName,
+            BiConsumer<RES, Message> responseProcessor)
+            throws InvalidPayloadException {
+
+        RES result;
+        Object payload = exchange.getIn().getBody();
+        if (requestClass.isInstance(payload)) {
+            try {
+                result = pojoExecutor.apply(requestClass.cast(payload));
+            } catch (AwsServiceException ase) {
+                LOG.trace("{} command returned the error code {}", 
operationName, ase.awsErrorDetails().errorCode());
+                throw ase;
+            }
+        } else if (headerExecutor != null) {
+            try {
+                result = headerExecutor.get();
+            } catch (AwsServiceException ase) {
+                LOG.trace("{} command returned the error code {}", 
operationName, ase.awsErrorDetails().errorCode());
+                throw ase;
+            }
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("The %s operation expects a %s instance as 
body",
+                            operationName, requestClass.getSimpleName()));
+        }
+        Message message = getMessageForResponse(exchange);
+        message.setBody(result);
+        if (responseProcessor != null) {
+            responseProcessor.accept(result, message);
+        }
+    }
+
+    /**
+     * Gets an optional header value.
+     */
+    private <T> T getOptionalHeader(Exchange exchange, String headerName, 
Class<T> headerType) {
+        return exchange.getIn().getHeader(headerName, headerType);
+    }
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
index 78b6db63a1d3..5473cef5a58f 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
@@ -40,4 +40,12 @@ public interface Kinesis2Constants {
      */
     @Metadata(description = "The shard ID of the shard where the data record 
was placed.", javaType = "String")
     String SHARD_ID = "CamelAwsKinesisShardId";
+
+    // Batch operation response metadata
+    @Metadata(label = "producer",
+              description = "The number of records that failed in a batch put 
operation.", javaType = "Integer")
+    String FAILED_RECORD_COUNT = "CamelAwsKinesisFailedRecordCount";
+    @Metadata(label = "producer",
+              description = "The total number of records in a batch put 
operation.", javaType = "Integer")
+    String RECORD_COUNT = "CamelAwsKinesisRecordCount";
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index c51fe777379c..45594f657a56 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -77,18 +77,25 @@ public class Kinesis2Producer extends DefaultProducer {
         Object partitionKey = 
exchange.getIn().getHeader(Kinesis2Constants.PARTITION_KEY);
         ensurePartitionKeyNotNull(partitionKey);
         List<List<PutRecordsRequestEntry>> requestBatchList = 
createRequestBatchList(exchange, partitionKey);
+        int totalRecordCount = 0;
+        int totalFailedCount = 0;
         for (List<PutRecordsRequestEntry> requestBatch : requestBatchList) {
             PutRecordsRequest putRecordsRequest = PutRecordsRequest.builder()
                     
.streamName(getEndpoint().getConfiguration().getStreamName())
                     .records(requestBatch)
                     .build();
             PutRecordsResponse putRecordsResponse = 
connection.getClient(getEndpoint()).putRecords(putRecordsRequest);
+            totalRecordCount += putRecordsResponse.records().size();
+            totalFailedCount += putRecordsResponse.failedRecordCount();
             if (putRecordsResponse.failedRecordCount() > 0) {
                 throw new RuntimeException(
                         "Failed to send records " + 
putRecordsResponse.failedRecordCount() + " of "
                                            + 
putRecordsResponse.records().size());
             }
         }
+        Message message = exchange.getMessage();
+        message.setHeader(Kinesis2Constants.RECORD_COUNT, totalRecordCount);
+        message.setHeader(Kinesis2Constants.FAILED_RECORD_COUNT, 
totalFailedCount);
     }
 
     private List<List<PutRecordsRequestEntry>> createRequestBatchList(Exchange 
exchange, Object partitionKey) {
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index fe9b2528214a..d5953e1a03cc 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -2889,6 +2889,30 @@ public interface Kinesis2EndpointBuilderFactory {
         public String awsKinesisShardId() {
             return "CamelAwsKinesisShardId";
         }
+        /**
+         * The number of records that failed in a batch put operation.
+         * 
+         * The option is a: {@code Integer} type.
+         * 
+         * Group: producer
+         * 
+         * @return the name of the header {@code AwsKinesisFailedRecordCount}.
+         */
+        public String awsKinesisFailedRecordCount() {
+            return "CamelAwsKinesisFailedRecordCount";
+        }
+        /**
+         * The total number of records in a batch put operation.
+         * 
+         * The option is a: {@code Integer} type.
+         * 
+         * Group: producer
+         * 
+         * @return the name of the header {@code AwsKinesisRecordCount}.
+         */
+        public String awsKinesisRecordCount() {
+            return "CamelAwsKinesisRecordCount";
+        }
     }
     static Kinesis2EndpointBuilder endpointBuilder(String componentName, 
String path) {
         class Kinesis2EndpointBuilderImpl extends AbstractEndpointBuilder 
implements Kinesis2EndpointBuilder, AdvancedKinesis2EndpointBuilder {
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java
index 84d70512b601..4484a5c880ed 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KinesisFirehose2EndpointBuilderFactory.java
@@ -638,6 +638,57 @@ public interface KinesisFirehose2EndpointBuilderFactory {
         public String awsKinesisFirehoseDeliveryStreamName() {
             return "CamelAwsKinesisFirehoseDeliveryStreamName";
         }
+        /**
+         * The ARN of the delivery stream.
+         * 
+         * The option is a: {@code String} type.
+         * 
+         * Group: createDeliveryStream
+         * 
+         * @return the name of the header {@code
+         * AwsKinesisFirehoseDeliveryStreamArn}.
+         */
+        public String awsKinesisFirehoseDeliveryStreamArn() {
+            return "CamelAwsKinesisFirehoseDeliveryStreamArn";
+        }
+        /**
+         * The number of records that failed in a batch put operation.
+         * 
+         * The option is a: {@code Integer} type.
+         * 
+         * Group: sendBatchRecord
+         * 
+         * @return the name of the header {@code
+         * AwsKinesisFirehoseFailedRecordCount}.
+         */
+        public String awsKinesisFirehoseFailedRecordCount() {
+            return "CamelAwsKinesisFirehoseFailedRecordCount";
+        }
+        /**
+         * Whether the batch operation was encrypted.
+         * 
+         * The option is a: {@code Boolean} type.
+         * 
+         * Group: sendBatchRecord
+         * 
+         * @return the name of the header {@code AwsKinesisFirehoseEncrypted}.
+         */
+        public String awsKinesisFirehoseEncrypted() {
+            return "CamelAwsKinesisFirehoseEncrypted";
+        }
+        /**
+         * The status of the delivery stream.
+         * 
+         * The option is a: {@code String} type.
+         * 
+         * Group: describeDeliveryStream
+         * 
+         * @return the name of the header {@code
+         * AwsKinesisFirehoseDeliveryStreamStatus}.
+         */
+        public String awsKinesisFirehoseDeliveryStreamStatus() {
+            return "CamelAwsKinesisFirehoseDeliveryStreamStatus";
+        }
     }
     static KinesisFirehose2EndpointBuilder endpointBuilder(String 
componentName, String path) {
         class KinesisFirehose2EndpointBuilderImpl extends 
AbstractEndpointBuilder implements KinesisFirehose2EndpointBuilder, 
AdvancedKinesisFirehose2EndpointBuilder {


Reply via email to