This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6cb0ebb1b775959c166cbcdeadec74ae3349e4ad Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Sat Jan 2 17:14:31 2021 -0800 fixing compilation --- pinot-distribution/pinot-assembly.xml | 4 ++ pinot-distribution/pom.xml | 4 ++ .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 64 ++++++++++++++++++++-- .../plugin/stream/kinesis/KinesisCheckpoint.java | 1 + .../pinot/plugin/stream/kinesis/KinesisConfig.java | 23 ++++---- .../stream/kinesis/KinesisConnectionHandler.java | 26 +++------ .../plugin/stream/kinesis/KinesisConsumer.java | 50 +++++++---------- .../stream/kinesis/KinesisConsumerFactory.java | 4 +- .../plugin/stream/kinesis/KinesisFetchResult.java | 3 - .../kinesis/KinesisPartitionGroupMetadataMap.java | 7 +-- .../plugin/stream/kinesis/KinesisRecordsBatch.java | 18 ++++++ .../stream/kinesis/KinesisShardMetadata.java | 13 ++--- .../plugin/stream/kinesis/KinesisConsumerTest.java | 39 +++++++------ 13 files changed, 152 insertions(+), 104 deletions(-) diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml index 2dfb36e..de7329f 100644 --- a/pinot-distribution/pinot-assembly.xml +++ b/pinot-distribution/pinot-assembly.xml @@ -55,6 +55,10 @@ <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/target/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</source> <destName>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</destName> </file> + <file> + <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/target/pinot-kinesis-${project.version}-shaded.jar</source> + <destName>plugins/pinot-stream-ingestion/pinot-kinesis/pinot-kinesis-${project.version}-shaded.jar</destName> + </file> <!-- End Include Pinot Stream Ingestion Plugins--> <!-- Start Include Pinot Batch Ingestion Plugins--> <file> diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml index 1a3f106..f29cae0 100644 --- a/pinot-distribution/pom.xml +++ b/pinot-distribution/pom.xml @@ -86,6 +86,10 @@ </exclusion> <exclusion> <groupId>org.apache.pinot</groupId> + <artifactId>pinot-kinesis</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.pinot</groupId> <artifactId>pinot-batch-ingestion-standalone</artifactId> </exclusion> <exclusion> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 0c9ae0b..4fce169 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -19,19 +19,20 @@ under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> <parent> <artifactId>pinot-stream-ingestion</artifactId> <groupId>org.apache.pinot</groupId> <version>0.7.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <modelVersion>4.0.0</modelVersion> <artifactId>pinot-kinesis</artifactId> - + <name>Pinot Kinesis</name> + <url>https://pinot.apache.org/</url> <properties> <pinot.root>${basedir}/../../..</pinot.root> <phase.prop>package</phase.prop> @@ -43,6 +44,32 @@ <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>${aws.version}</version> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + </exclusion> + </exclusions> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core --> <dependency> @@ -52,8 +79,33 @@ </dependency> <dependency> - <groupId>org.apache.pinot</groupId> - <artifactId>pinot-spi</artifactId> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + <version>1.0.2</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + <version>4.1.42.Final</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + <version>4.1.42.Final</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + <version>4.1.42.Final</version> + </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-common</artifactId> + <version>4.1.42.Final</version> </dependency> </dependencies> diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 54e26d0..f3a7a49 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; + public class KinesisCheckpoint implements Checkpoint { String _sequenceNumber; Boolean _isEndOfPartition = false; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index 82fc438..529f34f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -24,16 +24,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConfig { - private final Map<String, String> _props; - public static final String STREAM = "stream"; - private static final String AWS_REGION = "aws-region"; - private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type"; - - private static final String DEFAULT_AWS_REGION = "us-central-1"; - private static final String DEFAULT_MAX_RECORDS = "20"; - private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST"; + public static final String AWS_REGION = "aws-region"; + public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; + public static final String DEFAULT_AWS_REGION = "us-central-1"; + public static final String DEFAULT_MAX_RECORDS = "20"; + public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString(); + private final Map<String, String> _props; public KinesisConfig(StreamConfig streamConfig) { _props = streamConfig.getStreamConfigsMap(); @@ -43,20 +41,19 @@ public class KinesisConfig { _props = props; } - public String getStream(){ + public String getStream() { return _props.get(STREAM); } - public String getAwsRegion(){ + public String getAwsRegion() { return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION); } - public Integer maxRecordsToFetch(){ + public Integer maxRecordsToFetch() { return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); } - public ShardIteratorType getShardIteratorType(){ + public ShardIteratorType getShardIteratorType() { return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE)); } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 0cf4787..4d968f6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -19,28 +19,18 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.List; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.stream.v2.ConsumerV2; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; import software.amazon.awssdk.services.kinesis.model.Shard; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.services.kinesis.model.StreamDescription; public class KinesisConnectionHandler { + KinesisClient _kinesisClient; private String _stream; private String _awsRegion; - KinesisClient _kinesisClient; public KinesisConnectionHandler() { @@ -58,18 +48,18 @@ public class KinesisConnectionHandler { return listShardsResponse.shards(); } - public void createConnection(){ - if(_kinesisClient == null) { - _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) - .build(); + public void createConnection() { + if (_kinesisClient == null) { + _kinesisClient = + KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) + .build(); } } - public void close(){ - if(_kinesisClient != null) { + public void close() { + if (_kinesisClient != null) { _kinesisClient.close(); _kinesisClient = null; } } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 336468a..fb414f0 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -19,18 +19,13 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; -import org.apache.pinot.spi.stream.v2.FetchResult; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +33,6 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; @@ -46,13 +40,14 @@ import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { + private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); String _stream; Integer _maxRecords; String _shardId; ExecutorService _executorService; ShardIteratorType _shardIteratorType; - private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); @@ -67,12 +62,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { List<Record> recordList = new ArrayList<>(); - Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList)); + Future<KinesisFetchResult> kinesisFetchResultFuture = + _executorService.submit(() -> getResult(start, end, recordList)); try { return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch(Exception e){ - return handleException((KinesisCheckpoint) start, recordList); + } catch (Exception e) { + return handleException((KinesisCheckpoint) start, recordList); } } @@ -81,7 +77,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume try { - if(_kinesisClient == null){ + if (_kinesisClient == null) { createConnection(); } @@ -105,7 +101,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume recordList.addAll(getRecordsResponse.records()); nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { + if (kinesisEndSequenceNumber != null + && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { nextStartSequenceNumber = kinesisEndSequenceNumber; break; } @@ -115,14 +112,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } - if(getRecordsResponse.hasChildShards()){ + if (getRecordsResponse.hasChildShards()) { //This statement returns true only when end of current shard has reached. isEndOfShard = true; break; } shardIterator = getRecordsResponse.nextShardIterator(); - } if (nextStartSequenceNumber == null && recordList.size() > 0) { @@ -133,28 +129,20 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); return kinesisFetchResult; - }catch (ProvisionedThroughputExceededException e) { - LOG.warn( - "The request rate for the stream is too high" - , e); + } catch (ProvisionedThroughputExceededException e) { + LOG.warn("The request rate for the stream is too high", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (ExpiredIteratorException e) { - LOG.warn( - "ShardIterator expired while trying to fetch records",e - ); + } catch (ExpiredIteratorException e) { + LOG.warn("ShardIterator expired while trying to fetch records", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (ResourceNotFoundException | InvalidArgumentException e) { + } catch (ResourceNotFoundException | InvalidArgumentException e) { // aws errors LOG.error("Encountered AWS error while attempting to fetch records", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (KinesisException e) { + } catch (KinesisException e) { LOG.warn("Encountered unknown unrecoverable AWS exception", e); throw new RuntimeException(e); - } - catch (Throwable e) { + } catch (Throwable e) { // non transient errors LOG.error("Unknown fetchRecords exception", e); throw new RuntimeException(e); @@ -162,11 +150,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) { - if(recordList.size() > 0){ + if (recordList.size() > 0) { String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); return new KinesisFetchResult(kinesisCheckpoint, recordList); - }else{ + } else { KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber()); return new KinesisFetchResult(kinesisCheckpoint, recordList); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index acac1fb..9bb4d0c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import java.util.Map; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; @@ -38,7 +37,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public PartitionGroupMetadataMap getPartitionGroupsMetadata( PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata); + return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), + currentPartitionGroupsMetadata); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index 39561f3..8da3d2e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -18,10 +18,7 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import java.util.ArrayList; import java.util.List; -import org.apache.pinot.spi.stream.MessageBatch; -import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.FetchResult; import software.amazon.awssdk.services.kinesis.model.Record; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index 626c8ea..f96533f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -22,12 +22,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; -import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; -import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -56,7 +52,8 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i //Return existing shard metadata _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId())); } else if (currentMetadataMap.containsKey(shard.parentShardId())) { - KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); + KinesisShardMetadata kinesisShardMetadata = + (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); if (isProcessingFinished(kinesisShardMetadata)) { //Add child shards for processing since parent has finished appendShardMetadata(stream, awsRegion, shard); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java index ed51f8f..04bf4e6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pinot.plugin.stream.kinesis; import java.util.List; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 1d753c3..e24121b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -20,10 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; -import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + //TODO: Implement shardId as Array and have unique id public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { @@ -48,13 +45,13 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa } @Override - public KinesisCheckpoint getEndCheckpoint() { - return _endCheckpoint; + public void setStartCheckpoint(Checkpoint startCheckpoint) { + _startCheckpoint = (KinesisCheckpoint) startCheckpoint; } @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - _startCheckpoint = (KinesisCheckpoint) startCheckpoint; + public KinesisCheckpoint getEndCheckpoint() { + return _endCheckpoint; } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index 6f660f7..f853875 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -20,40 +20,43 @@ package org.apache.pinot.plugin.stream.kinesis; /** import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumerTest { + + private static final String STREAM_NAME = "kinesis-test"; + private static final String AWS_REGION = "us-west-2"; + public static void main(String[] args) { Map<String, String> props = new HashMap<>(); - props.put("stream", "kinesis-test"); - props.put("aws-region", "us-west-2"); - props.put("max-records-to-fetch", "2000"); - props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER"); - + props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put(KinesisConfig.AWS_REGION, AWS_REGION); + props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); + props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); KinesisConfig kinesisConfig = new KinesisConfig(props); - - KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2"); - + KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION); List<Shard> shardList = kinesisConnectionHandler.getShards(); - - for(Shard shard : shardList) { + for (Shard shard : shardList) { System.out.println("SHARD: " + shard.shardId()); - KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2")); - + KinesisConsumer kinesisConsumer = + new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION)); + System.out.println( + "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard + .sequenceNumberRange().endingSequenceNumber() + " >"); KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); - KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L); - + KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L); KinesisRecordsBatch list = fetchResult.getMessages(); int n = list.getMessageCount(); - for (int i=0;i<n;i++) { + System.out.println("Found " + n + " messages "); + for (int i = 0; i < n; i++) { System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i)); } + kinesisConsumer.close(); } + kinesisConnectionHandler.close(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org