This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit cf23ee3a83a0ea22d0dde57e306ccadf30db4d1c
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Thu Dec 24 17:48:04 2020 +0530

    Handle timeout exception in consumer and make shard iterator type 
configurable
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  1 -
 .../pinot/plugin/stream/kinesis/KinesisConfig.java |  8 +++++
 .../stream/kinesis/KinesisConnectionHandler.java   |  1 +
 .../plugin/stream/kinesis/KinesisConsumer.java     | 36 +++++++++-------------
 .../stream/kinesis/KinesisShardMetadata.java       |  2 +-
 .../plugin/stream/kinesis/KinesisConsumerTest.java |  8 +++--
 6 files changed, 30 insertions(+), 26 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 8de95e2..027b789 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,7 +20,6 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 
-
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
 
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index a81d11f..82fc438 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConfig {
@@ -28,9 +29,11 @@ public class KinesisConfig {
   public static final String STREAM = "stream";
   private static final String AWS_REGION = "aws-region";
   private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+  public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
 
   private static final String DEFAULT_AWS_REGION = "us-central-1";
   private static final String DEFAULT_MAX_RECORDS = "20";
+  private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST";
 
   public KinesisConfig(StreamConfig streamConfig) {
     _props = streamConfig.getStreamConfigsMap();
@@ -51,4 +54,9 @@ public class KinesisConfig {
   public Integer maxRecordsToFetch(){
     return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, 
DEFAULT_MAX_RECORDS));
   }
+
+  public ShardIteratorType getShardIteratorType(){
+    return 
ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, 
DEFAULT_SHARD_ITERATOR_TYPE));
+  }
+
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 3607787..0cf4787 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -71,4 +71,5 @@ public class KinesisConnectionHandler {
       _kinesisClient = null;
     }
   }
+
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 3263f87..abbc753 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -51,6 +51,7 @@ public class KinesisConsumer extends KinesisConnectionHandler 
implements Consume
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
+  ShardIteratorType _shardIteratorType;
   private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata 
partitionGroupMetadata) {
@@ -59,22 +60,23 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
     _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) 
partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
+    _shardIteratorType = kinesisConfig.getShardIteratorType();
     _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long 
timeout) {
-    Future<KinesisFetchResult> kinesisFetchResultFuture = 
_executorService.submit(() -> getResult(start, end));
+    List<Record> recordList = new ArrayList<>();
+    Future<KinesisFetchResult> kinesisFetchResultFuture = 
_executorService.submit(() -> getResult(start, end, recordList));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch(Exception e){
-      return null;
+        return handleException((KinesisCheckpoint) start, recordList);
     }
   }
 
-  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
-    List<Record> recordList = new ArrayList<>();
+  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, 
List<Record> recordList) {
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
     try {
@@ -83,7 +85,7 @@ public class KinesisConsumer extends KinesisConnectionHandler 
implements Consume
         createConnection();
       }
 
-      String shardIterator = getShardIterator(kinesisStartCheckpoint);
+      String shardIterator = 
getShardIterator(kinesisStartCheckpoint.getSequenceNumber());
 
       String kinesisEndSequenceNumber = null;
 
@@ -162,25 +164,15 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
     }
   }
 
-  private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, 
kinesisStartCheckpoint.getSequenceNumber());
-    } else {
-      return getShardIterator(ShardIteratorType.LATEST, null);
-    }
-  }
+  public String getShardIterator(String sequenceNumber) {
 
-  public String getShardIterator(ShardIteratorType shardIteratorType, String 
sequenceNumber){
-    if(sequenceNumber == null){
-      return _kinesisClient.getShardIterator(
-          
GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(shardIteratorType).build()).shardIterator();
-    }else{
-      return _kinesisClient.getShardIterator(
-          
GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(shardIteratorType)
-              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    GetShardIteratorRequest.Builder requestBuilder =
+        
GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType);
+
+    if (sequenceNumber != null) {
+      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
     }
+    return 
_kinesisClient.getShardIterator(requestBuilder.build()).shardIterator();
   }
 
   @Override
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 327e034..1d753c3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -25,7 +25,7 @@ import 
software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-//TODO: Implement shardId as Array
+//TODO: Implement shardId as Array and have unique id
 public class KinesisShardMetadata extends KinesisConnectionHandler implements 
PartitionGroupMetadata {
   String _shardId;
   KinesisCheckpoint _startCheckpoint;
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f8a0551..17691c4 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.plugin.stream.kinesis; /**
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
@@ -29,7 +31,8 @@ public class KinesisConsumerTest {
     Map<String, String> props = new HashMap<>();
     props.put("stream", "kinesis-test");
     props.put("aws-region", "us-west-2");
-    props.put("maxRecords", "10");
+    props.put("max-records-to-fetch", "2000");
+    props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER");
 
     KinesisConfig kinesisConfig = new KinesisConfig(props);
 
@@ -38,6 +41,8 @@ public class KinesisConsumerTest {
     List<Shard> shardList = kinesisConnectionHandler.getShards();
 
     for(Shard shard : shardList) {
+      System.out.println("SHARD: " + shard.shardId());
+
       KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new 
KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
 
       KinesisCheckpoint kinesisCheckpoint = new 
KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
@@ -45,7 +50,6 @@ public class KinesisConsumerTest {
 
       List<Record> list = fetchResult.getMessages();
 
-      System.out.println("SHARD: " + shard.shardId());
       for (Record record : list) {
         System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + 
record.data().asUtf8String());
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to