This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 92ddaab79edd23e232e2b2fd8fcc187830c66d40 Author: KKcorps <kharekar...@gmail.com> AuthorDate: Fri Dec 11 23:57:29 2020 +0530 Refactor kinesis shard metadata interface and add shardId to the metadata --- .../kinesis/KinesisPartitionGroupMetadataMap.java | 20 +++++++++++++------- .../plugin/stream/kinesis/KinesisShardMetadata.java | 6 ++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index bc3fef2..87f7235 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -1,8 +1,7 @@ package org.apache.pinot.plugin.stream.kinesis; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; @@ -11,7 +10,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard; public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap { - private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>(); + private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>(); public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){ super(awsRegion); @@ -20,12 +19,19 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i for(Shard shard : shardList){ String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream); - shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); - _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata); + shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); + _stringPartitionGroupMetadataIndex.add(shardMetadata); } } - public Map<String, PartitionGroupMetadata> getPartitionMetadata(){ - return _stringPartitionGroupMetadataMap; + @Override + public List<PartitionGroupMetadata> getMetadataList() { + return _stringPartitionGroupMetadataIndex; } + + @Override + public PartitionGroupMetadata getPartitionGroupMetadata(int index) { + return _stringPartitionGroupMetadataIndex.get(index); + } + } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index d50d821..4a19285 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -8,6 +8,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { + String _shardId; Checkpoint _startCheckpoint; Checkpoint _endCheckpoint; @@ -16,6 +17,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa ShardIteratorType.LATEST).streamName(streamName).build()); _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator()); _endCheckpoint = null; + _shardId = shardId; + } + + public String getShardId() { + return _shardId; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org