Jackie-Jiang commented on code in PR #9093:
URL: https://github.com/apache/pinot/pull/9093#discussion_r930415617
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -440,6 +443,14 @@ protected boolean consumeLoop()
       }
       _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
       lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);

Review Comment:
   Consider `break` here when `endCriteriaReached == true`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -587,6 +606,7 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
+    return prematureExit;

Review Comment:
   We should avoid sleeping when `prematureExit == true`



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java:
##########
@@ -35,11 +36,13 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    return null;
+    GenericRow row = Fixtures.createSingleRow(((Long) System.currentTimeMillis()).intValue());

Review Comment:
   (minor) Consider changing `Fixtures.createSingleRow()` to take `long randomSeed`, because `Random` actually takes a `long` seed



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java:
##########
@@ -63,23 +68,21 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;

 // TODO Re-write this test using the stream abstraction
 public class LLRealtimeSegmentDataManagerTest {
   private static final String SEGMENT_DIR = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName();
   private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
   private static final String TABLE_NAME = "Coffee";
-  private static final int PARTITION_GROUP_ID = 13;
+  private static final int PARTITION_GROUP_ID = 0;

Review Comment:
   What's the reason for changing `PARTITION_GROUP_ID` and `START_OFFSET_VALUE` in this test? We should be able to handle any partition and any offset



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1224,6 +1244,18 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
       Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
       @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
+    this(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema,
+        llcSegmentName, partitionGroupConsumerSemaphore, serverMetrics, partitionUpsertMetadataManager,
+        partitionDedupMetadataManager, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  protected LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,

Review Comment:
   We don't need to introduce the `Clock` into this main production class. The `FakeLLRealtimeSegmentDataManager` is already overriding the `now()` method, and we may introduce the clock there



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java:
##########
@@ -114,6 +114,10 @@ public MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPa
     }
     int startOffsetInt = (int) ((LongMsgOffset) startOffset).getOffset();
     int endOffsetInt = (int) ((LongMsgOffset) endOffset).getOffset();
+    if (endOffsetInt > _messageOffsets.size()) {
+      // Hack to get multiple batches

Review Comment:
   Should we control this on the caller side? Message fetching should not be aware of the flush threshold



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
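
For illustration only, the `Clock` suggestion above (keep the clock out of the production class and inject it through the test subclass that already overrides `now()`) could look roughly like the sketch below. The class names `SegmentDataManagerSketch` and `FakeSegmentDataManagerSketch`, the `timeLimitReached` method, and the `setClock` setter are hypothetical stand-ins, not the actual Pinot classes or signatures; only `java.time.Clock` and its methods are real APIs.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical stand-in for the production class: it knows nothing about Clock
// and only exposes an overridable now().
class SegmentDataManagerSketch {
  protected long now() {
    return System.currentTimeMillis();
  }

  // Hypothetical time-based end criterion: true once the deadline has passed.
  boolean timeLimitReached(long deadlineMillis) {
    return now() >= deadlineMillis;
  }
}

// Hypothetical test-only subclass: the Clock lives here, so tests can control
// time without adding a constructor parameter to the production class.
class FakeSegmentDataManagerSketch extends SegmentDataManagerSketch {
  private Clock _clock = Clock.systemUTC();

  void setClock(Clock clock) {
    _clock = clock;
  }

  @Override
  protected long now() {
    return _clock.millis();
  }
}
```

A test would then pin time with something like `fake.setClock(Clock.fixed(Instant.ofEpochMilli(3_000L), ZoneOffset.UTC))` and assert on the end criterion, while the production constructor stays unchanged.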