This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit bc913593e200712d259d7ae4e018df7bbf51a427 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Sep 1 12:19:48 2020 +0200 CAMEL-15471 - Kinesis-Firehose: Add more operation to producer side --- .../aws2/firehose/KinesisFirehose2Operations.java | 1 + .../aws2/firehose/KinesisFirehose2Producer.java | 28 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java index 365f8a6..01aa840 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java @@ -21,5 +21,6 @@ public enum KinesisFirehose2Operations { sendBatchRecord, createDeliveryStream, deleteDeliveryStream, + describeDeliveryStream, updateDestination } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java index 8b3b13c..5ab81a5 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java @@ -31,6 +31,8 @@ import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamReques import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest; import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse; +import 
software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest; +import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse; import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest; import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse; import software.amazon.awssdk.services.firehose.model.PutRecordRequest; @@ -71,6 +73,9 @@ public class KinesisFirehose2Producer extends DefaultProducer { case updateDestination: updateDestination(getClient(), exchange); break; + case describeDeliveryStream: + describeDeliveryStream(getClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -123,6 +128,29 @@ public class KinesisFirehose2Producer extends DefaultProducer { "The updateDestination operation expects an UpdateDestinationRequest instance as body"); } } + + private void describeDeliveryStream(FirehoseClient client, Exchange exchange) { + if (exchange.getIn().getBody() instanceof DescribeDeliveryStreamRequest) { + DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class); + DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); + Message message = getMessageForResponse(exchange); + message.setBody(result); + } else { + if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { + DescribeDeliveryStreamRequest req + = DescribeDeliveryStreamRequest.builder() + .deliveryStreamName(exchange.getIn() + .getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)) + .build(); + DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); + Message message = getMessageForResponse(exchange); + message.setBody(result); + } else { + throw new IllegalArgumentException( + "The describeDeliveryStream operation expects at least a delivery stream name header or a DescribeDeliveryStreamRequest instance"); + } + } + 
} private void sendBatchRecord(FirehoseClient client, Exchange exchange) { if (exchange.getIn().getBody() instanceof Iterable) {