This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new 5ff70d2 Add unit tests in Kinesis consumer (#6410) 5ff70d2 is described below commit 5ff70d2b21e2453c7a48464177248ea2b19a4499 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Mon Feb 1 22:44:37 2021 +0530 Add unit tests in Kinesis consumer (#6410) * Bug fixes: Handle connection broken exception * Add unit tests for partition group consumer * Add tests for child shards * Add unit test for kafka consumer * Refactor: remove unused imports, expand * imports and rename classes * Fix: enforcer errors * Remove powermock dependency * Fix jackson version conflict. Shade jackson dependency * Remove powermock --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 79 ++++++++- .../plugin/stream/kinesis/KinesisCheckpoint.java | 7 +- .../pinot/plugin/stream/kinesis/KinesisConfig.java | 4 + .../stream/kinesis/KinesisConnectionHandler.java | 6 + .../plugin/stream/kinesis/KinesisConsumer.java | 11 ++ .../kinesis/KinesisStreamMetadataProvider.java | 10 +- ...st.java => KinesisConsumerIntegrationTest.java} | 3 +- .../plugin/stream/kinesis/KinesisConsumerTest.java | 187 ++++++++++++++++----- .../kinesis/KinesisStreamMetadataProviderTest.java | 156 +++++++++++++++++ .../pinot/plugin/stream/kinesis/TestUtils.java | 55 ++++++ pom.xml | 2 +- 11 files changed, 468 insertions(+), 52 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 38d4f73..b636b9f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -36,9 +36,25 @@ <properties> <pinot.root>${basedir}/../../..</pinot.root> <phase.prop>package</phase.prop> - <aws.version>2.15.50</aws.version> + <aws.version>2.14.28</aws.version> + <jackson.version>2.10.4</jackson.version> + <netty.version>4.1.42.Final</netty.version> + <easymock.version>4.2</easymock.version> + <reactive.version>1.0.2</reactive.version> </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>bom</artifactId> + <version>${aws.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> @@ -75,38 +91,87 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> - <version>2.12.0</version> + <version>${jackson.version}</version> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> - <version>1.0.2</version> + <version>${reactive.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-codec</artifactId> - <version>4.1.42.Final</version> + <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> - <version>4.1.42.Final</version> + <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> - <version>4.1.42.Final</version> + <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> - <version>4.1.42.Final</version> + <version>${netty.version}</version> + </dependency> + + <dependency> + <groupId>org.easymock</groupId> + <artifactId>easymock</artifactId> + <version>${easymock.version}</version> + <scope>test</scope> </dependency> + </dependencies> + <profiles> + <profile> + <id>build-shaded-jar</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + <pattern>com.fasterxml.jackson</pattern> + <shadedPattern>shaded.kinesis.com.fasterxml.jackson</shadedPattern> + </relocation> + <relocation> + <pattern>software.amazon</pattern> + <shadedPattern>shaded.kinesis.software.amazon</shadedPattern> + </relocation> + </relocations> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + </project> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index e1f8b05..57904ac 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; import org.apache.pinot.spi.stream.Checkpoint; @@ -33,6 +34,7 @@ import org.apache.pinot.spi.utils.JsonUtils; */ public class KinesisCheckpoint implements StreamPartitionMsgOffset { private final Map<String, String> _shardToStartSequenceMap; + public static final ObjectMapper objectMapper = new ObjectMapper(); public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) { _shardToStartSequenceMap = shardToStartSequenceMap; @@ -40,8 +42,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset { public KinesisCheckpoint(String checkpointStr) throws IOException { - _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference<Map<String, String>>() { - }); + _shardToStartSequenceMap = objectMapper.readValue(checkpointStr, new TypeReference<Map<String, String>>(){}); } public Map<String, String> getShardToStartSequenceMap() { @@ -51,7 +52,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset { @Override public String serialize() { try { - return JsonUtils.objectToString(_shardToStartSequenceMap); + return objectMapper.writeValueAsString(_shardToStartSequenceMap); } catch (JsonProcessingException e) { throw new IllegalStateException(); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index fbe369f..6e46498 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -59,4 +59,8 @@ public class KinesisConfig { public ShardIteratorType getShardIteratorType() { return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE)); } + + public void setMaxRecordsToFetch(int maxRecordsToFetch){ + _props.put(MAX_RECORDS_TO_FETCH, String.valueOf(maxRecordsToFetch)); + } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 61d065e..0686742 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -41,6 +41,12 @@ public class KinesisConnectionHandler { createConnection(); } + public KinesisConnectionHandler(String stream, String awsRegion, KinesisClient kinesisClient) { + _stream = stream; + _awsRegion = awsRegion; + _kinesisClient = kinesisClient; + } + /** * Lists all shards of the stream */ diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 9c56f95..8ad27b4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -31,6 +31,7 @@ import org.apache.pinot.spi.stream.PartitionGroupConsumer; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; @@ -61,6 +62,15 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti _executorService = Executors.newSingleThreadExecutor(); } + public KinesisConsumer(KinesisConfig kinesisConfig, KinesisClient kinesisClient) { + super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion(), kinesisClient); + _kinesisClient = kinesisClient; + _stream = kinesisConfig.getStream(); + _maxRecords = kinesisConfig.maxRecordsToFetch(); + _shardIteratorType = kinesisConfig.getShardIteratorType(); + _executorService = Executors.newSingleThreadExecutor(); + } + /** * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint */ @@ -175,6 +185,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti if (sequenceNumber != null && _shardIteratorType.toString().contains("SEQUENCE")) { requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber); } + return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator(); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java index 42150a3..7af1df1 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -59,6 +59,14 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis(); } + public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig, KinesisConnectionHandler kinesisConnectionHandler, StreamConsumerFactory streamConsumerFactory) { + KinesisConfig kinesisConfig = new KinesisConfig(streamConfig); + _kinesisConnectionHandler = kinesisConnectionHandler; + _kinesisStreamConsumerFactory = streamConsumerFactory; + _clientId = clientId; + _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis(); + } + @Override public int fetchPartitionCount(long timeoutMillis) { throw new UnsupportedOperationException(); @@ -83,7 +91,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>(); Map<String, Shard> shardIdToShardMap = - _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s)); + _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s, (s1, s2) -> s1)); Set<String> shardsInCurrent = new HashSet<>(); Set<String> shardsEnded = new HashSet<>(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java similarity index 96% copy from pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java copy to pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java index f9ed779..1e832fa 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerIntegrationTest.java @@ -22,12 +22,11 @@ import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -public class KinesisConsumerTest { +public class KinesisConsumerIntegrationTest { private static final String STREAM_NAME = "kinesis-test"; private static final String AWS_REGION = "us-west-2"; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index f9ed779..384c512 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -18,51 +18,162 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import software.amazon.awssdk.services.kinesis.model.Shard; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.easymock.Capture; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.Record; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; public class KinesisConsumerTest { + public static final int TIMEOUT = 1000; + public static final int NUM_RECORDS = 10; + public static final String DUMMY_RECORD_PREFIX = "DUMMY_RECORD-"; + public static final String PARTITION_KEY_PREFIX = "PARTITION_KEY-"; + public static final String PLACEHOLDER = "DUMMY"; + + private static KinesisConnectionHandler kinesisConnectionHandler; + private static StreamConsumerFactory streamConsumerFactory; + private static KinesisClient kinesisClient; + private List<Record> recordList; + + @BeforeMethod + public void setupTest() { + kinesisConnectionHandler = createMock(KinesisConnectionHandler.class); + kinesisClient = createMock(KinesisClient.class); + streamConsumerFactory = createMock(StreamConsumerFactory.class); + + recordList = new ArrayList<>(); + + for (int i = 0; i < NUM_RECORDS; i++) { + Record record = + Record.builder().data(SdkBytes.fromUtf8String(DUMMY_RECORD_PREFIX + i)).partitionKey(PARTITION_KEY_PREFIX + i) + .sequenceNumber(String.valueOf(i + 1)).build(); + recordList.add(record); + } + } + + @Test + public void testBasicConsumer() { + Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance(); + Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance(); + + GetRecordsResponse getRecordsResponse = + GetRecordsResponse.builder().nextShardIterator(null).records(recordList).build(); + GetShardIteratorResponse getShardIteratorResponse = + GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build(); + + expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes(); + expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse) + .anyTimes(); - private static final String STREAM_NAME = "kinesis-test"; - private static final String AWS_REGION = "us-west-2"; - - public static void main(String[] args) - throws IOException { - Map<String, String> props = new HashMap<>(); - props.put(KinesisConfig.STREAM, STREAM_NAME); - props.put(KinesisConfig.AWS_REGION, AWS_REGION); - props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); - props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); - KinesisConfig kinesisConfig = new KinesisConfig(props); - KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION); - List<Shard> shardList = kinesisConnectionHandler.getShards(); - for (Shard shard : shardList) { - System.out.println("SHARD: " + shard.shardId()); - - KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig); - System.out.println( - "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard - .sequenceNumberRange().endingSequenceNumber() + " >"); - Map<String, String> shardIdToSeqNumMap = new HashMap<>(); - shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber()); - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap); - KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000); - int n = kinesisRecordsBatch.getMessageCount(); - - System.out.println("Found " + n + " messages "); - for (int i = 0; i < n; i++) { - System.out.println( - "SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch - .getMessageAtIndex(i)); - } - kinesisConsumer.close(); + replay(kinesisClient); + + KinesisConsumer kinesisConsumer = new KinesisConsumer(TestUtils.getKinesisConfig(), kinesisClient); + + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put("0", "1"); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap); + KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT); + + Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS); + + for (int i = 0; i < NUM_RECORDS; i++) { + Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i); } - kinesisConnectionHandler.close(); + + Assert.assertFalse(kinesisRecordsBatch.isEndOfPartitionGroup()); + } + + @Test + public void testBasicConsumerWithMaxRecordsLimit() { + int maxRecordsLimit = 20; + Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance(); + Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance(); + + GetRecordsResponse getRecordsResponse = + GetRecordsResponse.builder().nextShardIterator(PLACEHOLDER).records(recordList).build(); + GetShardIteratorResponse getShardIteratorResponse = + GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build(); + + expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes(); + expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse) + .anyTimes(); + + replay(kinesisClient); + + KinesisConfig kinesisConfig = TestUtils.getKinesisConfig(); + kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit); + KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient); + + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put("0", "1"); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap); + KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT); + + Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), maxRecordsLimit); + + for (int i = 0; i < NUM_RECORDS; i++) { + Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i); + } + } + + @Test + public void testBasicConsumerWithChildShard() { + int maxRecordsLimit = 20; + + List<ChildShard> shardList = new ArrayList<>(); + shardList.add(ChildShard.builder().shardId(PLACEHOLDER).parentShards("0").build()); + + Capture<GetRecordsRequest> getRecordsRequestCapture = Capture.newInstance(); + Capture<GetShardIteratorRequest> getShardIteratorRequestCapture = Capture.newInstance(); + + GetRecordsResponse getRecordsResponse = + GetRecordsResponse.builder().nextShardIterator(null).records(recordList).childShards(shardList).build(); + GetShardIteratorResponse getShardIteratorResponse = + GetShardIteratorResponse.builder().shardIterator(PLACEHOLDER).build(); + + expect(kinesisClient.getRecords(capture(getRecordsRequestCapture))).andReturn(getRecordsResponse).anyTimes(); + expect(kinesisClient.getShardIterator(capture(getShardIteratorRequestCapture))).andReturn(getShardIteratorResponse) + .anyTimes(); + + replay(kinesisClient); + + KinesisConfig kinesisConfig = TestUtils.getKinesisConfig(); + kinesisConfig.setMaxRecordsToFetch(maxRecordsLimit); + KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, kinesisClient); + + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put("0", "1"); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap); + KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, TIMEOUT); + + Assert.assertTrue(kinesisRecordsBatch.isEndOfPartitionGroup()); + Assert.assertEquals(kinesisRecordsBatch.getMessageCount(), NUM_RECORDS); + + for (int i = 0; i < NUM_RECORDS; i++) { + Assert.assertEquals(baToString(kinesisRecordsBatch.getMessageAtIndex(i)), DUMMY_RECORD_PREFIX + i); + } + } + + public String baToString(byte[] bytes) { + return SdkBytes.fromByteArray(bytes).asUtf8String(); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java new file mode 100644 index 0000000..4845e57 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProviderTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kinesis; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupInfo; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.easymock.Capture; +import org.easymock.CaptureType; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; +import software.amazon.awssdk.services.kinesis.model.Shard; + +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.newCapture; +import static org.easymock.EasyMock.captureInt; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +public class KinesisStreamMetadataProviderTest { + private static final String SHARD_ID_0 = "0"; + private static final String SHARD_ID_1 = "1"; + public static final String CLIENT_ID = "dummy"; + public static final int TIMEOUT = 1000; + + private static KinesisConnectionHandler kinesisConnectionHandler; + private KinesisStreamMetadataProvider kinesisStreamMetadataProvider; + private static StreamConsumerFactory streamConsumerFactory; + private static PartitionGroupConsumer partitionGroupConsumer; + + @BeforeMethod + public void setupTest() { + kinesisConnectionHandler = createMock(KinesisConnectionHandler.class); + streamConsumerFactory = createMock(StreamConsumerFactory.class); + partitionGroupConsumer = createNiceMock(PartitionGroupConsumer.class); + kinesisStreamMetadataProvider = + new KinesisStreamMetadataProvider(CLIENT_ID, TestUtils.getStreamConfig(), kinesisConnectionHandler, + streamConsumerFactory); + } + + @Test + public void getPartitionsGroupInfoListTest() + throws Exception { + Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build(); + Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build(); + + expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes(); + replay(kinesisConnectionHandler); + + List<PartitionGroupInfo> result = kinesisStreamMetadataProvider + .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), new ArrayList<>(), TIMEOUT); + + + Assert.assertEquals(result.size(), 2); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 0); + Assert.assertEquals(result.get(1).getPartitionGroupId(), 1); + } + + @Test + public void getPartitionsGroupInfoEndOfShardTest() + throws Exception { + List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>(); + + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put("0", "1"); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap); + + currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING")); + + Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL); + Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL); + Capture<Integer> intArguments = newCapture(CaptureType.ALL); + Capture<String> stringCapture = newCapture(CaptureType.ALL); + + Shard shard0 = Shard.builder().shardId(SHARD_ID_0).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build(); + Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build(); + expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes(); + expect(streamConsumerFactory + .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture))) + .andReturn(partitionGroupConsumer).anyTimes(); + expect(partitionGroupConsumer + .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments))) + .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes(); + + replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer); + + List<PartitionGroupInfo> result = kinesisStreamMetadataProvider + .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT); + + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 1); + } + + @Test + public void getPartitionsGroupInfoChildShardsest() + throws Exception { + List<PartitionGroupMetadata> currentPartitionGroupMeta = new ArrayList<>(); + + Map<String, String> shardToSequenceMap = new HashMap<>(); + shardToSequenceMap.put("1", "1"); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceMap); + + currentPartitionGroupMeta.add(new PartitionGroupMetadata(0, 1, kinesisCheckpoint, kinesisCheckpoint, "CONSUMING")); + + Capture<Checkpoint> checkpointArgs = newCapture(CaptureType.ALL); + Capture<PartitionGroupMetadata> partitionGroupMetadataCapture = newCapture(CaptureType.ALL); + Capture<Integer> intArguments = newCapture(CaptureType.ALL); + Capture<String> stringCapture = newCapture(CaptureType.ALL); + + Shard shard0 = Shard.builder().shardId(SHARD_ID_0).parentShardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").build()).build(); + Shard shard1 = Shard.builder().shardId(SHARD_ID_1).sequenceNumberRange(SequenceNumberRange.builder().startingSequenceNumber("1").endingSequenceNumber("1").build()).build(); + + expect(kinesisConnectionHandler.getShards()).andReturn(ImmutableList.of(shard0, shard1)).anyTimes(); + expect(streamConsumerFactory + .createPartitionGroupConsumer(capture(stringCapture), capture(partitionGroupMetadataCapture))) + .andReturn(partitionGroupConsumer).anyTimes(); + expect(partitionGroupConsumer + .fetchMessages(capture(checkpointArgs), capture(checkpointArgs), captureInt(intArguments))) + .andReturn(new KinesisRecordsBatch(new ArrayList<>(), "0", true)).anyTimes(); + + replay(kinesisConnectionHandler, streamConsumerFactory, partitionGroupConsumer); + + List<PartitionGroupInfo> result = kinesisStreamMetadataProvider + .getPartitionGroupInfoList(CLIENT_ID, TestUtils.getStreamConfig(), currentPartitionGroupMeta, TIMEOUT); + + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).getPartitionGroupId(), 0); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java new file mode 100644 index 0000000..28d02de --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/TestUtils.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + + +public class TestUtils { + private static final String STREAM_NAME = "kinesis-test"; + private static final String AWS_REGION = "us-west-2"; + + public static StreamConfig getStreamConfig() { + Map<String, String> props = new HashMap<>(); + props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put(KinesisConfig.AWS_REGION, AWS_REGION); + props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); + props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); + props.put(StreamConfigProperties.STREAM_TYPE, "kinesis"); + props.put("stream.kinesis.consumer.type", "lowLevel"); + props.put("stream.kinesis.topic.name", STREAM_NAME); + props.put("stream.kinesis.decoder.class.name", "ABCD"); + props.put("stream.kinesis.consumer.factory.class.name", + "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory"); + return new StreamConfig("", props); + } + + public static KinesisConfig getKinesisConfig() { + Map<String, String> props = new HashMap<>(); + props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put(KinesisConfig.AWS_REGION, AWS_REGION); + props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); + props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); + return new KinesisConfig(props); + } +} diff --git a/pom.xml b/pom.xml index 237b5c9..79dabf7 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ <parquet.version>1.8.0</parquet.version> <helix.version>0.9.8</helix.version> <zkclient.version>0.7</zkclient.version> - <jackson.version>2.12.0</jackson.version> + <jackson.version>2.9.8</jackson.version> <async-http-client.version>1.9.21</async-http-client.version> <jersey.version>2.28</jersey.version> <grizzly.version>2.4.4</grizzly.version> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org