Fixed CS. Fixes #708. Fixes #706. Fixes #705.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/92f6c9b9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/92f6c9b9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/92f6c9b9 Branch: refs/heads/master Commit: 92f6c9b93dfde4ae3ffe3182c082db29b4c03e71 Parents: 72370df Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Dec 5 10:35:53 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Dec 5 10:35:53 2015 +0100 ---------------------------------------------------------------------- .../component/aws/kinesis/KinesisComponent.java | 1 + .../component/aws/kinesis/KinesisConstants.java | 9 +-- .../component/aws/kinesis/KinesisConsumer.java | 25 +++---- .../component/aws/kinesis/KinesisEndpoint.java | 11 ++- .../aws/kinesis/RecordStringConverter.java | 29 ++++---- .../services/org/apache/camel/TypeConverter | 2 +- .../aws/kinesis/KinesisConsumerTest.java | 70 +++++++++++--------- .../aws/kinesis/KinesisEndpointTest.java | 16 +++-- .../aws/kinesis/RecordStringConverterTest.java | 11 +-- 9 files changed, 91 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java index d3f34ab..9740600 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws.kinesis; import java.util.Map; + import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java index b028123..22da493 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java @@ -18,12 +18,13 @@ package org.apache.camel.component.aws.kinesis; public interface KinesisConstants { - public static final String SEQUENCE_NUMBER = "KinesisSequenceNumber"; - public static final String APPROX_ARRIVAL_TIME = "KinesisApproximateArrivalTimestamp"; - public static final String PARTITION_KEY = "KinesisPartitionKey"; + String SEQUENCE_NUMBER = "CamelAwsKinesisSequenceNumber"; + String APPROX_ARRIVAL_TIME = "CamelAwsKinesisApproximateArrivalTimestamp"; + String PARTITION_KEY = "CamelAwsKinesisPartitionKey"; + /** * in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method. */ - public static final String SHARD_ID = "KinesisPartitionKey"; + String SHARD_ID = "CamelAwsKinesisPartitionKey"; } http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java index b301f38..def7e63 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java @@ -16,6 +16,10 @@ */ package org.apache.camel.component.aws.kinesis; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; + import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -24,9 +28,6 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; import com.amazonaws.services.kinesis.model.Record; -import java.util.ArrayDeque; -import java.util.List; -import java.util.Queue; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -39,21 +40,17 @@ import org.slf4j.LoggerFactory; public class KinesisConsumer extends ScheduledBatchPollingConsumer { private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); - private String currentShardIterator = null; + private String currentShardIterator; public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) { super(endpoint, processor); } - /* - * Returns the number of messages polled. - */ @Override protected int poll() throws Exception { GetRecordsRequest req = new GetRecordsRequest() .withShardIterator(getShardItertor()) - .withLimit(getEndpoint().getMaxResultsPerRequest()) - ; + .withLimit(getEndpoint().getMaxResultsPerRequest()); GetRecordsResult result = getClient().getRecords(req); Queue<Exchange> exchanges = createExchanges(result.getRecords()); @@ -61,8 +58,8 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer { // May cache the last successful sequence number, and pass it to the // getRecords request. That way, on the next poll, we start from where - // we left off, however, I don't know what happens to subsiquent - // exchanges when an earlier echange fails. + // we left off, however, I don't know what happens to subsequent + // exchanges when an earlier echangee fails. currentShardIterator = result.getNextShardIterator(); @@ -100,15 +97,13 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer { // either return a cached one or get a new one via a GetShardIterator request. if (currentShardIterator == null) { DescribeStreamRequest req1 = new DescribeStreamRequest() - .withStreamName(getEndpoint().getStreamName()) - ; + .withStreamName(getEndpoint().getStreamName()); DescribeStreamResult res1 = getClient().describeStream(req1); GetShardIteratorRequest req = new GetShardIteratorRequest() .withStreamName(getEndpoint().getStreamName()) .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard - .withShardIteratorType(getEndpoint().getIteratorType()) - ; + .withShardIteratorType(getEndpoint().getIteratorType()); GetShardIteratorResult result = getClient().getShardIterator(req); currentShardIterator = result.getShardIterator(); } http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java index b4c7597..e34da43 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -29,10 +29,10 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; -@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging") +@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerOnly = true, consumerClass = KinesisConsumer.class, label = "cloud,messaging") public class KinesisEndpoint extends ScheduledPollEndpoint { - @UriPath(label = "consumer,producer", description = "Name of the stream") + @UriPath(label = "consumer", description = "Name of the stream") @Metadata(required = "true") private String streamName; @@ -41,7 +41,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @Metadata(required = "true") private AmazonKinesis amazonKinesisClient; - @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll") + @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll", defaultValue = "1") private int maxResultsPerRequest = 1; @UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records") @@ -74,7 +74,6 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @Override public boolean isSingleton() { - // probably right. return true; } @@ -117,7 +116,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint { @Override public String toString() { - return "KinesisEndpoint{" + "amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}'; + return "KinesisEndpoint{amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}'; } - + } http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java index bda8983..19b8590 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java @@ -16,28 +16,31 @@ */ package org.apache.camel.component.aws.kinesis; -import com.amazonaws.services.kinesis.model.Record; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; + +import com.amazonaws.services.kinesis.model.Record; import org.apache.camel.Converter; @Converter -public class RecordStringConverter { +public final class RecordStringConverter { + + private RecordStringConverter() { + } @Converter public static String toString(Record record) { - List<Byte> bytes = new ArrayList<>(); - ByteBuffer buf = record.getData().asReadOnlyBuffer(); - while (buf.hasRemaining()) { - bytes.add(buf.get()); - } - byte[] a = new byte[bytes.size()]; - for (int i = 0; i < bytes.size(); ++i) { - a[i] = bytes.get(i); + Charset charset = Charset.forName("UTF-8"); + + ByteBuffer buffer = record.getData(); + if (buffer.hasArray()) { + byte[] bytes = record.getData().array(); + return new String(bytes, charset); + } else { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return new String(bytes, charset); } - return new String(a, Charset.forName("UTF-8")); } } http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter index 28707d6..4873a46 100644 --- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter @@ -15,4 +15,4 @@ ## limitations under the License. ## --------------------------------------------------------------------------- -org.apache.camel.component.aws.kinesis +org.apache.camel.component.aws.kinesis.RecordStringConverter http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java index db0df68..3f31986 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.aws.kinesis; +import java.util.Date; + import com.amazonaws.services.kinesis.AmazonKinesis; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; @@ -27,27 +29,32 @@ import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; import com.amazonaws.services.kinesis.model.StreamDescription; -import java.util.Date; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; -import static org.hamcrest.CoreMatchers.is; -import org.junit.Test; -import static org.junit.Assert.*; import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import static org.mockito.Mockito.*; import org.mockito.runners.MockitoJUnitRunner; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @RunWith(MockitoJUnitRunner.class) public class KinesisConsumerTest { - @Mock private AmazonKinesis kinesisClient; - @Mock private AsyncProcessor processor; + @Mock + private AmazonKinesis kinesisClient; + @Mock + private AsyncProcessor processor; private final CamelContext context = new DefaultCamelContext(); private final KinesisComponent component = new KinesisComponent(context); @@ -62,19 +69,19 @@ public class KinesisConsumerTest { undertest = new KinesisConsumer(endpoint, processor); when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(new GetRecordsResult() - .withNextShardIterator("nextShardIterator") - ); + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + ); when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) - .thenReturn(new DescribeStreamResult() - .withStreamDescription(new StreamDescription() - .withShards(new Shard().withShardId("shardId")) - ) - ); + .thenReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(new Shard().withShardId("shardId")) + ) + ); when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) - .thenReturn(new GetShardIteratorResult() - .withShardIterator("shardIterator") - ); + .thenReturn(new GetShardIteratorResult() + .withShardIterator("shardIterator") + ); } @Test @@ -103,7 +110,6 @@ public class KinesisConsumerTest { assertThat(getRecordsReqCap.getValue().getShardIterator(), is("shardIterator")); } - @Test public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception { undertest.poll(); @@ -121,10 +127,10 @@ public class KinesisConsumerTest { @Test public void recordsAreSentToTheProcessor() throws Exception { when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(new GetRecordsResult() - .withNextShardIterator("nextShardIterator") - .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2")) - ); + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2")) + ); int messageCount = undertest.poll(); @@ -138,16 +144,15 @@ public class KinesisConsumerTest { @Test public void exchangePropertiesAreSet() throws Exception { - when(kinesisClient.getRecords(any(GetRecordsRequest.class))) - .thenReturn(new GetRecordsResult() - .withNextShardIterator("nextShardIterator") - .withRecords(new Record() - .withSequenceNumber("1") - .withApproximateArrivalTimestamp(new Date(42)) - .withPartitionKey("shardId") - ) - ); + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + .withRecords(new Record() + .withSequenceNumber("1") + .withApproximateArrivalTimestamp(new Date(42)) + .withPartitionKey("shardId") + ) + ); undertest.poll(); @@ -159,4 +164,5 @@ public class KinesisConsumerTest { assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1")); assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SHARD_ID, String.class), is("shardId")); } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java index 50653e3..a8f87c2 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java @@ -21,18 +21,20 @@ import com.amazonaws.services.kinesis.model.ShardIteratorType; import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.impl.SimpleRegistry; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + @RunWith(MockitoJUnitRunner.class) public class KinesisEndpointTest { - @Mock private AmazonKinesis amazonKinesisClient; + @Mock + private AmazonKinesis amazonKinesisClient; private CamelContext camelContext; @@ -45,11 +47,11 @@ public class KinesisEndpointTest { @Test public void allTheEndpointParams() throws Exception { - KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name" + KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&maxResultsPerRequest=101" + "&iteratorType=latest" - ); + ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); @@ -59,9 +61,9 @@ public class KinesisEndpointTest { @Test public void onlyRequiredEndpointParams() throws Exception { - KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name" + KinesisEndpoint endpoint = (KinesisEndpoint) camelContext.getEndpoint("aws-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" - ); + ); assertThat(endpoint.getClient(), is(amazonKinesisClient)); assertThat(endpoint.getStreamName(), is("some_stream_name")); http://git-wip-us.apache.org/repos/asf/camel/blob/92f6c9b9/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java index 48f8edb..15556d9 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java @@ -16,12 +16,14 @@ */ package org.apache.camel.component.aws.kinesis; -import com.amazonaws.services.kinesis.model.Record; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import static org.hamcrest.CoreMatchers.is; + +import com.amazonaws.services.kinesis.model.Record; import org.junit.Test; -import static org.junit.Assert.*; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; public class RecordStringConverterTest { @@ -29,8 +31,7 @@ public class RecordStringConverterTest { public void convertRecordToString() throws Exception { Record record = new Record() .withSequenceNumber("1") - .withData(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8")))) - ; + .withData(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8")))); String result = RecordStringConverter.toString(record); assertThat(result, is("this is a String"));