This is an automated email from the ASF dual-hosted git repository. klease pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 4e87f51c4a1 CAMEL-19789: Fix handling of shardIterator so that all records are returned. (#11206) 4e87f51c4a1 is described below commit 4e87f51c4a1c93455c4062b6f65c34730c65d433 Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Fri Aug 25 17:32:04 2023 +0200 CAMEL-19789: Fix handling of shardIterator so that all records are returned. (#11206) According to the AWS Kinesis client documentation, "use a GetShardIterator request to get the first shard iterator for use in your first GetRecords request and for subsequent reads use the shard iterator returned by the GetRecords request in NextShardIterator". Adapt the unit tests for this case and modify the KinesisConsumerIT test to verify that different records are returned. --- .../component/aws2/kinesis/Kinesis2Consumer.java | 73 ++++++++++++---------- .../KinesisConsumerClosedShardWithFailTest.java | 2 +- .../KinesisConsumerClosedShardWithSilentTest.java | 8 +-- .../kinesis/integration/KinesisConsumerIT.java | 2 + 4 files changed, 48 insertions(+), 37 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index f70551aa53f..8e7a8692a35 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -48,9 +48,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class); private KinesisConnection connection; - private boolean isShardClosed; private ResumeStrategy resumeStrategy; + private String currentShardIterator; + public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) { super(endpoint, processor); @@ -126,8 +127,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R } if (shardIterator == null) { - // probably closed. Returning 0 as nothing was processed + // Unable to get an interator so shard must be closed processedExchangeCount.set(0); + return; } GetRecordsRequest req = GetRecordsRequest @@ -166,18 +168,19 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R // we left off, however, I don't know what happens to subsequent // exchanges when an earlier exchange fails. - var currentShardIterator = result.nextShardIterator(); - if (isShardClosed) { + currentShardIterator = result.nextShardIterator(); + if (currentShardIterator == null) { + // This indicates that the shard is closed and no more data is available switch (getEndpoint().getConfiguration().getShardClosed()) { case ignore: - LOG.warn("The shard {} is in closed state", currentShardIterator); + LOG.warn("The shard with id={} on stream {} reached CLOSE status", + shard.shardId(), getEndpoint().getConfiguration().getStreamName()); break; case silent: break; case fail: - LOG.info("Shard Iterator reaches CLOSE status:{} {}", - getEndpoint().getConfiguration().getStreamName(), - getEndpoint().getConfiguration().getShardId()); + LOG.info("The shard with id={} on stream {} reached CLOSE status", + shard.shardId(), getEndpoint().getConfiguration().getStreamName()); throw new IllegalStateException( new ReachedClosedStatusException( getEndpoint().getConfiguration().getStreamName(), shard.shardId())); @@ -210,38 +213,43 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R final Shard shard, final KinesisConnection kinesisConnection) throws ExecutionException, InterruptedException { + // either return a cached one or get a new one via a GetShardIterator + // request. + if (currentShardIterator == null) { + var shardId = shard.shardId(); - var shardId = shard.shardId(); - isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null; - LOG.debug("ShardId is: {}", shardId); - - GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder() - .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) - .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); + GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder() + .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) + .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); - if (hasSequenceNumber()) { - request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); - } + if (hasSequenceNumber()) { + request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); + } - resume(request); + resume(request); - GetShardIteratorResponse result; - if (getEndpoint().getConfiguration().isAsyncClient()) { - try { + GetShardIteratorResponse result; + if (getEndpoint().getConfiguration().isAsyncClient()) { + try { + result = kinesisConnection + .getAsyncClient(getEndpoint()) + .getShardIterator(request.build()) + .get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } else { result = kinesisConnection - .getAsyncClient(getEndpoint()) - .getShardIterator(request.build()) - .get(); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); + .getClient(getEndpoint()) + .getShardIterator(request.build()); } - } else { - result = kinesisConnection - .getClient(getEndpoint()) - .getShardIterator(request.build()); + + currentShardIterator = result.shardIterator(); + LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", currentShardIterator, shardId, + getEndpoint().getConfiguration().getStreamName()); } - return result.shardIterator(); + return currentShardIterator; } private void resume(GetShardIteratorRequest.Builder req) { @@ -270,6 +278,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R } protected Exchange createExchange(Record dataRecord) { + LOG.debug("Received Kinesis record with partition_key={}", dataRecord.partitionKey()); Exchange exchange = createExchange(true); exchange.getIn().setBody(dataRecord.data().asInputStream()); exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, dataRecord.approximateArrivalTimestamp()); diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java index 495942d298e..ea9b3882511 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java @@ -81,7 +81,7 @@ public class KinesisConsumerClosedShardWithFailTest { when(kinesisClient .getRecords(any(GetRecordsRequest.class))) - .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build()); + .thenReturn(GetRecordsResponse.builder().nextShardIterator(null).build()); when(kinesisClient .getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build()); diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index d14e5ebb332..034af13ed1b 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -89,7 +89,7 @@ public class KinesisConsumerClosedShardWithSilentTest { when(kinesisClient .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder() - .nextShardIterator("shardIterator") + .nextShardIterator("nextShardIterator") .records( Record.builder().sequenceNumber("1") .data(SdkBytes.fromString("Hello", Charset.defaultCharset())) @@ -185,10 +185,10 @@ public class KinesisConsumerClosedShardWithSilentTest { underTest.poll(); final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class); - - verify(kinesisClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class)); + // On second call it uses the one returned from the first call + verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class)); verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture()); assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator")); - assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("shardIterator")); + assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("nextShardIterator")); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java index 754862b44c1..b9614b1129a 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java @@ -122,6 +122,7 @@ public class KinesisConsumerIT extends CamelTestSupport { .untilAsserted(() -> result.assertIsSatisfied()); assertEquals(messageCount, receivedMessages.size()); + int messageCount = 0; for (KinesisData data : receivedMessages) { ObjectHelper.notNull(data, "data"); assertNotNull(data.body, "The body should not be null"); @@ -131,6 +132,7 @@ public class KinesisConsumerIT extends CamelTestSupport { and so on. This is just testing that the code is not mixing things up. */ assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data); + assertEquals(messageCount++, Integer.valueOf(data.partition.substring(data.partition.lastIndexOf('-') + 1))); } } }