Repository: camel Updated Branches: refs/heads/master 19e63dbaf -> e78a02960
Added an endpoint for consuming Amazon Kinesis Streams. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/72370df1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/72370df1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/72370df1 Branch: refs/heads/master Commit: 72370df11c9768b697a930e22c417f1af95dd919 Parents: 19e63db Author: Candle <can...@candle.me.uk> Authored: Wed Dec 2 16:12:48 2015 +0000 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Dec 5 10:06:17 2015 +0100 ---------------------------------------------------------------------- .../component/aws/kinesis/KinesisComponent.java | 44 +++++ .../component/aws/kinesis/KinesisConstants.java | 29 ++++ .../component/aws/kinesis/KinesisConsumer.java | 126 +++++++++++++++ .../component/aws/kinesis/KinesisEndpoint.java | 123 ++++++++++++++ .../aws/kinesis/RecordStringConverter.java | 43 +++++ .../services/org/apache/camel/TypeConverter | 18 +++ .../org/apache/camel/component/aws-kinesis | 18 +++ .../aws/kinesis/KinesisConsumerTest.java | 162 +++++++++++++++++++ .../aws/kinesis/KinesisEndpointTest.java | 71 ++++++++ .../aws/kinesis/RecordStringConverterTest.java | 38 +++++ 10 files changed, 672 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java new file mode 100644 index 0000000..d3f34ab --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KinesisComponent extends UriEndpointComponent { + private static final Logger LOG = LoggerFactory.getLogger(KinesisComponent.class); + + public KinesisComponent() { + super(KinesisEndpoint.class); + } + + public KinesisComponent(CamelContext context) { + super(context, KinesisEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + KinesisEndpoint endpoint = new KinesisEndpoint(uri, remaining, this); + + LOG.debug("Created endpoint: {}", endpoint.toString()); + return endpoint; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java new file mode 100644 index 0000000..b028123 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConstants.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +public interface KinesisConstants { + + public static final String SEQUENCE_NUMBER = "KinesisSequenceNumber"; + public static final String APPROX_ARRIVAL_TIME = "KinesisApproximateArrivalTimestamp"; + public static final String PARTITION_KEY = "KinesisPartitionKey"; + /** + * in a Kinesis Record object, the shard ID is obtained from the getPartitionKey method. + */ + public static final String SHARD_ID = "KinesisPartitionKey"; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java new file mode 100644 index 0000000..b301f38 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.Record; +import java.util.ArrayDeque; +import java.util.List; +import java.util.Queue; +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.impl.ScheduledBatchPollingConsumer; +import org.apache.camel.util.CastUtils; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KinesisConsumer extends ScheduledBatchPollingConsumer { + private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); + + private String currentShardIterator = null; + + public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) { + super(endpoint, processor); + } + + /* + * Returns the number of messages polled. + */ + @Override + protected int poll() throws Exception { + GetRecordsRequest req = new GetRecordsRequest() + .withShardIterator(getShardItertor()) + .withLimit(getEndpoint().getMaxResultsPerRequest()) + ; + GetRecordsResult result = getClient().getRecords(req); + + Queue<Exchange> exchanges = createExchanges(result.getRecords()); + int processedExchangeCount = processBatch(CastUtils.cast(exchanges)); + + // May cache the last successful sequence number, and pass it to the + // getRecords request. That way, on the next poll, we start from where + // we left off, however, I don't know what happens to subsiquent + // exchanges when an earlier echange fails. + + currentShardIterator = result.getNextShardIterator(); + + return processedExchangeCount; + } + + @Override + public int processBatch(Queue<Object> exchanges) throws Exception { + int processedExchanges = 0; + while (!exchanges.isEmpty()) { + final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll()); + + LOG.trace("Processing exchange [{}] started.", exchange); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + LOG.trace("Processing exchange [{}] done.", exchange); + } + }); + processedExchanges++; + } + return processedExchanges; + } + + private AmazonKinesis getClient() { + return getEndpoint().getClient(); + } + + @Override + public KinesisEndpoint getEndpoint() { + return (KinesisEndpoint) super.getEndpoint(); + } + + private String getShardItertor() { + // either return a cached one or get a new one via a GetShardIterator request. + if (currentShardIterator == null) { + DescribeStreamRequest req1 = new DescribeStreamRequest() + .withStreamName(getEndpoint().getStreamName()) + ; + DescribeStreamResult res1 = getClient().describeStream(req1); + + GetShardIteratorRequest req = new GetShardIteratorRequest() + .withStreamName(getEndpoint().getStreamName()) + .withShardId(res1.getStreamDescription().getShards().get(0).getShardId()) // XXX only uses the first shard + .withShardIteratorType(getEndpoint().getIteratorType()) + ; + GetShardIteratorResult result = getClient().getShardIterator(req); + currentShardIterator = result.getShardIterator(); + } + LOG.debug("Shard Iterator is: {}", currentShardIterator); + return currentShardIterator; + } + + private Queue<Exchange> createExchanges(List<Record> records) { + Queue<Exchange> exchanges = new ArrayDeque<>(); + for (Record record : records) { + exchanges.add(getEndpoint().createExchange(record)); + } + return exchanges; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java new file mode 100644 index 0000000..b4c7597 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; + +@UriEndpoint(scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging") +public class KinesisEndpoint extends ScheduledPollEndpoint { + + @UriPath(label = "consumer,producer", description = "Name of the stream") + @Metadata(required = "true") + private String streamName; + + // For now, always assume that we've been supplied a client in the Camel registry. + @UriParam(label = "consumer", description = "Amazon Kinesis client to use for all requests for this endpoint") + @Metadata(required = "true") + private AmazonKinesis amazonKinesisClient; + + @UriParam(label = "consumer", description = "Maximum number of records that will be fetched in each poll") + private int maxResultsPerRequest = 1; + + @UriParam(label = "consumer", description = "Defines where in the Kinesis stream to start getting records") + private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON; + + public KinesisEndpoint(String uri, String streamName, KinesisComponent component) { + super(uri, component); + this.streamName = streamName; + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("Not supported yet."); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new KinesisConsumer(this, processor); + } + + Exchange createExchange(Record record) { + Exchange ex = super.createExchange(); + ex.getIn().setBody(record, Record.class); + ex.setProperty(KinesisConstants.APPROX_ARRIVAL_TIME, record.getApproximateArrivalTimestamp()); + ex.setProperty(KinesisConstants.PARTITION_KEY, record.getPartitionKey()); + ex.setProperty(KinesisConstants.SEQUENCE_NUMBER, record.getSequenceNumber()); + + return ex; + } + + @Override + public boolean isSingleton() { + // probably right. + return true; + } + + AmazonKinesis getClient() { + return amazonKinesisClient; + } + + // required for injection. + public AmazonKinesis getAmazonKinesisClient() { + return amazonKinesisClient; + } + + public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) { + this.amazonKinesisClient = amazonKinesisClient; + } + + public int getMaxResultsPerRequest() { + return maxResultsPerRequest; + } + + public void setMaxResultsPerRequest(int maxResultsPerRequest) { + this.maxResultsPerRequest = maxResultsPerRequest; + } + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + public ShardIteratorType getIteratorType() { + return iteratorType; + } + + public void setIteratorType(ShardIteratorType iteratorType) { + this.iteratorType = iteratorType; + } + + @Override + public String toString() { + return "KinesisEndpoint{" + "amazonKinesisClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest + ", iteratorType=" + iteratorType + ", streamName=" + streamName + '}'; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java new file mode 100644 index 0000000..bda8983 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/RecordStringConverter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import org.apache.camel.Converter; + +@Converter +public class RecordStringConverter { + + @Converter + public static String toString(Record record) { + List<Byte> bytes = new ArrayList<>(); + ByteBuffer buf = record.getData().asReadOnlyBuffer(); + while (buf.hasRemaining()) { + bytes.add(buf.get()); + } + byte[] a = new byte[bytes.size()]; + for (int i = 0; i < bytes.size(); ++i) { + a[i] = bytes.get(i); + } + return new String(a, Charset.forName("UTF-8")); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter new file mode 100644 index 0000000..28707d6 --- /dev/null +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +org.apache.camel.component.aws.kinesis http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis new file mode 100644 index 0000000..2329cbf --- /dev/null +++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/component/aws-kinesis @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +class=org.apache.camel.component.aws.kinesis.KinesisComponent http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java new file mode 100644 index 0000000..db0df68 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.DescribeStreamRequest; +import com.amazonaws.services.kinesis.model.DescribeStreamResult; +import com.amazonaws.services.kinesis.model.GetRecordsRequest; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.GetShardIteratorRequest; +import com.amazonaws.services.kinesis.model.GetShardIteratorResult; +import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.services.kinesis.model.StreamDescription; +import java.util.Date; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import static org.hamcrest.CoreMatchers.is; +import org.junit.Test; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import static org.mockito.Mockito.*; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisConsumerTest { + + @Mock private AmazonKinesis kinesisClient; + @Mock private AsyncProcessor processor; + + private final CamelContext context = new DefaultCamelContext(); + private final KinesisComponent component = new KinesisComponent(context); + + private KinesisConsumer undertest; + + @Before + public void setup() throws Exception { + KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component); + endpoint.setAmazonKinesisClient(kinesisClient); + endpoint.setIteratorType(ShardIteratorType.LATEST); + undertest = new KinesisConsumer(endpoint, processor); + + when(kinesisClient.getRecords(any(GetRecordsRequest.class))) + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + ); + when(kinesisClient.describeStream(any(DescribeStreamRequest.class))) + .thenReturn(new DescribeStreamResult() + .withStreamDescription(new StreamDescription() + .withShards(new Shard().withShardId("shardId")) + ) + ); + when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) + .thenReturn(new GetShardIteratorResult() + .withShardIterator("shardIterator") + ); + } + + @Test + public void itObtainsAShardIteratorOnFirstPoll() throws Exception { + undertest.poll(); + + final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class); + final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class); + + verify(kinesisClient).describeStream(describeStreamReqCap.capture()); + assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName")); + + verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture()); + assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName")); + assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId")); + assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("LATEST")); + } + + @Test + public void itUsesTheShardIteratorOnPolls() throws Exception { + undertest.poll(); + + final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); + verify(kinesisClient).getRecords(getRecordsReqCap.capture()); + + assertThat(getRecordsReqCap.getValue().getShardIterator(), is("shardIterator")); + } + + + @Test + public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception { + undertest.poll(); + undertest.poll(); + + final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); + + verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class)); + verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class)); + verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture()); + assertThat(getRecordsReqCap.getAllValues().get(0).getShardIterator(), is("shardIterator")); + assertThat(getRecordsReqCap.getAllValues().get(1).getShardIterator(), is("nextShardIterator")); + } + + @Test + public void recordsAreSentToTheProcessor() throws Exception { + when(kinesisClient.getRecords(any(GetRecordsRequest.class))) + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + .withRecords(new Record().withSequenceNumber("1"), new Record().withSequenceNumber("2")) + ); + + int messageCount = undertest.poll(); + + assertThat(messageCount, is(2)); + final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + + verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class)); + assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getSequenceNumber(), is("1")); + assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getSequenceNumber(), is("2")); + } + + @Test + public void exchangePropertiesAreSet() throws Exception { + + when(kinesisClient.getRecords(any(GetRecordsRequest.class))) + .thenReturn(new GetRecordsResult() + .withNextShardIterator("nextShardIterator") + .withRecords(new Record() + .withSequenceNumber("1") + .withApproximateArrivalTimestamp(new Date(42)) + .withPartitionKey("shardId") + ) + ); + + undertest.poll(); + + final ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + + verify(processor).process(exchangeCaptor.capture(), any(AsyncCallback.class)); + assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.APPROX_ARRIVAL_TIME, long.class), is(42L)); + assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.PARTITION_KEY, String.class), is("shardId")); + assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SEQUENCE_NUMBER, String.class), is("1")); + assertThat(exchangeCaptor.getValue().getProperty(KinesisConstants.SHARD_ID, String.class), is("shardId")); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java new file mode 100644 index 0000000..50653e3 --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisEndpointTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class KinesisEndpointTest { + + @Mock private AmazonKinesis amazonKinesisClient; + + private CamelContext camelContext; + + @Before + public void setup() throws Exception { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("kinesisClient", amazonKinesisClient); + camelContext = new DefaultCamelContext(registry); + } + + @Test + public void allTheEndpointParams() throws Exception { + KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name" + + "?amazonKinesisClient=#kinesisClient" + + "&maxResultsPerRequest=101" + + "&iteratorType=latest" + ); + + assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getStreamName(), is("some_stream_name")); + assertThat(endpoint.getIteratorType(), is(ShardIteratorType.LATEST)); + assertThat(endpoint.getMaxResultsPerRequest(), is(101)); + } + + @Test + public void onlyRequiredEndpointParams() throws Exception { + KinesisEndpoint endpoint = (KinesisEndpoint)camelContext.getEndpoint("aws-kinesis://some_stream_name" + + "?amazonKinesisClient=#kinesisClient" + ); + + assertThat(endpoint.getClient(), is(amazonKinesisClient)); + assertThat(endpoint.getStreamName(), is("some_stream_name")); + assertThat(endpoint.getIteratorType(), is(ShardIteratorType.TRIM_HORIZON)); + assertThat(endpoint.getMaxResultsPerRequest(), is(1)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/72370df1/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java new file mode 100644 index 0000000..48f8edb --- /dev/null +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/RecordStringConverterTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.kinesis; + +import com.amazonaws.services.kinesis.model.Record; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import static org.hamcrest.CoreMatchers.is; +import org.junit.Test; +import static org.junit.Assert.*; + +public class RecordStringConverterTest { + + @Test + public void convertRecordToString() throws Exception { + Record record = new Record() + .withSequenceNumber("1") + .withData(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8")))) + ; + + String result = RecordStringConverter.toString(record); + assertThat(result, is("this is a String")); + } +} \ No newline at end of file