This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new f006615  Fix offsets in StreamMetadataProvider impl
f006615 is described below

commit f00661545aefe9bbdfc2eaa89117c2441ddadad1
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Mon Jan 4 11:59:18 2021 -0800

    Fix offsets in StreamMetadataProvider impl
---
 .../org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 6 +++++-
 .../plugin/stream/kinesis/KinesisStreamMetadataProvider.java      | 8 +++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index d42f899..517f8c0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Map;
-import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 
@@ -54,6 +53,11 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
   }
 
   @Override
+  public String toString() {
+    return serialize();
+  }
+
+  @Override
   public KinesisCheckpoint deserialize(String blob) {
     try {
       return new KinesisCheckpoint(blob);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index ba9d2b6..f86d06c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -2,7 +2,9 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -40,7 +42,11 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
     for (Shard shard : shards) {
-      partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber()));
+      Map<String, String> shardToSequenceNumMap = new HashMap<>();
+      shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap);
+      partitionGroupInfos
+          .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize()));
     }
     return partitionGroupInfos;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to