This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2458679ebd525f5eb23a41f8c5468afe83d6e070
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Jun 30 18:57:43 2020 +0200

    CAMEL-15192 - AWS2-Kinesis-Firehose: Support creation of deliveryStream
---
 .../aws2/firehose/KinesisFirehose2Operations.java       |  2 +-
 .../aws2/firehose/KinesisFirehose2Producer.java         | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
index 3fee7ae..4703837 100644
--- 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
+++ 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.aws2.firehose;
 
 public enum KinesisFirehose2Operations {
 
-    sendBatchRecord
+    sendBatchRecord, createDeliveryStream
 }
diff --git 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 6792bdb..8874daa 100644
--- 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.firehose.FirehoseClient;
+import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
@@ -56,12 +58,27 @@ public class KinesisFirehose2Producer extends 
DefaultProducer {
                 case sendBatchRecord:
                     sendBatchRecord(getClient(), exchange);
                     break;
+                case createDeliveryStream:
+                    createDeliveryStream(getClient(), exchange);
+                    break;
                 default:
                     throw new IllegalArgumentException("Unsupported 
operation");
             }
         }
     }
 
+    private void createDeliveryStream(FirehoseClient client, Exchange 
exchange) {
+        if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) 
{
+            CreateDeliveryStreamRequest req = 
exchange.getIn().getBody(CreateDeliveryStreamRequest.class);
+            CreateDeliveryStreamResponse result = 
client.createDeliveryStream(req);
+            Message message = getMessageForResponse(exchange);
+            message.setBody(result);
+        } else {
+            throw new IllegalArgumentException("The createDeliveryStream 
operation expects a CreateDeliveryStream instance as body");
+        }
+        
+    }
+
     private void sendBatchRecord(FirehoseClient client, Exchange exchange) {
         if (exchange.getIn().getBody() instanceof Iterable) {
             Iterable c = exchange.getIn().getBody(Iterable.class);

Reply via email to