Copilot commented on code in PR #16810:
URL: https://github.com/apache/pinot/pull/16810#discussion_r2346879266


##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java:
##########
@@ -859,6 +859,174 @@ public void 
testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdI
     }
   }
 
+  @Test
+  public void testCompletionModeDownloadWithUrlValidation()
+      throws Exception {
+    // Test the new validation logic for CompletionMode.DOWNLOAD
+
+    // Test Case 1: Valid download URL - should proceed with DOWNLOAD mode
+    testCompletionModeDownloadWithValidUrl();
+
+    // Test Case 2: Missing/null download URL - should fall back to RETAINING 
mode
+    testCompletionModeDownloadWithMissingUrl();
+
+    // Test Case 3: Empty download URL - should fall back to RETAINING mode
+    testCompletionModeDownloadWithEmptyUrl();
+  }
+
+  private void testCompletionModeDownloadWithValidUrl()
+      throws Exception {
+    String downloadUrl = "http://example.com/segment.tar.gz";;

Review Comment:
   [nitpick] Using 'example.com' in tests is good practice, but consider using 
a constant or a more descriptive URL that clearly indicates this is a test URL 
to avoid any confusion.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1460,58 +1471,59 @@ public void goOnlineFromConsuming(SegmentZKMetadata 
segmentZKMetadata)
         case CATCHING_UP:
         case HOLDING:
         case INITIAL_CONSUMING:
-          switch (_segmentCompletionMode) {
-            case DOWNLOAD:
-              _segmentLogger.info("State {}. CompletionMode {}. Downloading to 
replace", _state.toString(),
-                  _segmentCompletionMode);
+          if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
+            // Check if download URL has been set by another replica
+            String downloadUrl = segmentZKMetadata.getDownloadUrl();
+            if (StringUtils.isNotEmpty(downloadUrl)) {
+              _segmentLogger.info("State {}. CompletionMode {}. Downloading to 
replace with fresh metadata. URL: {}",
+                  _state.toString(), _segmentCompletionMode, downloadUrl);
               downloadSegmentAndReplace(segmentZKMetadata);
               break;
-            case DEFAULT:
-              // Allow to catch up upto final offset, and then replace.
-              if (_currentOffset.compareTo(endOffset) > 0) {
-                // We moved ahead of the offset that is committed in ZK.
-                _segmentLogger
-                    .warn("Current offset {} ahead of the offset in zk {}. 
Downloading to replace", _currentOffset,
-                        endOffset);
-                downloadSegmentAndReplace(segmentZKMetadata);
-              } else if (_currentOffset.compareTo(endOffset) == 0) {
-                _segmentLogger
-                    .info("Current offset {} matches offset in zk {}. 
Replacing segment", _currentOffset, endOffset);
-                if (!buildSegmentAndReplace()) {
-                  _segmentLogger.warn("Failed to build the segment: {} and 
replace. Downloading to replace",
-                      _segmentNameStr);
-                  downloadSegmentAndReplace(segmentZKMetadata);
-                }
-              } else {
-                boolean success = false;
-                // Since online helix transition for a segment can arrive 
before segment's consumer acquires the
-                // semaphore, check _consumerSemaphoreAcquired before catching 
up.
-                // This is to avoid consuming in parallel to another segment's 
consumer.
-                if (_consumerSemaphoreAcquired.get()) {
-                  _segmentLogger.info("Attempting to catch up from offset {} 
to {} ", _currentOffset, endOffset);
-                  success = catchupToFinalOffset(endOffset,
-                      
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, 
TimeUnit.SECONDS));
-                } else {
-                  _segmentLogger.warn("Consumer semaphore was not acquired, 
Skipping catch up from offset {} to {} ",
-                      _currentOffset, endOffset);
-                }
+            } else {
+              _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but 
download URL is missing. "
+                  + "Falling back to building segment locally for segment: 
{}", _state.toString(), _segmentNameStr);
+            }
+          }
+          // Allow to catch up upto final offset, and then replace.
+          if (_currentOffset.compareTo(endOffset) > 0) {
+            // We moved ahead of the offset that is committed in ZK.
+            _segmentLogger.warn("Current offset {} ahead of the offset in zk 
{}. Downloading to replace",
+                _currentOffset, endOffset);

Review Comment:
   [nitpick] The code formatting change from chained method calls to line 
breaks seems unnecessary and inconsistent with other similar log statements in 
the codebase. Consider keeping the original formatting to maintain consistency.
   ```suggestion
               _segmentLogger.warn("Current offset {} ahead of the offset in zk 
{}. Downloading to replace", _currentOffset, endOffset);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to