noob-se7en commented on code in PR #15404:
URL: https://github.com/apache/pinot/pull/15404#discussion_r2019774668


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -474,23 +473,26 @@ protected void doAddOnlineSegment(String segmentName)
     SegmentDataManager segmentDataManager = 
_segmentDataManagerMap.get(segmentName);
     if (segmentDataManager == null) {
       addNewOnlineSegment(zkMetadata, indexLoadingConfig);
-      return;
-    }
-    if (segmentDataManager instanceof RealtimeSegmentDataManager) {
+    } else if (segmentDataManager instanceof RealtimeSegmentDataManager) {
       _logger.info("Changing segment: {} from CONSUMING to ONLINE", 
segmentName);
       ((RealtimeSegmentDataManager) 
segmentDataManager).goOnlineFromConsuming(zkMetadata);
       onConsumingToOnline(segmentName);
-      return;
-    }
-    // For pauseless ingestion, the segment is marked ONLINE before it's built 
and before the COMMIT_END_METADATA
-    // call completes.
-    // The server should replace the segment only after the CRC is set by 
COMMIT_END_METADATA and the segment is
-    // marked DONE.
-    // This ensures the segment's download URL is available before discarding 
the locally built copy, preventing
-    // data loss if COMMIT_END_METADATA fails.
-    if (zkMetadata.getStatus() == Status.DONE) {
+    } else if (zkMetadata.getStatus() == Status.DONE) {
+      // For pauseless ingestion, the segment is marked ONLINE before it's 
built and before the COMMIT_END_METADATA
+      // call completes.
+      // The server should replace the segment only after the CRC is set by 
COMMIT_END_METADATA and the segment is
+      // marked DONE.
+      // This ensures the segment's download URL is available before 
discarding the locally built copy, preventing
+      // data loss if COMMIT_END_METADATA fails.
       replaceSegmentIfCrcMismatch(segmentDataManager, zkMetadata, 
indexLoadingConfig);
     }
+    // Register the segment into the consumer coordinator if consumption order 
is enforced.
+    if (_enforceConsumptionInOrder) {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+      if (llcSegmentName != null) {
+        
getConsumerCoordinator(llcSegmentName.getPartitionGroupId()).register(llcSegmentName);

Review Comment:
   won't this mess up the watermark if 
   `consuming -> online` or `offline -> online` transitions are out of order? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to