This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 26085a88ac34d6b00737bbd68b8ff409ad281467
Author: KKcorps <kharekar...@gmail.com>
AuthorDate: Tue Dec 22 20:42:05 2020 +0530

    Change shard metadata logic
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   |  2 +-
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  2 +-
 .../stream/kinesis/KinesisConsumerFactory.java     |  2 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  2 +-
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 55 +++++++++++++++++++---
 .../stream/kinesis/KinesisShardMetadata.java       | 16 +++----
 6 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 1abc536..0c9ae0b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -35,7 +35,7 @@
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
-    <aws.version>2.13.46</aws.version>
+    <aws.version>2.15.50</aws.version>
   </properties>
 
   <dependencies>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 450173c..8de95e2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -38,7 +38,7 @@ public class KinesisCheckpoint implements Checkpoint {
   }
 
   @Override
-  public Checkpoint deserialize(byte[] blob) {
+  public KinesisCheckpoint deserialize(byte[] blob) {
     //TODO: Implement SerDe
     return new KinesisCheckpoint(new String(blob));
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index da39aab..acac1fb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -38,7 +38,7 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), 
_kinesisConfig.getAwsRegion());
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), 
_kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 52dab66..aedcd5d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -35,7 +35,7 @@ public class KinesisFetchResult implements FetchResult<Record> {
   }
 
   @Override
-  public Checkpoint getLastCheckpoint() {
+  public KinesisCheckpoint getLastCheckpoint() {
     return _kinesisCheckpoint;
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 9a34004..d77579e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -19,7 +19,11 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -30,19 +34,56 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler 
implements PartitionGroupMetadataMap {
   private final List<PartitionGroupMetadata> 
_stringPartitionGroupMetadataIndex = new ArrayList<>();
 
-  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion) {
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
+      PartitionGroupMetadataMap partitionGroupMetadataMap) {
+    //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
+    //Return metadata only for shards in current metadata
     super(stream, awsRegion);
+    KinesisPartitionGroupMetadataMap currentPartitionMeta =
+        (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap;
+    List<PartitionGroupMetadata> currentMetaList = 
currentPartitionMeta.getMetadataList();
+
     List<Shard> shardList = getShards();
+
+    Map<String, PartitionGroupMetadata> metadataMap = new HashMap<>();
+    for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
+      KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) 
partitionGroupMetadata;
+      metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
+    }
+
     for (Shard shard : shardList) {
-      String startSequenceNumber = 
shard.sequenceNumberRange().startingSequenceNumber();
-      String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
-      KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream, awsRegion);
-      shardMetadata.setStartCheckpoint(new 
KinesisCheckpoint(startSequenceNumber));
-      shardMetadata.setEndCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataIndex.add(shardMetadata);
+      if (metadataMap.containsKey(shard.shardId())) {
+        //Return existing shard metadata
+        
_stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId()));
+      } else if (metadataMap.containsKey(shard.parentShardId())) {
+        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) 
metadataMap.get(shard.parentShardId());
+        if (isProcessingFinished(kinesisShardMetadata)) {
+          //Add child shards for processing since parent has finished
+          appendShardMetadata(stream, awsRegion, shard);
+        } else {
+          //Do not process this shard unless the parent shard is finished or expired
+        }
+      } else {
+        //This is a new shard with no parents. We can start processing this shard.
+        appendShardMetadata(stream, awsRegion, shard);
+      }
     }
   }
 
+  private boolean isProcessingFinished(KinesisShardMetadata 
kinesisShardMetadata) {
+    return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null 
&& kinesisShardMetadata
+        
.getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber());
+  }
+
+  private void appendShardMetadata(String stream, String awsRegion, Shard 
shard) {
+    String startSequenceNumber = 
shard.sequenceNumberRange().startingSequenceNumber();
+    String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
+    KinesisShardMetadata shardMetadata = new 
KinesisShardMetadata(shard.shardId(), stream, awsRegion);
+    shardMetadata.setStartCheckpoint(new 
KinesisCheckpoint(startSequenceNumber));
+    shardMetadata.setEndCheckpoint(new 
KinesisCheckpoint(endingSequenceNumber));
+    _stringPartitionGroupMetadataIndex.add(shardMetadata);
+  }
+
   @Override
   public List<PartitionGroupMetadata> getMetadataList() {
     return _stringPartitionGroupMetadataIndex;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 8141cd4..327e034 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -25,11 +25,11 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-
+//TODO: Implement shardId as Array
 public class KinesisShardMetadata extends KinesisConnectionHandler implements 
PartitionGroupMetadata {
   String _shardId;
-  Checkpoint _startCheckpoint;
-  Checkpoint _endCheckpoint;
+  KinesisCheckpoint _startCheckpoint;
+  KinesisCheckpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName, String 
awsRegion) {
     super(streamName, awsRegion);
@@ -43,23 +43,23 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public Checkpoint getStartCheckpoint() {
+  public KinesisCheckpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 
   @Override
-  public Checkpoint getEndCheckpoint() {
+  public KinesisCheckpoint getEndCheckpoint() {
     return _endCheckpoint;
   }
 
   @Override
   public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = startCheckpoint;
+    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
   }
 
   @Override
   public void setEndCheckpoint(Checkpoint endCheckpoint) {
-    _endCheckpoint = endCheckpoint;
+    _endCheckpoint = (KinesisCheckpoint) endCheckpoint;
   }
 
   @Override
@@ -68,7 +68,7 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public PartitionGroupMetadata deserialize(byte[] blob) {
+  public KinesisShardMetadata deserialize(byte[] blob) {
     return null;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to