This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit eb428cded05ff23b587fe0ed61b89a3b0ec9dd8e
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Thu Dec 24 17:58:40 2020 +0530

    Add isEndOfPartition check in checkpoints
---
 .../pinot/plugin/stream/kinesis/KinesisCheckpoint.java       | 12 +++++++++++-
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  | 10 +++++++++-
 .../main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java |  1 +
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 027b789..54e26d0 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,11 +22,22 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
+  Boolean _isEndOfPartition = false;
 
   public KinesisCheckpoint(String sequenceNumber) {
     _sequenceNumber = sequenceNumber;
   }
 
+  public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
+    _sequenceNumber = sequenceNumber;
+    _isEndOfPartition = isEndOfPartition;
+  }
+
+  @Override
+  public boolean isEndOfPartition() {
+    return _isEndOfPartition;
+  }
+
   public String getSequenceNumber() {
     return _sequenceNumber;
   }
@@ -38,7 +49,6 @@ public class KinesisCheckpoint implements Checkpoint {
 
   @Override
   public KinesisCheckpoint deserialize(byte[] blob) {
-    //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
 }
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index abbc753..336468a 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -95,6 +95,7 @@ public class KinesisConsumer extends KinesisConnectionHandler 
implements Consume
       }
 
       String nextStartSequenceNumber = null;
+      boolean isEndOfShard = false;
 
       while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = 
GetRecordsRequest.builder().shardIterator(shardIterator).build();
@@ -114,14 +115,21 @@ public class KinesisConsumer extends 
KinesisConnectionHandler implements Consume
           }
         }
 
+        if(getRecordsResponse.hasChildShards()){
+          //This statement returns true only when end of current shard has 
reached.
+          isEndOfShard = true;
+          break;
+        }
+
         shardIterator = getRecordsResponse.nextShardIterator();
+
       }
 
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
         nextStartSequenceNumber = recordList.get(recordList.size() - 
1).sequenceNumber();
       }
 
-      KinesisCheckpoint kinesisCheckpoint = new 
KinesisCheckpoint(nextStartSequenceNumber);
+      KinesisCheckpoint kinesisCheckpoint = new 
KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
       KinesisFetchResult kinesisFetchResult = new 
KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
index 030fe4e..0195684 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream.v2;
 
 public interface Checkpoint {
+  boolean isEndOfPartition();
   byte[] serialize();
   Checkpoint deserialize(byte[] blob);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to