This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit aa596fd90927ca0af731fcf983b6ac1cad8d096c Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Tue Sep 1 13:07:15 2020 +0200 Fixed CS --- .../aws2/firehose/KinesisFirehose2Operations.java | 6 +- .../aws2/firehose/KinesisFirehose2Producer.java | 71 +++++++++------------- 2 files changed, 31 insertions(+), 46 deletions(-) diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java index 01aa840..bfd27a3 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java @@ -18,9 +18,5 @@ package org.apache.camel.component.aws2.firehose; public enum KinesisFirehose2Operations { - sendBatchRecord, - createDeliveryStream, - deleteDeliveryStream, - describeDeliveryStream, - updateDestination + sendBatchRecord, createDeliveryStream, deleteDeliveryStream, describeDeliveryStream, updateDestination } diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java index 5ab81a5..9e3fa42 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java @@ -51,7 +51,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { @Override public KinesisFirehose2Endpoint getEndpoint() { - return (KinesisFirehose2Endpoint) super.getEndpoint(); + return (KinesisFirehose2Endpoint)super.getEndpoint(); } @Override @@ -61,23 +61,23 @@ public 
class KinesisFirehose2Producer extends DefaultProducer { processSingleRecord(exchange); } else { switch (operation) { - case sendBatchRecord: - sendBatchRecord(getClient(), exchange); - break; - case createDeliveryStream: - createDeliveryStream(getClient(), exchange); - break; - case deleteDeliveryStream: - deleteDeliveryStream(getClient(), exchange); - break; - case updateDestination: - updateDestination(getClient(), exchange); - break; - case describeDeliveryStream: - describeDeliveryStream(getClient(), exchange); - break; - default: - throw new IllegalArgumentException("Unsupported operation"); + case sendBatchRecord: + sendBatchRecord(getClient(), exchange); + break; + case createDeliveryStream: + createDeliveryStream(getClient(), exchange); + break; + case deleteDeliveryStream: + deleteDeliveryStream(getClient(), exchange); + break; + case updateDestination: + updateDestination(getClient(), exchange); + break; + case describeDeliveryStream: + describeDeliveryStream(getClient(), exchange); + break; + default: + throw new IllegalArgumentException("Unsupported operation"); } } } @@ -89,8 +89,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { Message message = getMessageForResponse(exchange); message.setBody(result); } else { - throw new IllegalArgumentException( - "The createDeliveryStream operation expects a CreateDeliveryStream instance as body"); + throw new IllegalArgumentException("The createDeliveryStream operation expects a CreateDeliveryStream instance as body"); } } @@ -102,17 +101,13 @@ public class KinesisFirehose2Producer extends DefaultProducer { message.setBody(result); } else { if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { - DeleteDeliveryStreamRequest req - = DeleteDeliveryStreamRequest.builder() - .deliveryStreamName(exchange.getIn() - .getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)) - .build(); + DeleteDeliveryStreamRequest req 
= DeleteDeliveryStreamRequest.builder() + .deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)).build(); DeleteDeliveryStreamResponse result = client.deleteDeliveryStream(req); Message message = getMessageForResponse(exchange); message.setBody(result); } else { - throw new IllegalArgumentException( - "The deleteDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); + throw new IllegalArgumentException("The deleteDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); } } } @@ -124,30 +119,25 @@ public class KinesisFirehose2Producer extends DefaultProducer { Message message = getMessageForResponse(exchange); message.setBody(result); } else { - throw new IllegalArgumentException( - "The updateDestination operation expects an UpdateDestinationRequest instance as body"); + throw new IllegalArgumentException("The updateDestination operation expects an UpdateDestinationRequest instance as body"); } } - + private void describeDeliveryStream(FirehoseClient client, Exchange exchange) { if (exchange.getIn().getBody() instanceof DescribeDeliveryStreamRequest) { - DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class); + DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class); DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); Message message = getMessageForResponse(exchange); message.setBody(result); } else { if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) { - DescribeDeliveryStreamRequest req - = DescribeDeliveryStreamRequest.builder() - .deliveryStreamName(exchange.getIn() - .getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)) - .build(); + DescribeDeliveryStreamRequest req = 
DescribeDeliveryStreamRequest.builder() + .deliveryStreamName(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class)).build(); DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req); Message message = getMessageForResponse(exchange); message.setBody(result); } else { - throw new IllegalArgumentException( - "The describeDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); + throw new IllegalArgumentException("The describeDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance"); } } } @@ -157,7 +147,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { Iterable c = exchange.getIn().getBody(Iterable.class); PutRecordBatchRequest.Builder batchRequest = PutRecordBatchRequest.builder(); batchRequest.deliveryStreamName(getEndpoint().getConfiguration().getStreamName()); - batchRequest.records((Collection<Record>) c); + batchRequest.records((Collection<Record>)c); PutRecordBatchResponse result = client.putRecordBatch(batchRequest.build()); Message message = getMessageForResponse(exchange); message.setBody(result); @@ -202,8 +192,7 @@ public class KinesisFirehose2Producer extends DefaultProducer { } private KinesisFirehose2Operations determineOperation(Exchange exchange) { - KinesisFirehose2Operations operation = exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_OPERATION, - KinesisFirehose2Operations.class); + KinesisFirehose2Operations operation = exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_OPERATION, KinesisFirehose2Operations.class); if (operation == null) { operation = getConfiguration().getOperation(); }