Jackie-Jiang commented on code in PR #15419:
URL: https://github.com/apache/pinot/pull/15419#discussion_r2037888495


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -518,15 +518,13 @@ public void addConsumingSegment(String segmentName)
 
   private void doAddConsumingSegment(String segmentName)
       throws AttemptsExceededException, RetriableOperationException {
-    SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
-    if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) {
+    SegmentZKMetadata zkMetadata = fetchZKMetadataNullable(segmentName);

Review Comment:
   We do want it to throw an exception when the ZK metadata is removed. This is an unexpected scenario.
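
To illustrate the distinction between a strict and a nullable lookup, here is a minimal sketch. It assumes a hypothetical in-memory map standing in for the ZK-backed store; the class and method names are illustrative only and are not Pinot's actual `fetchZKMetadata` / `fetchZKMetadataNullable` implementations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a ZK-backed metadata store; not Pinot's actual API.
final class ZkMetadataLookupSketch {
  private final Map<String, String> _store = new ConcurrentHashMap<>();

  void put(String segmentName, String metadata) {
    _store.put(segmentName, metadata);
  }

  // Nullable variant: returns null when the record is missing, so every
  // caller has to decide what a missing record means.
  String fetchNullable(String segmentName) {
    return _store.get(segmentName);
  }

  // Strict variant: a missing record is treated as unexpected, so fail fast
  // instead of letting the caller continue with a null.
  String fetch(String segmentName) {
    String metadata = _store.get(segmentName);
    if (metadata == null) {
      throw new IllegalStateException("Failed to find ZK metadata for segment: " + segmentName);
    }
    return metadata;
  }
}
```

With the strict variant, a removed record surfaces as an exception at the call site rather than as a silently skipped segment.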



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
       _state = State.INITIAL_CONSUMING;
       _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
       _consumeStartTime = now();
-      setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
+      setConsumeEndTime(segmentZKMetadata, now());
       _segmentCommitterFactory =
           new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics);
-      _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",
-          llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
     } catch (Throwable t) {
       // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
       // Hence releasing the semaphore here to unblock reset operation via Helix Admin.
-      releaseConsumerSemaphore();

Review Comment:
   Good catch
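
For context, the code comment in the hunk above describes a release-on-failure pattern. The following is a minimal sketch of that pattern, assuming a plain `java.util.concurrent.Semaphore` and a hypothetical `startConsumer` method; it is not Pinot's consumer-semaphore code. The release only makes sense on the code path that actually acquired the permit.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of releasing a permit when initialization fails;
// names are illustrative and not taken from Pinot.
final class SemaphoreReleaseSketch {
  private final Semaphore _semaphore = new Semaphore(1);

  void startConsumer(Runnable init) {
    _semaphore.acquireUninterruptibly();
    try {
      init.run();
    } catch (Throwable t) {
      // Without this release, a later reset attempt would block forever
      // on a permit that no live consumer holds.
      _semaphore.release();
      throw t;
    }
    // On success the permit stays held until the consumer finishes.
  }

  int availablePermits() {
    return _semaphore.availablePermits();
  }
}
```

If the acquire moves to a different method, the matching release has to move with it; releasing a permit that was never acquired would raise the permit count above its intended limit.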



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
       _state = State.INITIAL_CONSUMING;
       _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000);
       _consumeStartTime = now();
-      setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
+      setConsumeEndTime(segmentZKMetadata, now());

Review Comment:
   (minor) This can be reverted



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -311,7 +311,7 @@ public void deleteSegmentFile() {
   private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0);
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
-  private final long _consumeStartTime;
+  private long _consumeStartTime;

Review Comment:
   (minor) Move it in front of `_lastLogTime`



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -747,6 +747,10 @@ public void run() {
 
         _segmentLogger.info("Acquired consumer semaphore.");
 
+        _consumeStartTime = now();
+        _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}",

Review Comment:
   (minor)
   ```suggestion
           _segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

