This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0a2f6983433aa17ad97bbe8710e7321a99d4ef88
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Jul 1 08:30:43 2020 +0200

    CAMEL-15264 - Camel-AWS2-Kinesis: Add more operations support
---
 .../aws2/firehose/KinesisFirehose2Constants.java   |  1 +
 .../aws2/firehose/KinesisFirehose2Operations.java  |  2 +-
 .../aws2/firehose/KinesisFirehose2Producer.java    | 26 ++++++++++++++++++++--
 3 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
index da68897..6261e67 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java
@@ -20,4 +20,5 @@ public interface KinesisFirehose2Constants {
 
     String RECORD_ID = "CamelAwsKinesisFirehoseRecordId";
     String KINESIS_FIREHOSE_OPERATION = "CamelAwsKinesisFirehoseOperation";
+    String KINESIS_FIREHOSE_STREAM_NAME = 
"CamelAwsKinesisFirehoseDeliveryStreamName";
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
index 4703837..93ec644 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.aws2.firehose;
 
 public enum KinesisFirehose2Operations {
 
-    sendBatchRecord, createDeliveryStream
+    sendBatchRecord, createDeliveryStream, deleteDeliveryStream
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 8874daa..95aff68 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -29,6 +29,8 @@ import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.firehose.FirehoseClient;
 import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
 import 
software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
+import 
software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
+import 
software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
@@ -61,6 +63,9 @@ public class KinesisFirehose2Producer extends DefaultProducer {
                 case createDeliveryStream:
                     createDeliveryStream(getClient(), exchange);
                     break;
+                case deleteDeliveryStream:
+                    deleteDeliveryStream(getClient(), exchange);
+                    break;
                 default:
                     throw new IllegalArgumentException("Unsupported 
operation");
             }
@@ -75,8 +80,25 @@ public class KinesisFirehose2Producer extends DefaultProducer {
             message.setBody(result);
         } else {
             throw new IllegalArgumentException("The createDeliveryStream 
operation expects a CreateDeliveryStream instance as body");
-        }
-        
+        }    
+    }
+    
+    private void deleteDeliveryStream(FirehoseClient client, Exchange 
exchange) {
+        if (exchange.getIn().getBody() instanceof DeleteDeliveryStreamRequest) 
{
+            DeleteDeliveryStreamRequest req = 
exchange.getIn().getBody(DeleteDeliveryStreamRequest.class);
+            DeleteDeliveryStreamResponse result = 
client.deleteDeliveryStream(req);
+            Message message = getMessageForResponse(exchange);
+            message.setBody(result);
+        } else {
+            if 
(ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME)))
 {
+                DeleteDeliveryStreamRequest req = 
DeleteDeliveryStreamRequest.builder().deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME,
 String.class)).build();
+                DeleteDeliveryStreamResponse result = 
client.deleteDeliveryStream(req);
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            } else {
+                throw new IllegalArgumentException("The deleteDeliveryStream 
operation expects at least an delivery stream name header or a 
DeleteDeliveryStreamRequest instance");
+            }
+        }    
     }
 
     private void sendBatchRecord(FirehoseClient client, Exchange exchange) {

Reply via email to