Jackie-Jiang commented on code in PR #13112:
URL: https://github.com/apache/pinot/pull/13112#discussion_r1605909440
##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java:
##########
@@ -323,6 +324,15 @@ public void setEndOffset(String endOffset) {
     setValue(Segment.Realtime.END_OFFSET, endOffset);
   }
 
+  public StreamContinuationMode getContinuationMode() {
+    return _znRecord.getEnumField(Segment.Realtime.CONTINUATION_MODE, StreamContinuationMode.class,
+        StreamContinuationMode.RESUME);

Review Comment:
   Do you foresee adding another mode in the future? I feel a `boolean` field indicating whether this is the first segment of a streaming partition is good enough.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   I feel a boolean is easier to understand, and I don't see any other possible mode.

##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -73,11 +74,12 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
    * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
    */
   @Override
-  public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+  public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,

Review Comment:
   Can we get #12806 in first if it works well?
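
To illustrate the boolean-flag suggestion in the comments above, here is a minimal sketch, not a proposal for the actual change. `SegmentMetadataSketch`, its map-backed storage, and the `FIRST_SEGMENT_OF_PARTITION` key are hypothetical stand-ins invented for this example; they are not existing Pinot classes or constants. The point is only that a missing field can default to `false`, so metadata written before the flag existed keeps the resume behavior.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for segment ZK metadata; not Pinot's actual SegmentZKMetadata.
class SegmentMetadataSketch {
  // Hypothetical field key, chosen for illustration only.
  static final String FIRST_SEGMENT_OF_PARTITION = "segment.realtime.firstSegmentOfPartition";

  // Plain string map standing in for the record's simple fields (assumption).
  private final Map<String, String> _simpleFields = new HashMap<>();

  // Boolean.parseBoolean(null) is false, so metadata written before the flag
  // existed is treated as a regular (resuming) segment.
  boolean isFirstSegmentOfPartition() {
    return Boolean.parseBoolean(_simpleFields.get(FIRST_SEGMENT_OF_PARTITION));
  }

  void setFirstSegmentOfPartition(boolean firstSegment) {
    _simpleFields.put(FIRST_SEGMENT_OF_PARTITION, Boolean.toString(firstSegment));
  }

  public static void main(String[] args) {
    SegmentMetadataSketch metadata = new SegmentMetadataSketch();
    System.out.println(metadata.isFirstSegmentOfPartition());
    metadata.setFirstSegmentOfPartition(true);
    System.out.println(metadata.isFirstSegmentOfPartition());
  }
}
```

A single flag like this carries the same information as a two-valued enum; an enum would only pay off if a third mode were expected.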
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java:
##########
@@ -41,18 +41,25 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
   private final String _shardId;
   private final String _sequenceNumber;
 
+  public static final String STATUS_SEPARATOR = "::";

Review Comment:
   Revert the changes in this file?

##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -430,7 +431,8 @@ protected boolean consumeLoop()
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _streamConfig.getFetchTimeoutMillis());
+        messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _segmentZKMetadata.getContinuationMode(),

Review Comment:
   (MAJOR) Only the first batch should be counted as ingested from the new stream.
   Within the `fetchMessages()` API, we should just use a boolean `inclusive` to mark whether we should consume the current offset.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   Please also update the javadoc.

##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -187,15 +190,27 @@ private KinesisMessageBatch buildKinesisMessageBatch(KinesisPartitionGroupOffset
     return new KinesisMessageBatch(messages, offsetOfNextBatch, endOfShard);
   }
 
-  private String getShardIterator(String shardId, String sequenceNumber) {
+  private String getShardIterator(String shardId, String sequenceNumber,
+      CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode) {
     GetShardIteratorRequest.Builder requestBuilder =
         GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId);
-    if (sequenceNumber != null) {
-      requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
-          .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
-    } else {
-      requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+
+    switch (continuationMode) {
+      case RESUME: {
+        if (sequenceNumber != null) {
+          requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        } else {
+          requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+        }
+        break;
+      }
+      case INITIALIZE: {
+        requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+        break;
+      }
+      default: //

Review Comment:
   It is bad practice to leave an unexpected mode unhandled. Throw an exception instead.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
    * @throws TimeoutException If the operation could not be completed within timeout
    * @return A batch of messages from the stream partition group
    */
-  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+  default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)

Review Comment:
   (MAJOR) This is backward incompatible (this is a public-facing interface). We need to add a default impl for the newly added API and not change the existing signature.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
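
As a rough illustration of the two (MAJOR)/practice points raised in this review, the sketch below shows one way to add a new overload without breaking existing implementers, and a `switch` whose `default` branch throws instead of silently falling through. Everything here is an assumption made for the example: `ConsumerSketch`, `ModeSketch`, `ShardIteratorSketch`, the `long` offsets, the `List<String>` batches, and the string iterator-type names are hypothetical simplifications, not Pinot's or the AWS SDK's actual types.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for a public consumer interface;
// not Pinot's actual PartitionGroupConsumer.
interface ConsumerSketch {
  // Existing signature, left unchanged so current implementations still compile.
  List<String> fetchMessages(long startOffset, int timeoutMs);

  // Newly added overload with a default implementation. This default ignores
  // the flag and delegates, so implementations unaware of it keep their
  // previous behavior (an assumption about the desired fallback).
  default List<String> fetchMessages(long startOffset, boolean inclusive, int timeoutMs) {
    return fetchMessages(startOffset, timeoutMs);
  }
}

// Hypothetical two-valued mode, mirroring the enum discussed in the review.
enum ModeSketch { RESUME, INITIALIZE }

final class ShardIteratorSketch {
  private ShardIteratorSketch() {
  }

  // Returns an illustrative iterator-type name; unexpected modes fail loudly.
  static String iteratorTypeFor(ModeSketch mode, String sequenceNumber) {
    switch (mode) {
      case RESUME:
        return sequenceNumber != null ? "AFTER_SEQUENCE_NUMBER" : "CONFIGURED_DEFAULT";
      case INITIALIZE:
        return "CONFIGURED_DEFAULT";
      default:
        // Reached only if a new enum constant is added without updating this switch.
        throw new IllegalStateException("Unsupported continuation mode: " + mode);
    }
  }

  public static void main(String[] args) {
    // A "legacy" implementation that only knows the original two-argument method.
    ConsumerSketch legacy = (offset, timeoutMs) -> Collections.singletonList("msg@" + offset);
    System.out.println(legacy.fetchMessages(5L, true, 100));
    System.out.println(iteratorTypeFor(ModeSketch.RESUME, "123"));
  }
}
```

Because the original method stays abstract and unchanged, plugins compiled against the old interface continue to work, and only consumers that care about the new flag need to override the overload.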