Jackie-Jiang commented on code in PR #13112:
URL: https://github.com/apache/pinot/pull/13112#discussion_r1605909440


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java:
##########
@@ -323,6 +324,15 @@ public void setEndOffset(String endOffset) {
     setValue(Segment.Realtime.END_OFFSET, endOffset);
   }
 
+  public StreamContinuationMode getContinuationMode() {
+    return _znRecord.getEnumField(Segment.Realtime.CONTINUATION_MODE, StreamContinuationMode.class,
+        StreamContinuationMode.RESUME);

Review Comment:
   Do you foresee us adding another mode in the future? I feel a `boolean` field indicating whether this is the first segment of a streaming partition is good enough.
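   A minimal sketch of the boolean alternative, assuming the flag is stored as a string simple field like the other realtime segment metadata. The class `SegmentMetadataSketch`, the key name, and the map-backed storage are hypothetical stand-ins for `SegmentZKMetadata` and its `ZNRecord`, not existing Pinot code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ZNRecord simple fields backing SegmentZKMetadata.
class SegmentMetadataSketch {
  // Hypothetical key name; not an existing Pinot constant.
  static final String FIRST_SEGMENT_OF_PARTITION = "segment.realtime.firstSegmentOfPartition";

  private final Map<String, String> _simpleFields = new HashMap<>();

  // A missing field parses as false, so metadata written before this change keeps the
  // "resume" behavior, matching the RESUME default of the enum in the diff.
  boolean isFirstSegmentOfPartition() {
    return Boolean.parseBoolean(_simpleFields.get(FIRST_SEGMENT_OF_PARTITION));
  }

  void setFirstSegmentOfPartition(boolean firstSegment) {
    _simpleFields.put(FIRST_SEGMENT_OF_PARTITION, Boolean.toString(firstSegment));
  }
}
```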



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   I feel a boolean is easier to understand, and I don't see any other possible mode.



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -73,11 +74,12 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
    * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
    */
   @Override
-  public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+  public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,

Review Comment:
   Can we get #12806 in first if it works well?



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java:
##########
@@ -41,18 +41,25 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
   private final String _shardId;
   private final String _sequenceNumber;
 
+  public static final String STATUS_SEPARATOR = "::";

Review Comment:
   Revert the changes in this file?



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -430,7 +431,8 @@ protected boolean consumeLoop()
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _streamConfig.getFetchTimeoutMillis());
+        messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _segmentZKMetadata.getContinuationMode(),

Review Comment:
   (MAJOR) Only the first batch should be counted as ingested from the new stream.
   Within the `fetchMessages()` API, we should just use a boolean `inclusive` to mark whether the current offset should be consumed.
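   A minimal sketch of the suggested semantics, using a hypothetical in-memory partition whose offsets are plain `long` positions. It assumes, as with the Kinesis sequence numbers in this PR (resumed with `AFTER_SEQUENCE_NUMBER`), that the tracked offset is the last consumed record. Only the first fetch of a segment that starts on a new stream is `inclusive`; every later fetch resumes strictly after the last consumed offset. `InMemoryPartition`, `ConsumeLoopSketch`, `consumeAll`, and `startsOnNewStream` are illustrative names, not Pinot APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical partition holding records at offsets 0..size-1.
class InMemoryPartition {
  private final int _size;

  InMemoryPartition(int size) {
    _size = size;
  }

  // Returns up to maxRecords offsets, starting at startOffset when inclusive,
  // or at startOffset + 1 when not (i.e. "after" the given offset).
  List<Long> fetchMessages(long startOffset, boolean inclusive, int maxRecords) {
    List<Long> batch = new ArrayList<>();
    long from = inclusive ? startOffset : startOffset + 1;
    for (long offset = from; offset < _size && batch.size() < maxRecords; offset++) {
      batch.add(offset);
    }
    return batch;
  }
}

class ConsumeLoopSketch {
  // Drains the partition; only the very first fetch may include startOffset itself.
  static List<Long> consumeAll(InMemoryPartition partition, long startOffset, boolean startsOnNewStream,
      int batchSize) {
    List<Long> consumed = new ArrayList<>();
    long lastOffset = startOffset;
    boolean inclusive = startsOnNewStream;
    while (true) {
      List<Long> batch = partition.fetchMessages(lastOffset, inclusive, batchSize);
      if (batch.isEmpty()) {
        break;
      }
      consumed.addAll(batch);
      lastOffset = batch.get(batch.size() - 1);
      // After the first batch, always resume strictly after the last consumed offset.
      inclusive = false;
    }
    return consumed;
  }
}
```

   For a five-record partition and a batch size of 2, `consumeAll(new InMemoryPartition(5), 0L, true, 2)` returns offsets 0 through 4, while passing `false` starts at offset 1, so the boundary record is neither skipped on a new stream nor read twice when resuming.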



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   Please also update the javadoc



##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -187,15 +190,27 @@ private KinesisMessageBatch buildKinesisMessageBatch(KinesisPartitionGroupOffset
     return new KinesisMessageBatch(messages, offsetOfNextBatch, endOfShard);
   }
 
-  private String getShardIterator(String shardId, String sequenceNumber) {
+  private String getShardIterator(String shardId, String sequenceNumber,
+      CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode) {
     GetShardIteratorRequest.Builder requestBuilder =
         GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId);
-    if (sequenceNumber != null) {
-      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
-          .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
-    } else {
-      requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+
+    switch (continuationMode) {
+      case RESUME: {
+        if (sequenceNumber != null) {
+          requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        } else {
+          requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+        }
+        break;
+      }
+      case INITIALIZE: {
+        requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+        break;
+      }
+      default: //

Review Comment:
   It is bad practice to leave an unexpected mode unhandled. Throw an exception instead.
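   A minimal sketch of the suggested fix, reduced to choosing the iterator type so it runs without the AWS SDK. `ContinuationMode` mirrors the PR's `StreamContinuationMode`; `IteratorType` and `ShardIteratorChoice` are hypothetical stand-ins for `ShardIteratorType` and the builder logic in `getShardIterator`, with the configured type passed in as a parameter. The `default` branch throws, so a mode added later without handling here fails fast instead of falling through with no iterator type set.

```java
// Hypothetical mirror of the PR's StreamContinuationMode enum.
enum ContinuationMode { RESUME, INITIALIZE }

// Hypothetical stand-in for the AWS SDK ShardIteratorType values relevant here.
enum IteratorType { AFTER_SEQUENCE_NUMBER, TRIM_HORIZON, LATEST }

class ShardIteratorChoice {
  static IteratorType choose(ContinuationMode mode, String sequenceNumber, IteratorType configuredType) {
    switch (mode) {
      case RESUME:
        // Continue right after the last consumed record if one is known, else use the configured type.
        return sequenceNumber != null ? IteratorType.AFTER_SEQUENCE_NUMBER : configuredType;
      case INITIALIZE:
        // Starting fresh: ignore any sequence number and use the configured type.
        return configuredType;
      default:
        // Fail loudly for any mode that is not explicitly handled above.
        throw new IllegalStateException("Unsupported continuation mode: " + mode);
    }
  }
}
```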



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   (MAJOR) This is backward incompatible (this is a public-facing interface). We need to add a default implementation for the newly added API instead of changing the existing signature.
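   A minimal sketch of that pattern on a simplified, hypothetical interface: `PartitionConsumerSketch` uses `long` offsets instead of `StreamPartitionMsgOffset` and `List<Long>` instead of `MessageBatch`, and the `inclusive` flag follows the boolean suggested above. The existing method keeps its signature, the new parameter arrives through a default overload that delegates to it, and the javadoc documents the new parameter.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-in for PartitionGroupConsumer.
interface PartitionConsumerSketch {
  // Existing signature left unchanged, so current plugins still compile and behave as before.
  List<Long> fetchMessages(long startOffset, int timeoutMs);

  /**
   * Fetches messages, optionally including the message at {@code startOffset}.
   *
   * @param startOffset offset to start fetching from
   * @param inclusive whether the message at {@code startOffset} itself should be returned
   * @param timeoutMs fetch timeout in milliseconds
   * @return a batch of messages; plugins that do not override this method ignore {@code inclusive}
   */
  default List<Long> fetchMessages(long startOffset, boolean inclusive, int timeoutMs) {
    return fetchMessages(startOffset, timeoutMs);
  }
}

// A legacy plugin that only implements the original method still works with the new call.
class LegacyConsumer implements PartitionConsumerSketch {
  @Override
  public List<Long> fetchMessages(long startOffset, int timeoutMs) {
    List<Long> batch = new ArrayList<>();
    batch.add(startOffset);
    return batch;
  }
}
```

   Silently ignoring the flag in the default keeps legacy plugins working, at the cost of the new semantics not applying to them until they override the new method.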



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

