Jackie-Jiang commented on code in PR #9093:
URL: https://github.com/apache/pinot/pull/9093#discussion_r930415617


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -440,6 +443,14 @@ protected boolean consumeLoop()
         }
         _serverMetrics.setValueOfTableGauge(_metricKeyName, 
ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = 
_streamPartitionMsgOffsetFactory.create(_currentOffset);

Review Comment:
   Consider `break` here when `endCriteriaReached == true`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -587,6 +606,7 @@ private void processStreamEvents(MessageBatch 
messagesAndOffsets, long idlePipeS
       // If there were no messages to be fetched from stream, wait for a 
little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, 
TimeUnit.MILLISECONDS);
     }
+    return prematureExit;

Review Comment:
   We should avoid sleeping when `prematureExit == true`



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java:
##########
@@ -35,11 +36,13 @@ public void init(Map<String, String> props, Set<String> 
fieldsToRead, String top
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    return null;
+    GenericRow row = Fixtures.createSingleRow(((Long) 
System.currentTimeMillis()).intValue());

Review Comment:
   (minor) Consider changing `Fixtures.createSingleRow()` to take `long 
randomSeed` because `Random` actually takes long seed



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java:
##########
@@ -63,23 +68,21 @@
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 // TODO Re-write this test using the stream abstraction
 public class LLRealtimeSegmentDataManagerTest {
   private static final String SEGMENT_DIR = "/tmp/" + 
LLRealtimeSegmentDataManagerTest.class.getSimpleName();
   private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
   private static final String TABLE_NAME = "Coffee";
-  private static final int PARTITION_GROUP_ID = 13;
+  private static final int PARTITION_GROUP_ID = 0;

Review Comment:
   What's the reason of changing `PARTITION_GROUP_ID` and `START_OFFSET_VALUE` 
in this test? We should be able to handle any partition and any offset



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1224,6 +1244,18 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata 
segmentZKMetadata, TableCo
       Schema schema, LLCSegmentName llcSegmentName, Semaphore 
partitionGroupConsumerSemaphore,
       ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager 
partitionUpsertMetadataManager,
       @Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
+      this(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir, indexLoadingConfig, schema,
+          llcSegmentName, partitionGroupConsumerSemaphore, serverMetrics, 
partitionUpsertMetadataManager,
+          partitionDedupMetadataManager, Clock.systemUTC());
+  }
+
+  @VisibleForTesting
+  protected LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, 
TableConfig tableConfig,

Review Comment:
   We don't need to introduce the `Clock` into this main production class. The 
`FakeLLRealtimeSegmentDataManager` is already overriding the `now()` method, 
and we may introduce the clock there



##########
pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java:
##########
@@ -114,6 +114,10 @@ public MessageBatch fetchMessages(StreamPartitionMsgOffset 
startOffset, StreamPa
     }
     int startOffsetInt = (int) ((LongMsgOffset) startOffset).getOffset();
     int endOffsetInt = (int) ((LongMsgOffset) endOffset).getOffset();
+    if (endOffsetInt > _messageOffsets.size()) {
+      // Hack to get multiple batches

Review Comment:
   Should we control this on the caller side? Message fetching should not be 
aware of the flush threshold



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to