This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit d7d04081131363e582340dfd8dc11fc1a92f3e5a
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Wed Jan 6 18:21:20 2021 -0800

    Use shardId's last digits as partitionGroupId
---
 .../helix/core/PinotHelixResourceManager.java      |  4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 10 +++
 .../kinesis/KinesisStreamMetadataProvider.java     | 79 +++++++++++++++++-----
 .../pinot/spi/stream/StreamMetadataProvider.java   |  3 +-
 4 files changed, 75 insertions(+), 21 deletions(-)
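
For context, the subject-line change maps a Kinesis shardId to a numeric partitionGroupId by parsing the digits after the "shardId-" prefix. A minimal standalone sketch of that behavior (illustrative only; the class name here is hypothetical, and the committed method is getPartitionGroupIdFromShardId in the Kinesis diff below):

    import org.apache.commons.lang3.StringUtils;

    public class ShardIdMappingSketch {
      // Parse the numeric suffix of a Kinesis shardId, e.g. "shardId-000000000001" -> 1.
      static int partitionGroupIdFromShardId(String shardId) {
        String digits = StringUtils.stripStart(StringUtils.removeStart(shardId, "shardId-"), "0");
        // An all-zero suffix strips down to the empty string, so map it to 0 explicitly.
        return digits.isEmpty() ? 0 : Integer.parseInt(digits);
      }

      public static void main(String[] args) {
        System.out.println(partitionGroupIdFromShardId("shardId-000000000001")); // 1
        System.out.println(partitionGroupIdFromShardId("shardId-000000000000")); // 0
      }
    }

Unlike the previous Math.abs(shard.shardId().hashCode()) scheme, this keeps the id stable and human-readable, at the cost of assuming the standard "shardId-" naming convention.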

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index b50da5f..d1c8755 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1374,8 +1374,8 @@ public class PinotHelixResourceManager {
       // (unless there are low-level segments already present)
       if (ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName).isEmpty()) {
         PinotTableIdealStateBuilder
-            .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
-                _enableBatchMessageMode);
+            .buildLowLevelRealtimeIdealStateFor(_pinotLLCRealtimeSegmentManager, realtimeTableName, realtimeTableConfig,
+                idealState, _enableBatchMessageMode);
         LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
       } else {
         LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 61ef719..bbd1ef3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -519,6 +519,11 @@ public class PinotLLCRealtimeSegmentManager {
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
       if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
         // make new segment
+        // FIXME: flushThreshold of a segment is actually (configured threshold / numPartitions).
+        //  In Kinesis, every split/merge creates new partitions and deactivates an old one.
+        //  However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they are active.
+        //  So our numPartitions will keep increasing forever.
+        // TODO: can getPartitionGroupInfo return only the active partitions, based on the checkpoints passed in the current state?
         String newLLCSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
                 instancePartitions, numPartitions, numReplicas);
@@ -534,6 +539,11 @@ public class PinotLLCRealtimeSegmentManager {
           createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
               committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
           newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+
+          // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
+          //  The logic in getPartitionGroupInfo meant to prevent returning EOLed shards isn't working.
+          //  OPTION: since the consumer knows about it, it can pass an "isEndOfShard" param in the request/committingSegmentDescriptor.
+          //  We can set that in the metadata so the validation manager skips these partitions.
         }
       }
     }
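
A hypothetical worked example of the flushThreshold concern in the first FIXME above (the threshold and shard counts are made up for illustration):

    public class FlushThresholdSketch {
      public static void main(String[] args) {
        // Per-segment flush threshold is (configured threshold / numPartitions).
        int configuredThreshold = 100_000;   // hypothetical table-level threshold
        int numPartitions = 4;               // 4 original shards
        System.out.println(configuredThreshold / numPartitions); // 25000 rows per segment

        // After one split: the parent shard is deactivated and two children appear,
        // but getPartitionGroupInfo still returns the parent, so the divisor becomes 6
        // even though only 5 shards are active.
        numPartitions = 6;
        System.out.println(configuredThreshold / numPartitions); // 16666 rows per segment
      }
    }

Each subsequent split or merge shrinks the per-segment threshold further, which is why the TODO asks whether only active partitions can be returned.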
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index f86d06c..6c55a18 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -6,7 +6,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -16,44 +18,85 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
 public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
-  private final KinesisConfig _kinesisConfig;
-  private KinesisConnectionHandler _kinesisConnectionHandler;
+  private final KinesisConnectionHandler _kinesisConnectionHandler;
 
   public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
-    _kinesisConfig = kinesisConfig;
     _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
   }
 
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
-    return 0;
+    throw new UnsupportedOperationException();
   }
 
   @Override
-  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
-      throws TimeoutException {
-    return 0;
+  public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws TimeoutException {
-    List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
+      throws IOException {
+
+    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
+        currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+
+    List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
-    for (Shard shard : shards) {
-      Map<String, String> shardToSequenceNumMap = new HashMap<>();
-      shardToSequenceNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
-      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardToSequenceNumMap);
-      partitionGroupInfos
-          .add(new PartitionGroupInfo(Math.abs(shard.shardId().hashCode()), kinesisCheckpoint.serialize()));
+    for (Shard shard : shards) { // go over all shards
+      String shardId = shard.shardId();
+      int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
+      PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
+      KinesisCheckpoint newStartCheckpoint;
+      if (currentPartitionGroupMetadata != null) { // existing shard
+        KinesisCheckpoint currentEndCheckpoint = null;
+        try {
+          currentEndCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
+        } catch (Exception e) {
+          // ignore. No end checkpoint yet for IN_PROGRESS segment
+        }
+        if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
+          String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+          if (endingSequenceNumber != null) { // shard has ended
+            // FIXME: this logic is not working.
+            //  Was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard,
+            //  but it is much less than the endSeqNumOfShard.
+            Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+            shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
+            KinesisCheckpoint shardEndCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+            if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
+              // shard has ended AND we have reached the end checkpoint.
+              // skip this partition group in the result
+              continue;
+            }
+          }
+          newStartCheckpoint = currentEndCheckpoint;
+        } else {
+          newStartCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
+        }
+      } else { // new shard
+        Map<String, String> shardToSequenceNumberMap = new HashMap<>();
+        shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
+        newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+      }
+      newPartitionGroupInfos
+          .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
-    return partitionGroupInfos;
+    return newPartitionGroupInfos;
+  }
+
+  /**
+   * Converts a shardId string to a partitionGroupId integer by parsing the digits of the shardId,
+   * e.g. "shardId-000000000001" becomes 1
+   */
+  private int getPartitionGroupIdFromShardId(String shardId) {
+    String shardIdNum = StringUtils.stripStart(StringUtils.removeStart(shardId, "shardId-"), "0");
+    return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
   }
 
   @Override
-  public void close()
-      throws IOException {
+  public void close() {
 
   }
 }
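
The end-of-shard handling above reduces to a small decision function. A simplified model of it (KinesisCheckpoint internals are not shown in this diff; real checkpoints wrap shardId-to-sequence-number maps, so the Long stand-ins here are an assumption for illustration):

    public class EndOfShardSketch {
      // currentEndSeq == null models an IN_PROGRESS segment (no end checkpoint yet);
      // shardEndSeq == null models an open shard (endingSequenceNumber is null).
      static boolean shouldSkipPartitionGroup(Long currentEndSeq, Long shardEndSeq) {
        if (currentEndSeq == null) {
          return false; // still consuming; reuse the start checkpoint
        }
        if (shardEndSeq == null) {
          return false; // shard still open; next segment starts at currentEndSeq
        }
        // Shard has ended AND the committed checkpoint reached (or passed) its end:
        // drop this partition group from the result.
        return currentEndSeq.compareTo(shardEndSeq) >= 0;
      }

      public static void main(String[] args) {
        System.out.println(shouldSkipPartitionGroup(null, 50L)); // false: IN_PROGRESS segment
        System.out.println(shouldSkipPartitionGroup(42L, null)); // false: shard still open
        System.out.println(shouldSkipPartitionGroup(50L, 50L));  // true: shard fully consumed
      }
    }

Per the inner FIXME, the last committed sequence number has been observed to fall short of the shard's endingSequenceNumber, so the skip branch currently never triggers in practice.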
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index c64f710..be2e819 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -64,7 +65,7 @@ public interface StreamMetadataProvider extends Closeable {
    */
  default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws TimeoutException {
+      throws TimeoutException, IOException {
    int partitionCount = fetchPartitionCount(timeoutMillis);
    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
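
Callers of the interface now have to handle the widened signature. A hypothetical call site (the variable names are illustrative, and the exception causes are hedged guesses based on the diffs above):

    try {
      List<PartitionGroupInfo> groups = streamMetadataProvider
          .getPartitionGroupInfoList(clientId, streamConfig, currentPartitionGroupsMetadata, 10_000);
      // each entry carries a partitionGroupId and a serialized start checkpoint
    } catch (TimeoutException e) {
      // the default implementation may time out while fetching the partition count
    } catch (IOException e) {
      // the Kinesis implementation may fail while (de)serializing checkpoints
    }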
 

