This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to 
refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new f00c8dc  End-of-shard as end criteria AND consume shards in order
f00c8dc is described below

commit f00c8dca81cf6adc7863b05a47a0801e5a564442
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Fri Jan 15 17:38:28 2021 -0800

    End-of-shard as end criteria AND consume shards in order
---
 .../segment/LLCRealtimeSegmentZKMetadata.java      |   5 -
 .../protocols/SegmentCompletionProtocol.java       |   2 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  31 +++---
 .../RealtimeSegmentValidationManager.java          |   2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  37 ++++---
 .../kinesis/KinesisStreamMetadataProvider.java     | 106 ++++++++++++---------
 .../pinot/spi/stream/PartitionGroupInfo.java       |   6 +-
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  16 ++--
 .../pinot/spi/stream/StreamMetadataProvider.java   |   2 +-
 10 files changed, 120 insertions(+), 89 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
index b8b8d95..7cb19a7 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
@@ -87,11 +87,6 @@ public class LLCRealtimeSegmentZKMetadata extends 
RealtimeSegmentZKMetadata {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {
-      // TODO Issue 5359 Keep this until all components have upgraded to a 
version that can handle _offset being null
-      // For backward compatibility until all components have been upgraded to 
deal with null value for _endOffset
-      _endOffset = Long.toString(Long.MAX_VALUE);
-    }
     znRecord.setSimpleField(END_OFFSET, _endOffset);
     znRecord.setIntField(NUM_REPLICAS, _numReplicas);
     znRecord.setSimpleField(DOWNLOAD_URL, _downloadUrl);
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..6dcbda2 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,8 @@ public class SegmentCompletionProtocol {
 
   public static final String REASON_ROW_LIMIT = "rowLimit";  // Stop reason 
sent by server as max num rows reached
   public static final String REASON_TIME_LIMIT = "timeLimit";  // Stop reason 
sent by server as max time reached
+  public static final String REASON_END_OF_PARTITION_GROUP = 
"endOfPartitionGroup";
+      // Stop reason sent by server as end of partitionGroup reached
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8c17ff4..3a3dcf6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -166,7 +167,8 @@ public class PinotLLCRealtimeSegmentManager {
    * Using the ideal state and segment metadata, return a list of {@link 
PartitionGroupMetadata}
    * for latest segment of each partition group
    */
-  public List<PartitionGroupMetadata> 
getCurrentPartitionGroupMetadataList(IdealState idealState) {
+  public List<PartitionGroupMetadata> 
getCurrentPartitionGroupMetadataList(IdealState idealState,
+      StreamConfig streamConfig) {
     List<PartitionGroupMetadata> partitionGroupMetadataList = new 
ArrayList<>();
 
     // From all segment names in the ideal state, find unique partition group 
ids and their latest segment
@@ -185,6 +187,8 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Create a PartitionGroupMetadata for each latest segment
+    PartitionGroupCheckpointFactory checkpointFactory =
+        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
     for (Map.Entry<Integer, LLCSegmentName> entry : 
partitionGroupIdToLatestSegment.entrySet()) {
       int partitionGroupId = entry.getKey();
       LLCSegmentName llcSegmentName = entry.getValue();
@@ -195,7 +199,9 @@ public class PinotLLCRealtimeSegmentManager {
           (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
       PartitionGroupMetadata partitionGroupMetadata =
           new PartitionGroupMetadata(partitionGroupId, 
llcSegmentName.getSequenceNumber(),
-              llRealtimeSegmentZKMetadata.getStartOffset(), 
llRealtimeSegmentZKMetadata.getEndOffset(),
+              
checkpointFactory.create(llRealtimeSegmentZKMetadata.getStartOffset()),
+              llRealtimeSegmentZKMetadata.getEndOffset() == null ? null
+                  : 
checkpointFactory.create(llRealtimeSegmentZKMetadata.getEndOffset()),
               llRealtimeSegmentZKMetadata.getStatus().toString());
       partitionGroupMetadataList.add(partitionGroupMetadata);
     }
@@ -498,9 +504,10 @@ public class PinotLLCRealtimeSegmentManager {
     // Example: Say we currently were consuming from 2 shards A, B. Of those, 
A is the one committing.
 
     // Get current partition groups - this gives current state of latest 
segments for each partition [A - DONE], [B - IN_PROGRESS]
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = 
getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
 
     // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
     // If segment has consumed all of A, we will receive B,C,D
@@ -610,9 +617,7 @@ public class PinotLLCRealtimeSegmentManager {
       int numPartitions, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffsetFactory offsetFactory =
-        
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
-    StreamPartitionMsgOffset startOffset = 
offsetFactory.create(committingSegmentDescriptor.getNextOffset());
+    String startOffset = committingSegmentDescriptor.getNextOffset();
     LOGGER
         .info("Creating segment ZK metadata for new CONSUMING segment: {} with 
start offset: {} and creation time: {}",
             segmentName, startOffset, creationTimeMs);
@@ -621,7 +626,7 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setTableName(realtimeTableName);
     newSegmentZKMetadata.setSegmentName(segmentName);
     newSegmentZKMetadata.setCreationTime(creationTimeMs);
-    newSegmentZKMetadata.setStartOffset(startOffset.toString());
+    newSegmentZKMetadata.setStartOffset(startOffset);
     // Leave maxOffset as null.
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
@@ -808,7 +813,7 @@ public class PinotLLCRealtimeSegmentManager {
       assert idealState != null;
       if (idealState.isEnabled()) {
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
-            getCurrentPartitionGroupMetadataList(idealState);
+            getCurrentPartitionGroupMetadataList(idealState, streamConfig);
         List<PartitionGroupInfo> newPartitionGroupInfoList =
             getPartitionGroupInfoList(streamConfig, 
currentPartitionGroupMetadataList);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, 
idealState, newPartitionGroupInfoList);
@@ -1102,7 +1107,7 @@ public class PinotLLCRealtimeSegmentManager {
     return idealState;
   }
 
-  private StreamPartitionMsgOffset 
getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int 
partitionGroupId) {
+  private Checkpoint getPartitionGroupStartCheckpoint(StreamConfig 
streamConfig, int partitionGroupId) {
     Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new 
HashMap<>(streamConfig.getStreamConfigsMap());
     streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
             .constructStreamProperty(streamConfig.getType(), 
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
@@ -1111,12 +1116,10 @@ public class PinotLLCRealtimeSegmentManager {
         new StreamConfig(streamConfig.getTableNameWithType(), 
streamConfigMapWithSmallestOffsetCriteria);
     List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo =
         getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, 
Collections.emptyList());
-    StreamPartitionMsgOffset partitionStartOffset = null;
+    Checkpoint partitionStartOffset = null;
     for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) {
       if (info.getPartitionGroupId() == partitionGroupId) {
-        StreamPartitionMsgOffsetFactory factory =
-            
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
-        partitionStartOffset = factory.create(info.getStartCheckpoint());
+        partitionStartOffset = info.getStartCheckpoint();
         break;
       }
     }
@@ -1136,7 +1139,7 @@ public class PinotLLCRealtimeSegmentManager {
       long creationTimeMs, InstancePartitions instancePartitions, int 
numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
-    String startCheckpoint = partitionGroupInfo.getStartCheckpoint();
+    String startCheckpoint = 
partitionGroupInfo.getStartCheckpoint().toString();
     LOGGER.info("Setting up new partition group: {} for table: {}", 
partitionGroupId, realtimeTableName);
 
     String rawTableName = 
TableNameBuilder.extractRawTableName(realtimeTableName);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 96604dd..d611433 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ValidationMetrics validationMetrics, ControllerMetrics 
controllerMetrics) {
     super("RealtimeSegmentValidationManager", 
config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        6000, pinotHelixResourceManager,
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), 
pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index c19a845..ecbf2ef 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -920,7 +920,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig 
streamConfig,
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
       return IntStream.range(0, _numPartitions).mapToObj(i -> new 
PartitionGroupInfo(i,
-          PARTITION_OFFSET.toString()))
+          PARTITION_OFFSET))
           .collect(Collectors.toList());
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4c1d8f4..63eafc1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -160,7 +160,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
         Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long 
segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
-      _offset = _streamPartitionMsgOffsetFactory.create(offset);
+      _offset = _checkpointFactory.create(offset);
       _buildTimeMillis = buildTimeMillis;
       _waitTimeMillis = waitTimeMillis;
       _segmentSizeBytes = segmentSizeBytes;
@@ -235,11 +235,12 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final SegmentVersion _segmentVersion;
   private final SegmentBuildTimeLeaseExtender _leaseExtender;
   private SegmentBuildDescriptor _segmentBuildDescriptor;
-  private StreamConsumerFactory _streamConsumerFactory;
-  private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
+  private final StreamConsumerFactory _streamConsumerFactory;
+  private final PartitionGroupCheckpointFactory _checkpointFactory;
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
+  private boolean _endOfPartitionGroup = false;
   private Checkpoint _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
@@ -263,7 +264,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   private final List<String> _noDictionaryColumns;
   private final List<String> _varLengthDictionaryColumns;
   private final String _sortedColumn;
-  private Logger segmentLogger;
+  private final Logger segmentLogger;
   private final String _tableStreamName;
   private final PinotDataBufferMemoryManager _memoryManager;
   private AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
@@ -306,6 +307,14 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
+        } else if (_endOfPartitionGroup) {
+          segmentLogger.info(
+              "Stopping consumption due to end of partitionGroup reached 
nRows={} numRowsIndexed={}, numRowsConsumed={}",
+              _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+          _stopReason = 
SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+          // fixme: Handle creating a segment with 0 rows.
+          //  Happens if endOfPartitionGroup reached but no rows were consumed
+          return true;
         }
         return false;
 
@@ -369,7 +378,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / 
(idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+    Checkpoint lastUpdatedOffset = _checkpointFactory
         .create(_currentOffset);  // so that we always update the metric when 
we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the 
old saved segment file is not valid
@@ -384,6 +393,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, 
_partitionLevelStreamConfig.getFetchTimeoutMillis());
+        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         consecutiveErrorCount = 0;
       } catch (TimeoutException e) {
         handleTransientStreamErrors(e);
@@ -410,7 +420,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
 //        _serverMetrics.setValueOfTableGauge(_metricKeyName, 
ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset.getOffset());
 //        _serverMetrics.setValueOfTableGauge(_metricKeyName, 
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset.getOffset());
         _serverMetrics.setValueOfTableGauge(_metricKeyName, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
-        lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory.create(_currentOffset);
+        lastUpdatedOffset = _checkpointFactory.create(_currentOffset);
       } else {
         // We did not consume any rows. Update the partition-consuming metric 
only if we have been idling for a long time.
         // Create a new stream consumer wrapper, in case we are stuck on 
something.
@@ -668,10 +678,10 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
   @VisibleForTesting
   protected Checkpoint extractOffset(SegmentCompletionProtocol.Response 
response) {
     if (response.getStreamPartitionMsgOffset() != null) {
-      return 
_streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
+      return _checkpointFactory.create(response.getStreamPartitionMsgOffset());
     } else {
       // TODO Issue 5359 Remove this once the protocol is upgraded on server 
and controller
-      return 
_streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset()));
+      return _checkpointFactory.create(Long.toString(response.getOffset()));
     }
   }
 
@@ -966,7 +976,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
       _leaseExtender.removeSegment(_segmentNameStr);
-      final Checkpoint endOffset = 
_streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+      final Checkpoint endOffset = 
_checkpointFactory.create(llcMetadata.getEndOffset());
       segmentLogger
           .info("State: {}, transitioning from CONSUMING to ONLINE 
(startOffset: {}, endOffset: {})", _state.toString(),
               _startOffset, endOffset);
@@ -1126,14 +1136,15 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     _partitionLevelStreamConfig =
         new PartitionLevelStreamConfig(_tableNameWithType, 
IngestionConfigUtils.getStreamConfigMap(_tableConfig));
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
-    _streamPartitionMsgOffsetFactory =
+    _checkpointFactory =
         
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
     _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, 
_llcSegmentName.getSequenceNumber(),
-        _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+        _checkpointFactory.create(_segmentZKMetadata.getStartOffset()),
+        _segmentZKMetadata.getEndOffset() == null ? null : 
_checkpointFactory.create(_segmentZKMetadata.getEndOffset()),
         _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
@@ -1273,8 +1284,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     }
 
     _realtimeSegment = new 
MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
-    _startOffset = 
_streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset());
-    _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
+    _startOffset = 
_checkpointFactory.create(_segmentZKMetadata.getStartOffset());
+    _currentOffset = _checkpointFactory.create(_startOffset);
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
     if (!_resourceTmpDir.exists()) {
       _resourceTmpDir.mkdirs();
diff --git 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index b22bbe4..42150a3 100644
--- 
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -21,12 +21,15 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -69,7 +72,7 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
   /**
    * This call returns all active shards, taking into account the consumption 
status for those shards.
    * PartitionGroupInfo is returned for a shard if:
-   * 1. It is a branch new shard i.e. no partitionGroupMetadata was found for 
it in the current list
+   * 1. It is a brand new shard AND its parent has been consumed completely
    * 2. It is still being actively consumed from i.e. the consuming partition 
has not reached the end of the shard
    */
   @Override
@@ -77,54 +80,57 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int 
timeoutMillis)
       throws IOException, TimeoutException {
 
-    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = 
currentPartitionGroupsMetadata.stream()
-        .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, 
p -> p));
-
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
-    List<Shard> shards = _kinesisConnectionHandler.getShards();
-    for (Shard shard : shards) {
-      KinesisCheckpoint newStartCheckpoint;
-
-      String shardId = shard.shardId();
-      int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
-      PartitionGroupMetadata currentPartitionGroupMetadata = 
currentPartitionGroupMap.get(partitionGroupId);
-
-      if (currentPartitionGroupMetadata != null) { // existing shard
-        KinesisCheckpoint currentEndCheckpoint = null;
-        try {
-          currentEndCheckpoint = new 
KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
-        } catch (Exception e) {
-          // ignore. No end checkpoint yet for IN_PROGRESS segment
-        }
-        if (currentEndCheckpoint != null) { // end checkpoint available i.e. 
committing/committed segment
-          String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
-          if (endingSequenceNumber != null) { // shard has ended
-            // check if segment has consumed all the messages already
-            PartitionGroupConsumer partitionGroupConsumer =
-                
_kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, 
currentPartitionGroupMetadata);
-
-            MessageBatch messageBatch;
-            try {
-              messageBatch = 
partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, 
_fetchTimeoutMs);
-            } finally {
-              partitionGroupConsumer.close();
-            }
-            if (messageBatch.isEndOfPartitionGroup()) {
-              // shard has ended. Skip it from results
-              continue;
-            }
+
+    Map<String, Shard> shardIdToShardMap =
+        
_kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId,
 s -> s));
+    Set<String> shardsInCurrent = new HashSet<>();
+    Set<String> shardsEnded = new HashSet<>();
+
+    // TODO: Once we start supporting multiple shards in a PartitionGroup,
+    //  we need to iterate over all shards to check if any of them have 
reached end
+
+    // Process existing shards. Add them to new list if still consuming from 
them
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : 
currentPartitionGroupsMetadata) {
+      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) 
currentPartitionGroupMetadata.getStartCheckpoint();
+      String shardId = 
kinesisStartCheckpoint.getShardToStartSequenceMap().keySet().iterator().next();
+      Shard shard = shardIdToShardMap.get(shardId);
+      shardsInCurrent.add(shardId);
+
+      Checkpoint newStartCheckpoint;
+      Checkpoint currentEndCheckpoint = 
currentPartitionGroupMetadata.getEndCheckpoint();
+      if (currentEndCheckpoint != null) { // Segment DONE 
(committing/committed)
+        String endingSequenceNumber = 
shard.sequenceNumberRange().endingSequenceNumber();
+        if (endingSequenceNumber != null) { // Shard has ended, check if we're 
also done consuming it
+          if (consumedEndOfShard(currentEndCheckpoint, 
currentPartitionGroupMetadata)) {
+            shardsEnded.add(shardId);
+            continue; // Shard ended and we're done consuming it. Skip
           }
-          newStartCheckpoint = currentEndCheckpoint;
-        } else {
-          newStartCheckpoint = new 
KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
         }
-      } else { // new shard
+        newStartCheckpoint = currentEndCheckpoint;
+      } else { // Segment IN_PROGRESS
+        newStartCheckpoint = 
currentPartitionGroupMetadata.getStartCheckpoint();
+      }
+      newPartitionGroupInfos.add(new 
PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), 
newStartCheckpoint));
+    }
+
+    // Add new shards. The parent should be null (new table case, very first 
shards) OR the parent should have reached its end AND been completely consumed.
+    for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
+      String newShardId = entry.getKey();
+      if (shardsInCurrent.contains(newShardId)) {
+        continue;
+      }
+      Checkpoint newStartCheckpoint;
+      Shard newShard = entry.getValue();
+      String parentShardId = newShard.parentShardId();
+
+      if (parentShardId == null || shardsEnded.contains(parentShardId)) {
         Map<String, String> shardToSequenceNumberMap = new HashMap<>();
-        shardToSequenceNumberMap.put(shardId, 
shard.sequenceNumberRange().startingSequenceNumber());
+        shardToSequenceNumberMap.put(newShardId, 
newShard.sequenceNumberRange().startingSequenceNumber());
         newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+        int partitionGroupId = getPartitionGroupIdFromShardId(newShardId);
+        newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, 
newStartCheckpoint));
       }
-
-      newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, 
newStartCheckpoint.serialize()));
     }
     return newPartitionGroupInfos;
   }
@@ -138,6 +144,20 @@ public class KinesisStreamMetadataProvider implements 
StreamMetadataProvider {
     return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
   }
 
+  private boolean consumedEndOfShard(Checkpoint startCheckpoint, 
PartitionGroupMetadata partitionGroupMetadata)
+      throws IOException, TimeoutException {
+    PartitionGroupConsumer partitionGroupConsumer =
+        _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, 
partitionGroupMetadata);
+
+    MessageBatch messageBatch;
+    try {
+      messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, 
null, _fetchTimeoutMs);
+    } finally {
+      partitionGroupConsumer.close();
+    }
+    return messageBatch.isEndOfPartitionGroup();
+  }
+
   @Override
   public void close() {
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
index 758953d..b06e878 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -27,9 +27,9 @@ package org.apache.pinot.spi.stream;
 public class PartitionGroupInfo {
 
   private final int _partitionGroupId;
-  private final String _startCheckpoint;
+  private final Checkpoint _startCheckpoint;
 
-  public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
+  public PartitionGroupInfo(int partitionGroupId, Checkpoint startCheckpoint) {
     _partitionGroupId = partitionGroupId;
     _startCheckpoint = startCheckpoint;
   }
@@ -38,7 +38,7 @@ public class PartitionGroupInfo {
     return _partitionGroupId;
   }
 
-  public String getStartCheckpoint() {
+  public Checkpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index a99a82b..1ac12fb 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -22,12 +22,12 @@ public class PartitionGroupMetadata {
 
   private final int _partitionGroupId;
   private int _sequenceNumber;
-  private String _startCheckpoint;
-  private String _endCheckpoint;
+  private Checkpoint _startCheckpoint;
+  private Checkpoint _endCheckpoint;
   private String _status;
 
-  public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, 
String startCheckpoint,
-      String endCheckpoint, String status) {
+  public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, 
Checkpoint startCheckpoint,
+      Checkpoint endCheckpoint, String status) {
     _partitionGroupId = partitionGroupId;
     _sequenceNumber = sequenceNumber;
     _startCheckpoint = startCheckpoint;
@@ -47,19 +47,19 @@ public class PartitionGroupMetadata {
     _sequenceNumber = sequenceNumber;
   }
 
-  public String getStartCheckpoint() {
+  public Checkpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 
-  public void setStartCheckpoint(String startCheckpoint) {
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
     _startCheckpoint = startCheckpoint;
   }
 
-  public String getEndCheckpoint() {
+  public Checkpoint getEndCheckpoint() {
     return _endCheckpoint;
   }
 
-  public void setEndCheckpoint(String endCheckpoint) {
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
     _endCheckpoint = endCheckpoint;
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index cecc708..4b2751c 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -82,7 +82,7 @@ public interface StreamMetadataProvider extends Closeable {
           streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
       StreamPartitionMsgOffset streamPartitionMsgOffset =
           
partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(),
 timeoutMillis);
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, 
streamPartitionMsgOffset.toString()));
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, 
streamPartitionMsgOffset));
     }
     return newPartitionGroupInfoList;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to