This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0a2f6983433aa17ad97bbe8710e7321a99d4ef88 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Jul 1 08:30:43 2020 +0200 CAMEL-15264 - Camel-AWS2-Kinesis: Add more operations support --- .../aws2/firehose/KinesisFirehose2Constants.java | 1 + .../aws2/firehose/KinesisFirehose2Operations.java | 2 +- .../aws2/firehose/KinesisFirehose2Producer.java | 26 ++++++++++++++++++++-- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java index da68897..6261e67 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Constants.java @@ -20,4 +20,5 @@ public interface KinesisFirehose2Constants { String RECORD_ID = "CamelAwsKinesisFirehoseRecordId"; String KINESIS_FIREHOSE_OPERATION = "CamelAwsKinesisFirehoseOperation"; + String KINESIS_FIREHOSE_STREAM_NAME = "CamelAwsKinesisFirehoseDeliveryStreamName"; } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java index 4703837..93ec644 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java @@ -18,5 +18,5 @@ package org.apache.camel.component.aws2.firehose; public enum KinesisFirehose2Operations { - sendBatchRecord, createDeliveryStream + sendBatchRecord, createDeliveryStream, deleteDeliveryStream } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java index 8874daa..95aff68 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java @@ -29,6 +29,8 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.firehose.FirehoseClient; import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest; import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse; +import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest; import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse; import software.amazon.awssdk.services.firehose.model.PutRecordRequest; @@ -61,6 +63,9 @@ public class KinesisFirehose2Producer extends DefaultProducer { case createDeliveryStream: createDeliveryStream(getClient(), exchange); break; + case deleteDeliveryStream: + deleteDeliveryStream(getClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -75,8 +80,25 @@ public class KinesisFirehose2Producer extends DefaultProducer { message.setBody(result); } else { throw new IllegalArgumentException("The createDeliveryStream operation expects a CreateDeliveryStream instance as body"); - } - + } + } + + private void deleteDeliveryStream(FirehoseClient client, Exchange exchange) { + if (exchange.getIn().getBody() instanceof DeleteDeliveryStreamRequest) { + DeleteDeliveryStreamRequest req = exchange.getIn().getBody(DeleteDeliveryStreamRequest.class); + DeleteDeliveryStreamResponse result = client.deleteDeliveryStream(req); + Message message = getMessageForResponse(exchange); + message.setBody(result); + } else { + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { + DeleteDeliveryStreamRequest req = DeleteDeliveryStreamRequest.builder().deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)).build(); + DeleteDeliveryStreamResponse result = client.deleteDeliveryStream(req); + Message message = getMessageForResponse(exchange); + message.setBody(result); + } else { + throw new IllegalArgumentException("The deleteDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); + } + } } private void sendBatchRecord(FirehoseClient client, Exchange exchange) {