Copilot commented on code in PR #16810:
URL: https://github.com/apache/pinot/pull/16810#discussion_r2346868358


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -1460,58 +1471,59 @@ public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
         case CATCHING_UP:
         case HOLDING:
         case INITIAL_CONSUMING:
-          switch (_segmentCompletionMode) {
-            case DOWNLOAD:
-              _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace", _state.toString(),
-                  _segmentCompletionMode);
+          if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
+            // Fetch fresh metadata from ZooKeeper to check if download URL has been set by another replica
+            String downloadUrl = segmentZKMetadata.getDownloadUrl();
+            if (StringUtils.isNotEmpty(downloadUrl)) {
+              _segmentLogger.info("State {}. CompletionMode {}. Downloading to replace with fresh metadata. URL: {}",
+                  _state.toString(), _segmentCompletionMode, downloadUrl);
               downloadSegmentAndReplace(segmentZKMetadata);
               break;
-            case DEFAULT:
-              // Allow to catch up upto final offset, and then replace.
-              if (_currentOffset.compareTo(endOffset) > 0) {
-                // We moved ahead of the offset that is committed in ZK.
-                _segmentLogger
-                    .warn("Current offset {} ahead of the offset in zk {}. Downloading to replace", _currentOffset,
-                        endOffset);
-                downloadSegmentAndReplace(segmentZKMetadata);
-              } else if (_currentOffset.compareTo(endOffset) == 0) {
-                _segmentLogger
-                    .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset);
-                if (!buildSegmentAndReplace()) {
-                  _segmentLogger.warn("Failed to build the segment: {} and replace. Downloading to replace",
-                      _segmentNameStr);
-                  downloadSegmentAndReplace(segmentZKMetadata);
-                }
-              } else {
-                boolean success = false;
-                // Since online helix transition for a segment can arrive before segment's consumer acquires the
-                // semaphore, check _consumerSemaphoreAcquired before catching up.
-                // This is to avoid consuming in parallel to another segment's consumer.
-                if (_consumerSemaphoreAcquired.get()) {
-                  _segmentLogger.info("Attempting to catch up from offset {} to {} ", _currentOffset, endOffset);
-                  success = catchupToFinalOffset(endOffset,
-                      TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, TimeUnit.SECONDS));
-                } else {
-                  _segmentLogger.warn("Consumer semaphore was not acquired, Skipping catch up from offset {} to {} ",
-                      _currentOffset, endOffset);
-                }
+            } else {
+              _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but download URL is missing. "
+                  + "Falling back to building segment locally for segment: {}", _state.toString(), _segmentNameStr);
+            }
+          }

Review Comment:
   The comment mentions fetching fresh metadata from ZooKeeper, but the code 
uses the passed `segmentZKMetadata` parameter instead of fetching fresh 
metadata like in the KEEP case above. This inconsistency could lead to using 
stale metadata when the download URL might have been updated by another replica.
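
A minimal, self-contained sketch of the pattern this comment asks for, under stated assumptions: `Metadata` and the `fetcher` function are hypothetical stand-ins for `SegmentZKMetadata` and a fresh ZooKeeper read (the test in this PR mocks `fetchZKMetadata` for that purpose), and `resolveDownloadUrl` is an illustrative name, not existing Pinot code. It only shows the decision being made on a re-read value instead of the snapshot passed into the method.

```java
import java.util.Optional;
import java.util.function.Function;

public class FreshDownloadUrlSketch {

  /** Hypothetical stand-in for the segment ZK metadata; only the download URL matters here. */
  static final class Metadata {
    private final String downloadUrl;

    Metadata(String downloadUrl) {
      this.downloadUrl = downloadUrl;
    }

    String getDownloadUrl() {
      return downloadUrl;
    }
  }

  /**
   * Re-reads the metadata through the supplied fetcher (assumed to hit ZooKeeper) and
   * returns its download URL. The result is empty when the metadata is absent or the
   * URL is null or empty, in which case the caller would fall back to a local build.
   */
  static Optional<String> resolveDownloadUrl(String segmentName, Function<String, Metadata> fetcher) {
    Metadata fresh = fetcher.apply(segmentName);
    if (fresh == null) {
      return Optional.empty();
    }
    String url = fresh.getDownloadUrl();
    return (url == null || url.isEmpty()) ? Optional.empty() : Optional.of(url);
  }

  public static void main(String[] args) {
    // Snapshot handed to the state transition before another replica committed the segment.
    Metadata passedIn = new Metadata(null);
    // What a fresh read would return after the other replica set the URL.
    Metadata inZk = new Metadata("http://example.com/segment.tar.gz");

    boolean staleHasUrl = passedIn.getDownloadUrl() != null;
    Optional<String> freshUrl = resolveDownloadUrl("segment", name -> inZk);

    System.out.println("stale snapshot has URL: " + staleHasUrl);
    System.out.println(freshUrl.isPresent() ? "download from " + freshUrl.get() : "build locally");
  }
}
```

In this sketch the fallback to a local build is triggered only when the re-read value has no URL, so a URL published by another replica after the snapshot was taken is still picked up.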



##########
pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java:
##########
@@ -859,6 +859,159 @@ public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdI
     }
   }
 
+  @Test
+  public void testCompletionModeDownloadWithUrlValidation()
+      throws Exception {
+    // Test the new validation logic for CompletionMode.DOWNLOAD
+
+    // Test Case 1: Valid download URL - should proceed with DOWNLOAD mode
+    testCompletionModeDownloadWithValidUrl();
+
+    // Test Case 2: Missing/null download URL - should fall back to RETAINING mode
+    testCompletionModeDownloadWithMissingUrl();
+
+    // Test Case 3: Empty download URL - should fall back to RETAINING mode
+    testCompletionModeDownloadWithEmptyUrl();
+  }
+
+  private void testCompletionModeDownloadWithValidUrl()
+      throws Exception {
+    String downloadUrl = "http://example.com/segment.tar.gz";
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.DISCARDED, false);
+
+    runCompletionModeDownloadTest(downloadUrl, downloadUrl, expectedResult);
+  }
+
+  private void testCompletionModeDownloadWithMissingUrl()
+      throws Exception {
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.RETAINED, true);
+
+    runCompletionModeDownloadTest(null, null, expectedResult);
+  }
+
+  private void testCompletionModeDownloadWithEmptyUrl()
+      throws Exception {
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.RETAINED, true);
+
+    runCompletionModeDownloadTest("", "", expectedResult);
+  }
+
+  /**
+   * Helper method to run completion mode download tests with different download URL scenarios.
+   *
+   * @param freshMetadataDownloadUrl The download URL returned by fetchZKMetadata (fresh metadata)
+   * @param initialSegmentDownloadUrl The download URL in the initial segment metadata
+   * @param expectedResult The expected test result (state and buildAndReplaceCalled flag)
+   */
+  private void runCompletionModeDownloadTest(String freshMetadataDownloadUrl,
+      String initialSegmentDownloadUrl, TestResult expectedResult)
+      throws Exception {
+
+    // Create table config with DOWNLOAD completion mode
+    TableConfig tableConfig = createTableConfigWithDownloadCompletionMode();
+
+    // Create mock table data manager with the specified download URL
+    RealtimeTableDataManager mockTableDataManager = createMockTableDataManager(freshMetadataDownloadUrl);
+
+    // Create segment data manager with DOWNLOAD completion mode
+    SegmentZKMetadata segmentZKMetadata = createZkMetadata();
+    if (initialSegmentDownloadUrl != null) {
+      segmentZKMetadata.setDownloadUrl(initialSegmentDownloadUrl);
+    }
+
+    LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
+    Schema schema = Fixtures.createSchema();
+    ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+    try (FakeRealtimeSegmentDataManager segmentDataManager =
+        new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, mockTableDataManager,
+            new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, llcSegmentName,
+            _partitionGroupIdToConsumerCoordinatorMap, serverMetrics, new TimeSupplier())) {
+
+      // Execute the consumer with KEEP response
+      executeConsumerWithKeepResponse(segmentDataManager);
+
+      // Verify the expected results
+      Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
+          expectedResult.expectedState);
+      Assert.assertEquals(segmentDataManager._buildAndReplaceCalled,
+          expectedResult.expectedBuildAndReplaceCalled);
+      Assert.assertTrue(segmentDataManager._responses.isEmpty());
+      Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
+    }
+  }
+
+  /**
+   * Creates a table config with DOWNLOAD completion mode.
+   */
+  private TableConfig createTableConfigWithDownloadCompletionMode()
+      throws Exception {
+    TableConfig tableConfig = createTableConfig();
+    tableConfig.getValidationConfig().setCompletionConfig(
+        new org.apache.pinot.spi.config.table.CompletionConfig("DOWNLOAD"));
+    return tableConfig;
+  }
+
+  /**
+   * Creates a mock table data manager that returns the specified download URL in fresh metadata.
+   */
+  private RealtimeTableDataManager createMockTableDataManager(String downloadUrl) {
+    RealtimeTableDataManager mockTableDataManager = mock(RealtimeTableDataManager.class);
+    when(mockTableDataManager.getInstanceId()).thenReturn("server-1");
+    when(mockTableDataManager.getSegmentLock(any())).thenReturn(mock(Lock.class));
+
+    // Mock fetchZKMetadata to return metadata with the specified download URL
+    SegmentZKMetadata metadata = createZkMetadata();
+    metadata.setDownloadUrl(downloadUrl);
+    when(mockTableDataManager.fetchZKMetadata(SEGMENT_NAME_STR)).thenReturn(metadata);
+
+    // Set up stats history and consumer directory
+    RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
+    when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
+    when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
+    when(mockTableDataManager.getStatsHistory()).thenReturn(statsHistory);
+    when(mockTableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath() + "/consumerDir");
+
+    // Set up consumer coordinator in the map
+    _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID,
+        new ConsumerCoordinator(false, mockTableDataManager));
+
+    return mockTableDataManager;
+  }
+
+  /**
+   * Executes the consumer with a KEEP response to trigger the completion mode logic.
+   */
+  private void executeConsumerWithKeepResponse(FakeRealtimeSegmentDataManager segmentDataManager) {
+    RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+    final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 500);
+    segmentDataManager._consumeOffsets.add(endOffset);
+
+    final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
+        new SegmentCompletionProtocol.Response.Params().withStatus(
+                SegmentCompletionProtocol.ControllerResponseStatus.KEEP)
+            .withStreamPartitionMsgOffset(endOffset.toString()));
+    segmentDataManager._responses.add(response);
+
+    consumer.run();
+  }
+
+  /**
+   * Helper class to encapsulate expected test results.
+   */
+  private static class TestResult {
+    final RealtimeSegmentDataManager.State expectedState;
+    final boolean expectedBuildAndReplaceCalled;
+    

Review Comment:
   The TestResult class lacks Javadoc documentation. Since this is a helper 
class with specific fields that represent test expectations, it should include 
documentation explaining the purpose of each field and the class itself.
   ```suggestion
     /**
      * Helper class to encapsulate expected test results for segment state transitions.
      * <p>
      * This class holds the expected state of a {@link RealtimeSegmentDataManager} after a test,
      * as well as whether the build-and-replace operation was expected to be called.
      */
     private static class TestResult {
       /**
        * The expected state of the {@link RealtimeSegmentDataManager} after the test execution.
        */
       final RealtimeSegmentDataManager.State expectedState;
       /**
        * Whether the build-and-replace operation was expected to be called during the test.
        */
       final boolean expectedBuildAndReplaceCalled;
   
       /**
        * Constructs a TestResult with the expected state and build-and-replace flag.
        *
        * @param expectedState The expected state of the segment manager after the test.
        * @param expectedBuildAndReplaceCalled Whether build-and-replace was expected to be called.
        */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

