This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-12089
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fa740ff4f0c11aaa1b4b8e02881cba72ef7bd1a4
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Mon Dec 18 14:09:42 2017 +0100

    CAMEL-12089 - Camel-AWS: Kinesis consumer starts consuming data from the 
beginning even though the shard is in Closed state
---
 .../src/main/docs/aws-kinesis-component.adoc       |  3 +-
 .../component/aws/kinesis/KinesisConsumer.java     | 60 ++++++++++----
 .../component/aws/kinesis/KinesisEndpoint.java     | 19 ++++-
 .../kinesis/KinesisShardClosedStrategyEnum.java    | 24 ++++++
 .../aws/kinesis/ReachedClosedStatusException.java  | 29 +++++++
 .../KinesisConsumerClosedShardWithFailTest.java    | 96 ++++++++++++++++++++++
 ... KinesisConsumerClosedShardWithSilentTest.java} | 17 ++--
 .../component/aws/kinesis/KinesisConsumerTest.java | 13 ++-
 8 files changed, 235 insertions(+), 26 deletions(-)

diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc 
b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
index 39006e5..5ec4b7d 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
@@ -52,7 +52,7 @@ with the following path and query parameters:
 | *streamName* | *Required* Name of the stream |  | String
 |===
 
-==== Query Parameters (24 parameters):
+==== Query Parameters (25 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -63,6 +63,7 @@ with the following path and query parameters:
 | *maxResultsPerRequest* (consumer) | Maximum number of records that will be 
fetched in each poll | 1 | int
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll 
any files you can enable this option to send an empty message (no body) 
instead. | false | boolean
 | *sequenceNumber* (consumer) | The sequence number to start polling from. 
Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER 
|  | String
+| *shardClosed* (consumer) | Define what will be the behavior in case of shard 
closed. Possible value are ignore silent and fail.In case of ignore a message 
will be logged and the consumer will restart from the beginningin case of 
silent there will be no logging and the consumer will start from the 
beginningin case of fail a ReachedClosedStateException will be raised | ignore 
| KinesisShardClosed StrategyEnum
 | *shardId* (consumer) | Defines which shardId in the Kinesis stream to get 
records from |  | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index 1addcdf..ec480e4 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.kinesis;
 
 import java.util.ArrayDeque;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 
@@ -28,7 +29,9 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -42,6 +45,7 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
     private static final Logger LOG = 
LoggerFactory.getLogger(KinesisConsumer.class);
 
     private String currentShardIterator;
+    private boolean isShardClosed;
 
     public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -49,9 +53,7 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
 
     @Override
     protected int poll() throws Exception {
-        GetRecordsRequest req = new GetRecordsRequest()
-                .withShardIterator(getShardItertor())
-                .withLimit(getEndpoint().getMaxResultsPerRequest());
+        GetRecordsRequest req = new 
GetRecordsRequest().withShardIterator(getShardItertor()).withLimit(getEndpoint().getMaxResultsPerRequest());
         GetRecordsResult result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -63,6 +65,20 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
         // exchanges when an earlier echangee fails.
 
         currentShardIterator = result.getNextShardIterator();
+        if (isShardClosed) {
+            switch (getEndpoint().getShardClosed()) {
+            case ignore:
+                LOG.warn("The shard {} is in closed state");
+                break;
+            case silent:
+                break;
+            case fail:
+                LOG.info("Shard Iterator reaches CLOSE status:", 
getEndpoint().getStreamName(), getEndpoint().getShardId());
+                throw new 
ReachedClosedStatusException(getEndpoint().getStreamName(), 
getEndpoint().getShardId());
+            default:
+                throw new IllegalArgumentException("Unsupported shard closed 
strategy");
+            }
+        }
 
         return processedExchangeCount;
     }
@@ -91,29 +107,46 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
 
     @Override
     public KinesisEndpoint getEndpoint() {
-        return (KinesisEndpoint) super.getEndpoint();
+        return (KinesisEndpoint)super.getEndpoint();
     }
 
     private String getShardItertor() {
-        // either return a cached one or get a new one via a GetShardIterator 
request.
+        // either return a cached one or get a new one via a GetShardIterator
+        // request.
         if (currentShardIterator == null) {
             String shardId;
 
-            //If ShardId supplied use it, else choose first one
+            // If ShardId supplied use it, else choose first one
             if (!getEndpoint().getShardId().isEmpty()) {
                 shardId = getEndpoint().getShardId();
+                DescribeStreamRequest req1 = new 
DescribeStreamRequest().withStreamName(getEndpoint().getStreamName());
+                DescribeStreamResult res1 = getClient().describeStream(req1);
+                Iterator it = 
res1.getStreamDescription().getShards().iterator();
+                while (it.hasNext()) {
+                    Shard shard = (Shard)it.next();
+                    if 
(shard.getShardId().equalsIgnoreCase(getEndpoint().getShardId())) {
+                        if 
(shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
+                            isShardClosed = false;
+                        } else {
+                            isShardClosed = true;
+                        }
+                    }
+                }
+
             } else {
-                DescribeStreamRequest req1 = new DescribeStreamRequest()
-                        .withStreamName(getEndpoint().getStreamName());
+                DescribeStreamRequest req1 = new 
DescribeStreamRequest().withStreamName(getEndpoint().getStreamName());
                 DescribeStreamResult res1 = getClient().describeStream(req1);
                 shardId = 
res1.getStreamDescription().getShards().get(0).getShardId();
+                if 
(res1.getStreamDescription().getShards().get(0).getSequenceNumberRange().getEndingSequenceNumber()
 == null) {
+                    isShardClosed = false;
+                } else {
+                    isShardClosed = true;
+                }
             }
             LOG.debug("ShardId is: {}", shardId);
 
-            GetShardIteratorRequest req = new GetShardIteratorRequest()
-                    .withStreamName(getEndpoint().getStreamName())
-                    .withShardId(shardId)
-                    .withShardIteratorType(getEndpoint().getIteratorType());
+            GetShardIteratorRequest req = new 
GetShardIteratorRequest().withStreamName(getEndpoint().getStreamName()).withShardId(shardId)
+                .withShardIteratorType(getEndpoint().getIteratorType());
 
             if (hasSequenceNumber()) {
                 
req.withStartingSequenceNumber(getEndpoint().getSequenceNumber());
@@ -136,7 +169,6 @@ public class KinesisConsumer extends 
ScheduledBatchPollingConsumer {
 
     private boolean hasSequenceNumber() {
         return !getEndpoint().getSequenceNumber().isEmpty()
-                && 
(getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                    || 
getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+               && 
(getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
 || 
getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
     }
 }
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 3bd06e5..9515889 100644
--- 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.kinesis;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,10 +31,10 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
 /**
- * The aws-kinesis component is for consuming and producing records from 
Amazon Kinesis Streams.
+ * The aws-kinesis component is for consuming and producing records from Amazon
+ * Kinesis Streams.
  */
-@UriEndpoint(firstVersion = "2.17.0", scheme = "aws-kinesis", title = "AWS 
Kinesis", syntax = "aws-kinesis:streamName",
-    consumerClass = KinesisConsumer.class, label = "cloud,messaging")
+@UriEndpoint(firstVersion = "2.17.0", scheme = "aws-kinesis", title = "AWS 
Kinesis", syntax = "aws-kinesis:streamName", consumerClass = 
KinesisConsumer.class, label = "cloud,messaging")
 public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @UriPath(description = "Name of the stream")
@@ -50,6 +51,11 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     private String shardId = "";
     @UriParam(label = "consumer", description = "The sequence number to start 
polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or 
AT_SEQUENCE_NUMBER")
     private String sequenceNumber = "";
+    @UriParam(label = "consumer", defaultValue = "ignore", description = 
"Define what will be the behavior in case of shard closed. Possible value are 
ignore, silent and fail."
+                                                                         + "In 
case of ignore a message will be logged and the consumer will restart from the 
beginning,"
+                                                                         + "in 
case of silent there will be no logging and the consumer will start from the 
beginning,"
+                                                                         + "in 
case of fail a ReachedClosedStateException will be raised")
+    private KinesisShardClosedStrategyEnum shardClosed;
 
     public KinesisEndpoint(String uri, String streamName, KinesisComponent 
component) {
         super(uri, component);
@@ -144,4 +150,11 @@ public class KinesisEndpoint extends ScheduledPollEndpoint 
{
         this.sequenceNumber = sequenceNumber;
     }
 
+    public KinesisShardClosedStrategyEnum getShardClosed() {
+        return shardClosed;
+    }
+
+    public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) {
+        this.shardClosed = shardClosed;
+    }
 }
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java
new file mode 100644
index 0000000..c4db5e3
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+public enum KinesisShardClosedStrategyEnum {
+
+    ignore,
+    fail,
+    silent
+}
diff --git 
a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java
 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java
new file mode 100644
index 0000000..8376511
--- /dev/null
+++ 
b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+public class ReachedClosedStatusException extends Exception {
+
+    private final String streamName;
+    private final String shardId;
+
+    public ReachedClosedStatusException(String streamName, String shardId) {
+        super();
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+}
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
new file mode 100644
index 0000000..3f7e7ea
--- /dev/null
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import java.util.ArrayList;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisConsumerClosedShardWithFailTest {
+
+    @Mock
+    private AmazonKinesis kinesisClient;
+    @Mock
+    private AsyncProcessor processor;
+
+    private final CamelContext context = new DefaultCamelContext();
+    private final KinesisComponent component = new KinesisComponent(context);
+
+    private KinesisConsumer undertest;
+
+    @Before
+    public void setup() throws Exception {
+        KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", 
component);
+        endpoint.setAmazonKinesisClient(kinesisClient);
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.fail);
+        undertest = new KinesisConsumer(endpoint, processor);
+
+        SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber("20");
+        Shard shard = new 
Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+
+        
when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(new 
GetRecordsResult().withNextShardIterator("nextShardIterator"));
+        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
+            .thenReturn(new DescribeStreamResult().withStreamDescription(new 
StreamDescription().withShards(shardList)));
+        
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(new
 GetShardIteratorResult().withShardIterator("shardIterator"));
+    }
+
+    @Test(expected = ReachedClosedStatusException.class)
+    public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
+        undertest.poll();
+
+        final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = 
ArgumentCaptor.forClass(DescribeStreamRequest.class);
+        final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+
+        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
+        assertThat(describeStreamReqCap.getValue().getStreamName(), 
is("streamName"));
+
+        
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
+        assertThat(getShardIteratorReqCap.getValue().getStreamName(), 
is("streamName"));
+        assertThat(getShardIteratorReqCap.getValue().getShardId(), 
is("shardId"));
+        assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), 
is("LATEST"));
+    }
+}
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
similarity index 94%
copy from 
components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
copy to 
components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 3fb29fe..f5daa8f 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import java.util.ArrayList;
 import java.util.Date;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -26,6 +27,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
@@ -43,6 +45,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -50,7 +53,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class KinesisConsumerTest {
+public class KinesisConsumerClosedShardWithSilentTest {
 
     @Mock
     private AmazonKinesis kinesisClient;
@@ -67,7 +70,14 @@ public class KinesisConsumerTest {
         KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", 
component);
         endpoint.setAmazonKinesisClient(kinesisClient);
         endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         undertest = new KinesisConsumer(endpoint, processor);
+        
+        SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber("20");
+        Shard shard = new 
Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+       
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
             .thenReturn(new GetRecordsResult()
@@ -76,7 +86,7 @@ public class KinesisConsumerTest {
         when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
             .thenReturn(new DescribeStreamResult()
                 .withStreamDescription(new StreamDescription()
-                    .withShards(new Shard().withShardId("shardId"))
+                    .withShards(shardList)
                 )
             );
         
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
@@ -107,8 +117,6 @@ public class KinesisConsumerTest {
 
         undertest.poll();
 
-        verify(kinesisClient, 
never()).describeStream(any(DescribeStreamRequest.class));
-
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
         
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
@@ -203,5 +211,4 @@ public class KinesisConsumerTest {
         
assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY,
 String.class), is(partitionKey));
         
assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER,
 String.class), is(sequenceNumber));
     }
-
 }
\ No newline at end of file
diff --git 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 3fb29fe..e43461a 100644
--- 
a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ 
b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import java.util.ArrayList;
 import java.util.Date;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -26,6 +27,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
@@ -67,7 +69,14 @@ public class KinesisConsumerTest {
         KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", 
component);
         endpoint.setAmazonKinesisClient(kinesisClient);
         endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         undertest = new KinesisConsumer(endpoint, processor);
+        
+        SequenceNumberRange range = new 
SequenceNumberRange().withEndingSequenceNumber(null);
+        Shard shard = new 
Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+       
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
             .thenReturn(new GetRecordsResult()
@@ -76,7 +85,7 @@ public class KinesisConsumerTest {
         when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
             .thenReturn(new DescribeStreamResult()
                 .withStreamDescription(new StreamDescription()
-                    .withShards(new Shard().withShardId("shardId"))
+                    .withShards(shardList)
                 )
             );
         
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
@@ -107,8 +116,6 @@ public class KinesisConsumerTest {
 
         undertest.poll();
 
-        verify(kinesisClient, 
never()).describeStream(any(DescribeStreamRequest.class));
-
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = 
ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
         
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <commits@camel.apache.org>.

Reply via email to