This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new f006615 Fix offsets in StreamMetadataProvider impl f006615 is described below commit f00661545aefe9bbdfc2eaa89117c2441ddadad1 Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Mon Jan 4 11:59:18 2021 -0800 Fix offsets in StreamMetadataProvider impl --- .../org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java | 6 +++++- .../plugin/stream/kinesis/KinesisStreamMetadataProvider.java | 8 +++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index d42f899..517f8c0 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; import java.util.Map; -import org.apache.pinot.spi.stream.Checkpoint; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.JsonUtils; @@ -54,6 +53,11 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset { } @Override + public String toString() { + return serialize(); + } + + @Override public KinesisCheckpoint deserialize(String blob) { try { return new KinesisCheckpoint(blob); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java index ba9d2b6..f86d06c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -2,7 +2,9 @@ package org.apache.pinot.plugin.stream.kinesis; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.apache.pinot.spi.stream.OffsetCriteria; @@ -40,7 +42,11 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider { List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>(); List<Shard> shards = _kinesisConnectionHandler.getShards(); for (Shard shard : shards) { - partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber())); + Map<String, String> shardToSequenceNumMap = new HashMap<>(); + shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber()); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap); + partitionGroupInfos + .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize())); } return partitionGroupInfos; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org