This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 262d326be4dc8ee8b1a77bc65668ef3fc12c4aea
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Sun Dec 20 11:44:38 2020 +0530

    fetch records with timeout
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 96241d4..910b9ee 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -21,6 +21,12 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
@@ -39,6 +45,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   String _stream;
   Integer _maxRecords;
   String _shardId;
+  ExecutorService _executorService;
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -46,10 +53,27 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
+    _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
+      @Override
+      public KinesisFetchResult call()
+          throws Exception {
+        return getResult(start, end);
+      }
+    });
+
+    try {
+      return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+    } catch(Exception e){
+      return null;
+    }
+  }
+
+  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
     try {
       KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
@@ -65,9 +89,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       }
 
       String nextStartSequenceNumber = null;
-      Long startTimestamp = System.currentTimeMillis();
 
-      while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+      while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
         GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
@@ -119,7 +142,4 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
-  private boolean isTimedOut(Long startTimestamp, Long timeout) {
-    return (System.currentTimeMillis() - startTimestamp) >= timeout;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to