This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit eb428cded05ff23b587fe0ed61b89a3b0ec9dd8e Author: KKcorps <kharekar...@gmail.com> AuthorDate: Thu Dec 24 17:58:40 2020 +0530 Add isEndOfPartition check in checkpoints --- .../pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 12 +++++++++++- .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java | 10 +++++++++- .../main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java | 1 + 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 027b789..54e26d0 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -22,11 +22,22 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; public class KinesisCheckpoint implements Checkpoint { String _sequenceNumber; + Boolean _isEndOfPartition = false; public KinesisCheckpoint(String sequenceNumber) { _sequenceNumber = sequenceNumber; } + public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) { + _sequenceNumber = sequenceNumber; + _isEndOfPartition = isEndOfPartition; + } + + @Override + public boolean isEndOfPartition() { + return _isEndOfPartition; + } + public String getSequenceNumber() { return _sequenceNumber; } @@ -38,7 +49,6 @@ public class KinesisCheckpoint implements Checkpoint { @Override public KinesisCheckpoint deserialize(byte[] blob) { - //TODO: Implement SerDe return new KinesisCheckpoint(new String(blob)); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index abbc753..336468a 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -95,6 +95,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } String nextStartSequenceNumber = null; + boolean isEndOfShard = false; while (shardIterator != null) { GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); @@ -114,14 +115,21 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } + if(getRecordsResponse.hasChildShards()){ + //This statement returns true only when end of current shard has reached. + isEndOfShard = true; + break; + } + shardIterator = getRecordsResponse.nextShardIterator(); + } if (nextStartSequenceNumber == null && recordList.size() > 0) { nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard); KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); return kinesisFetchResult; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java index 030fe4e..0195684 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java @@ -19,6 +19,7 @@ package org.apache.pinot.spi.stream.v2; public interface Checkpoint { + boolean isEndOfPartition(); byte[] serialize(); 
   Checkpoint deserialize(byte[] blob);
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org