This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2458679ebd525f5eb23a41f8c5468afe83d6e070
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue Jun 30 18:57:43 2020 +0200

    CAMEL-15192 - AWS2-Kinesis-Firehose: Support creation of deliveryStream
---
 .../aws2/firehose/KinesisFirehose2Operations.java |  2 +-
 .../aws2/firehose/KinesisFirehose2Producer.java   | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
index 3fee7ae..4703837 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.aws2.firehose;
 
 public enum KinesisFirehose2Operations {
 
-    sendBatchRecord
+    sendBatchRecord, createDeliveryStream
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 6792bdb..8874daa 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
 import
software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
@@ -56,12 +58,27 @@ public class KinesisFirehose2Producer extends DefaultProducer {
                 case sendBatchRecord:
                     sendBatchRecord(getClient(), exchange);
                     break;
+                case createDeliveryStream:
+                    createDeliveryStream(getClient(), exchange);
+                    break;
                 default:
                     throw new IllegalArgumentException("Unsupported operation");
             }
         }
     }
 
+    private void createDeliveryStream(FirehoseClient client, Exchange exchange) {
+        if (exchange.getIn().getBody() instanceof CreateDeliveryStreamRequest) {
+            CreateDeliveryStreamRequest req = exchange.getIn().getBody(CreateDeliveryStreamRequest.class);
+            CreateDeliveryStreamResponse result = client.createDeliveryStream(req);
+            Message message = getMessageForResponse(exchange);
+            message.setBody(result);
+        } else {
+            throw new IllegalArgumentException("The createDeliveryStream operation expects a CreateDeliveryStream instance as body");
+        }
+
+    }
+
     private void sendBatchRecord(FirehoseClient client, Exchange exchange) {
         if (exchange.getIn().getBody() instanceof Iterable) {
             Iterable c = exchange.getIn().getBody(Iterable.class);