This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.0.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push: new aa3dd7090b9 CAMEL-19811: restore multi-shard handling. (#11243) aa3dd7090b9 is described below commit aa3dd7090b95c0b289a4698cce229d620b520fba Author: klease <38634989+kle...@users.noreply.github.com> AuthorDate: Wed Aug 30 19:19:41 2023 +0200 CAMEL-19811: restore multi-shard handling. (#11243) Adapt the handling of closed streams. Modify KinesisUtils to be able to create a stream with multiple shards. Modify KinesisConsumerIT to test a stream with 2 shards. --- .../component/aws2/kinesis/Kinesis2Consumer.java | 62 +++++++++++++--------- .../KinesisConsumerClosedShardWithFailTest.java | 9 +++- .../kinesis/integration/KinesisConsumerIT.java | 8 +-- .../test/infra/aws2/clients/KinesisUtils.java | 10 ++-- 4 files changed, 56 insertions(+), 33 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 8e7a8692a35..87178332e51 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.kinesis; import java.util.ArrayDeque; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -50,7 +51,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R private KinesisConnection connection; private ResumeStrategy resumeStrategy; - private String currentShardIterator; + private Map<String, String> currentShardIterators = new java.util.HashMap<>(); public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor 
processor) { @@ -167,27 +168,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R // getRecords request. That way, on the next poll, we start from where // we left off, however, I don't know what happens to subsequent // exchanges when an earlier exchange fails. + updateShardIterator(shard, result.nextShardIterator()); + } - currentShardIterator = result.nextShardIterator(); - if (currentShardIterator == null) { - // This indicates that the shard is closed and no more data is available - switch (getEndpoint().getConfiguration().getShardClosed()) { - case ignore: - LOG.warn("The shard with id={} on stream {} reached CLOSE status", - shard.shardId(), getEndpoint().getConfiguration().getStreamName()); - break; - case silent: - break; - case fail: - LOG.info("The shard with id={} on stream {} reached CLOSE status", - shard.shardId(), getEndpoint().getConfiguration().getStreamName()); - throw new IllegalStateException( - new ReachedClosedStatusException( - getEndpoint().getConfiguration().getStreamName(), shard.shardId())); - default: - throw new IllegalArgumentException("Unsupported shard closed strategy"); - } - } + private void updateShardIterator(Shard shard, String nextShardIterator) { + currentShardIterators.put(shard.shardId(), nextShardIterator); } @Override @@ -215,8 +200,14 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R throws ExecutionException, InterruptedException { // either return a cached one or get a new one via a GetShardIterator // request. 
- if (currentShardIterator == null) { - var shardId = shard.shardId(); + + var shardId = shard.shardId(); + + if (currentShardIterators.get(shardId) == null) { + if (currentShardIterators.containsKey(shardId)) { + // There was previously a shardIterator but shard is now closed + handleClosedShard(shardId); + } GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder() .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) @@ -244,12 +235,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R .getShardIterator(request.build()); } - currentShardIterator = result.shardIterator(); - LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", currentShardIterator, shardId, + currentShardIterators.put(shardId, result.shardIterator()); + LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", result.shardIterator(), shardId, getEndpoint().getConfiguration().getStreamName()); } - return currentShardIterator; + return currentShardIterators.get(shardId); + } + + private void handleClosedShard(String shardId) { + switch (getEndpoint().getConfiguration().getShardClosed()) { + case ignore: + LOG.warn("The shard with id={} on stream {} reached CLOSE status", + shardId, getEndpoint().getConfiguration().getStreamName()); + break; + case silent: + break; + case fail: + LOG.info("The shard with id={} on stream {} reached CLOSE status", + shardId, getEndpoint().getConfiguration().getStreamName()); + throw new IllegalStateException( + new ReachedClosedStatusException( + getEndpoint().getConfiguration().getStreamName(), shardId)); + default: + throw new IllegalArgumentException("Unsupported shard closed strategy"); + } } private void resume(GetShardIteratorRequest.Builder req) { diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java 
b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java index ea9b3882511..9f5156ca5a7 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java @@ -41,7 +41,9 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -92,6 +94,11 @@ public class KinesisConsumerClosedShardWithFailTest { @Test public void itObtainsAShardIteratorOnFirstPoll() { + try { + underTest.poll(); + } catch (Exception e) { + fail("The first call should not throw an exception"); + } assertThrows(IllegalStateException.class, () -> { underTest.poll(); }); @@ -106,7 +113,7 @@ public class KinesisConsumerClosedShardWithFailTest { assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId")); assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST)); - verify(kinesisClient).listShards(getListShardsCap.capture()); + verify(kinesisClient, times(2)).listShards(getListShardsCap.capture()); assertThat(getListShardsCap.getValue().streamName(), is("streamName")); } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java index 
b9614b1129a..46a62640dc6 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java +++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java @@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient; import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream; import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.putRecords; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.testcontainers.shaded.org.awaitility.Awaitility.await; @@ -107,7 +108,7 @@ public class KinesisConsumerIT extends CamelTestSupport { @BeforeEach public void prepareEnvironment() { - createStream(client, streamName); + createStream(client, streamName, 2); putRecords(client, streamName, messageCount); } @@ -122,7 +123,7 @@ public class KinesisConsumerIT extends CamelTestSupport { .untilAsserted(() -> result.assertIsSatisfied()); assertEquals(messageCount, receivedMessages.size()); - int messageCount = 0; + String partitionKey = null; for (KinesisData data : receivedMessages) { ObjectHelper.notNull(data, "data"); assertNotNull(data.body, "The body should not be null"); @@ -132,7 +133,8 @@ public class KinesisConsumerIT extends CamelTestSupport { and so on. This is just testing that the code is not mixing things up. 
*/ assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data); - assertEquals(messageCount++, Integer.valueOf(data.partition.substring(data.partition.lastIndexOf('-') + 1))); + assertNotEquals(partitionKey, data.partition); + partitionKey = data.partition; } } } diff --git a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java index bec6c6022fa..32f02076245 100644 --- a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java +++ b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java @@ -57,10 +57,10 @@ public final class KinesisUtils { } - private static void doCreateStream(KinesisClient kinesisClient, String streamName) { + private static void doCreateStream(KinesisClient kinesisClient, String streamName, int shardCount) { CreateStreamRequest request = CreateStreamRequest.builder() .streamName(streamName) - .shardCount(1) + .shardCount(shardCount) .build(); try { @@ -78,6 +78,10 @@ public final class KinesisUtils { } public static void createStream(KinesisClient kinesisClient, String streamName) { + createStream(kinesisClient, streamName, 1); + } + + public static void createStream(KinesisClient kinesisClient, String streamName, int shardCount) { try { LOG.info("Checking whether the stream exists already"); int status = getStreamStatus(kinesisClient, streamName); @@ -89,7 +93,7 @@ public final class KinesisUtils { LOG.info("The stream does not exist, auto creating it: {}", e.getMessage()); } - doCreateStream(kinesisClient, streamName); + doCreateStream(kinesisClient, streamName, shardCount); TestUtils.waitFor(() -> { try { GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);