This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 92ddaab79edd23e232e2b2fd8fcc187830c66d40
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Fri Dec 11 23:57:29 2020 +0530

    Refactor kinesis shard metadata interface and add shardId to the metadata
---
 .../kinesis/KinesisPartitionGroupMetadataMap.java    | 20 +++++++++++++-------
 .../plugin/stream/kinesis/KinesisShardMetadata.java  |  6 ++++++
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index bc3fef2..87f7235 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -1,8 +1,7 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -11,7 +10,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler 
implements PartitionGroupMetadataMap {
-  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap 
= new HashMap<>();
+  private final List<PartitionGroupMetadata> 
_stringPartitionGroupMetadataIndex = new ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
     super(awsRegion);
@@ -20,12 +19,19 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
     for(Shard shard : shardList){
       String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream);
-      shardMetadata.setEndCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+      shardMetadata.setStartCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
 
-  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
-      return _stringPartitionGroupMetadataMap;
+  @Override
+  public List<PartitionGroupMetadata> getMetadataList() {
+    return _stringPartitionGroupMetadataIndex;
   }
+
+  @Override
+  public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
+    return _stringPartitionGroupMetadataIndex.get(index);
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index d50d821..4a19285 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -8,6 +8,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements 
PartitionGroupMetadata {
+  String _shardId;
   Checkpoint _startCheckpoint;
   Checkpoint _endCheckpoint;
 
@@ -16,6 +17,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
         ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new 
KinesisCheckpoint(getShardIteratorResponse.shardIterator());
     _endCheckpoint = null;
+    _shardId = shardId;
+  }
+
+  public String getShardId() {
+    return _shardId;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to