This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit be19cf6866235d7bf4ce0a74424ae2378f40c8bc
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Mon Dec 21 14:25:25 2020 +0530

    Refactor: get shard iterator methods
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fd48a92..3263f87 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    GetShardIteratorResponse getShardIteratorResponse;
-
     if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
     } else {
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(ShardIteratorType.LATEST).build());
+      return getShardIterator(ShardIteratorType.LATEST, null);
     }
+  }
 
-    return getShardIteratorResponse.shardIterator();
+  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
+    if(sequenceNumber == null){
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(shardIteratorType).build()).shardIterator();
+    }else{
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(shardIteratorType)
+              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to