Jackie-Jiang commented on code in PR #15419: URL: https://github.com/apache/pinot/pull/15419#discussion_r2037888495
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -518,15 +518,13 @@ public void addConsumingSegment(String segmentName) private void doAddConsumingSegment(String segmentName) throws AttemptsExceededException, RetriableOperationException { - SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); - if (!_enforceConsumptionInOrder && zkMetadata.getStatus().isCompleted()) { + SegmentZKMetadata zkMetadata = fetchZKMetadataNullable(segmentName); Review Comment: We do want it to throw an exception when the ZK metadata is removed. This is an unexpected scenario. ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _state = State.INITIAL_CONSUMING; _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000); _consumeStartTime = now(); - setConsumeEndTime(segmentZKMetadata, _consumeStartTime); + setConsumeEndTime(segmentZKMetadata, now()); _segmentCommitterFactory = new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); - _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", - llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); } catch (Throwable t) { // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. // Hence releasing the semaphore here to unblock reset operation via Helix Admin. 
- releaseConsumerSemaphore(); Review Comment: Good catch ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -1722,16 +1726,13 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _state = State.INITIAL_CONSUMING; _latestStreamOffsetAtStartupTime = fetchLatestStreamOffset(5000); _consumeStartTime = now(); - setConsumeEndTime(segmentZKMetadata, _consumeStartTime); + setConsumeEndTime(segmentZKMetadata, now()); Review Comment: (minor) This can be reverted ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -311,7 +311,7 @@ public void deleteSegmentFile() { private final AtomicLong _lastUpdatedRowsIndexed = new AtomicLong(0); private final String _instanceId; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - private final long _consumeStartTime; + private long _consumeStartTime; Review Comment: (minor) Move it in front of `_lastLogTime` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ########## @@ -747,6 +747,10 @@ public void run() { _segmentLogger.info("Acquired consumer semaphore."); + _consumeStartTime = now(); + _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", Review Comment: (minor) ```suggestion _segmentLogger.info("Starting consumption on segment: {}, maxRowCount: {}, maxEndTime: {}", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org