This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/main by this push:
new 0f8d4e9 Fix AWS2 Kinesis Sink issues
0f8d4e9 is described below
commit 0f8d4e9dac91e382e026072b3283372a2e23e378
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Fri May 14 13:44:53 2021 +0200
Fix AWS2 Kinesis Sink issues
AWS2 Kinesis Sink test was failing because the shard was not available
at the time the check was occurring
---
.../aws/v2/kinesis/common/KinesisUtils.java | 24 +++++++++++++++++-----
1 file changed, 19 insertions(+), 5 deletions(-)
diff --git
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index 2e5a7d1..fb1d64a 100644
---
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
+++
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -191,18 +192,31 @@ public final class KinesisUtils {
} while (retries > 0);
}
- public static GetRecordsRequest getGetRecordsRequest(KinesisClient
kinesisClient, String streamName) {
- DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest.builder()
- .streamName(streamName)
- .build();
- List<Shard> shards = new ArrayList<>();
+ /**
+ * Checks whether the stream currently reports at least one shard.
+ * Used as the wait condition before the shards are read, so it must only
+ * return true once the shard list is non-empty.
+ *
+ * @param kinesisClient the client used to describe the stream
+ * @param describeStreamRequest the describe request for the stream under test
+ * @return true if the stream description contains one or more shards
+ */
+ private static boolean hasShards(KinesisClient kinesisClient,
DescribeStreamRequest describeStreamRequest) {
+ DescribeStreamResponse streamRes =
kinesisClient.describeStream(describeStreamRequest);
+
+ // Negated on purpose: an empty shard list means the stream is not ready yet.
+ return !streamRes.streamDescription().shards().isEmpty();
+ }
+ /**
+ * Collects the shards reported by describeStream into a single list,
+ * repeating the call while the response says more shards are available.
+ *
+ * @param kinesisClient the client used to describe the stream
+ * @param describeStreamRequest the describe request for the stream under test
+ * @return the accumulated list of shards
+ */
+ private static List<Shard> getAllShards(KinesisClient kinesisClient,
DescribeStreamRequest describeStreamRequest) {
+ List<Shard> shards = new ArrayList<>();
DescribeStreamResponse streamRes;
do {
// NOTE(review): the same request object is re-sent on every iteration and no
// start shard id is advanced here; if hasMoreShards() is ever true this
// presumably re-fetches the same page and never terminates - confirm.
streamRes = kinesisClient.describeStream(describeStreamRequest);
+
shards.addAll(streamRes.streamDescription().shards());
} while (streamRes.streamDescription().hasMoreShards());
+ return shards;
+ }
+
+ public static GetRecordsRequest getGetRecordsRequest(KinesisClient
kinesisClient, String streamName) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest.builder()
+ .streamName(streamName)
+ .build();
+
+ TestUtils.waitFor(() -> hasShards(kinesisClient,
describeStreamRequest));
+ List<Shard> shards = getAllShards(kinesisClient,
describeStreamRequest);
GetShardIteratorRequest iteratorRequest =
GetShardIteratorRequest.builder()
.streamName(streamName)