This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit cf23ee3a83a0ea22d0dde57e306ccadf30db4d1c Author: KKcorps <kharekar...@gmail.com> AuthorDate: Thu Dec 24 17:48:04 2020 +0530 Handle timeout exception in consumer and make shard iterator type configurable --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 1 - .../pinot/plugin/stream/kinesis/KinesisConfig.java | 8 +++++ .../stream/kinesis/KinesisConnectionHandler.java | 1 + .../plugin/stream/kinesis/KinesisConsumer.java | 36 +++++++++------------- .../stream/kinesis/KinesisShardMetadata.java | 2 +- .../plugin/stream/kinesis/KinesisConsumerTest.java | 8 +++-- 6 files changed, 30 insertions(+), 26 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 8de95e2..027b789 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -20,7 +20,6 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; - public class KinesisCheckpoint implements Checkpoint { String _sequenceNumber; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index a81d11f..82fc438 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.Map; import org.apache.pinot.spi.stream.StreamConfig; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConfig { @@ -28,9 +29,11 @@ public class KinesisConfig { public static final String STREAM = "stream"; private static final String AWS_REGION = "aws-region"; private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; + public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type"; private static final String DEFAULT_AWS_REGION = "us-central-1"; private static final String DEFAULT_MAX_RECORDS = "20"; + private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST"; public KinesisConfig(StreamConfig streamConfig) { _props = streamConfig.getStreamConfigsMap(); @@ -51,4 +54,9 @@ public class KinesisConfig { public Integer maxRecordsToFetch(){ return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); } + + public ShardIteratorType getShardIteratorType(){ + return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE)); + } + } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 3607787..0cf4787 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -71,4 +71,5 @@ public class KinesisConnectionHandler { _kinesisClient = null; } } + } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 3263f87..abbc753 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -51,6 +51,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume Integer _maxRecords; String _shardId; ExecutorService _executorService; + ShardIteratorType _shardIteratorType; private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { @@ -59,22 +60,23 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume _maxRecords = kinesisConfig.maxRecordsToFetch(); KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; _shardId = kinesisShardMetadata.getShardId(); + _shardIteratorType = kinesisConfig.getShardIteratorType(); _executorService = Executors.newSingleThreadExecutor(); } @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { - Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end)); + List<Record> recordList = new ArrayList<>(); + Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList)); try { return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); } catch(Exception e){ - return null; + return handleException((KinesisCheckpoint) start, recordList); } } - private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) { - List<Record> recordList = new ArrayList<>(); + private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, List<Record> recordList) { KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; try { @@ -83,7 +85,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume createConnection(); } - String shardIterator = getShardIterator(kinesisStartCheckpoint); + String shardIterator = getShardIterator(kinesisStartCheckpoint.getSequenceNumber()); String kinesisEndSequenceNumber = null; @@ -162,25 +164,15 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } - private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) { - if (kinesisStartCheckpoint.getSequenceNumber() != null) { - return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber()); - } else { - return getShardIterator(ShardIteratorType.LATEST, null); - } - } + public String getShardIterator(String sequenceNumber) { - public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){ - if(sequenceNumber == null){ - return _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream) - .shardIteratorType(shardIteratorType).build()).shardIterator(); - }else{ - return _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId) - .shardIteratorType(shardIteratorType) - .startingSequenceNumber(sequenceNumber).build()).shardIterator(); + GetShardIteratorRequest.Builder requestBuilder = + GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType); + + if (sequenceNumber != null) { + requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber); } + return _kinesisClient.getShardIterator(requestBuilder.build()).shardIterator(); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 327e034..1d753c3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -25,7 +25,7 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -//TODO: Implement shardId as Array +//TODO: Implement shardId as Array and have unique id public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { String _shardId; KinesisCheckpoint _startCheckpoint; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index f8a0551..17691c4 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -20,6 +20,8 @@ package org.apache.pinot.plugin.stream.kinesis; /** import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -29,7 +31,8 @@ public class KinesisConsumerTest { Map<String, String> props = new HashMap<>(); props.put("stream", "kinesis-test"); props.put("aws-region", "us-west-2"); - props.put("maxRecords", "10"); + props.put("max-records-to-fetch", "2000"); + props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER"); KinesisConfig kinesisConfig = new KinesisConfig(props); @@ -38,6 +41,8 @@ public class KinesisConsumerTest { List<Shard> shardList = kinesisConnectionHandler.getShards(); for(Shard shard : shardList) { + System.out.println("SHARD: " + shard.shardId()); + KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2")); KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); @@ -45,7 +50,6 @@ public class KinesisConsumerTest { List<Record> list = fetchResult.getMessages(); - System.out.println("SHARD: " + shard.shardId()); for (Record record : list) { System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org