This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 959548c75e3 allow fallback to retain mode if download url is missing for CompletionMode.DOWNLOAD (#16810)
959548c75e3 is described below

commit 959548c75e3459a95ce4a42f9cab19e40a23444e
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Oct 23 17:59:25 2025 -0700

    allow fallback to retain mode if download url is missing for CompletionMode.DOWNLOAD (#16810)
    
    * Add download URL validation for CompletionMode.DOWNLOAD with graceful fallback
    
    - Validate download URL availability before proceeding with DOWNLOAD completion mode
    - Fetch fresh segment metadata from ZooKeeper to handle race conditions where other replicas may have set the download URL
    - Gracefully fall back to RETAINING mode when download URL is missing, null, or empty
    - Apply validation in both KEEP case and goOnlineFromConsuming method for comprehensive coverage
    - Add comprehensive test suite with 3 test cases covering valid URL, missing URL, and empty URL scenarios
    - Ensure robust handling of segment completion when download infrastructure is not available
    
    This prevents failures in DOWNLOAD completion mode when segment metadata lacks download URLs,
    ensuring system reliability and graceful degradation to local segment building.
    
    * Support peer download
---
 .../core/data/manager/BaseTableDataManager.java    |   8 +
 .../realtime/RealtimeSegmentDataManager.java       | 127 +++++++++-------
 .../manager/realtime/RealtimeTableDataManager.java |   2 -
 .../realtime/RealtimeSegmentDataManagerTest.java   | 168 +++++++++++++++++++++
 4 files changed, 252 insertions(+), 53 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index b70419e4f51..5963e963bfe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1656,4 +1656,12 @@ public abstract class BaseTableDataManager implements TableDataManager {
       }
     }
   }
+
+  /**
+   * Returns the configured peer download scheme if peer-to-peer download is enabled; otherwise null.
+   */
+  @Nullable
+  public String getPeerDownloadScheme() {
+    return _peerDownloadScheme;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 17336e89ff2..d3458f05944 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -850,8 +850,27 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
               break;
             case KEEP: {
               if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
-                _state = State.DISCARDED;
-                break;
+                // Fetch fresh metadata from ZooKeeper to check if download 
URL has been set by another replica
+                String downloadUrl = 
_realtimeTableDataManager.fetchZKMetadata(_segmentNameStr).getDownloadUrl();
+                if (StringUtils.isNotEmpty(downloadUrl)) {
+                  _segmentLogger.info(
+                      "CompletionMode is DOWNLOAD and download URL is 
available for segment: {}. URL: {}",
+                      _segmentNameStr, downloadUrl);
+                  _state = State.DISCARDED; // will trigger download on ONLINE 
transition
+                  break;
+                }
+                // No download URL yet. If peer download is enabled, prefer 
download path by discarding and letting
+                // ONLINE transition perform the peer download. Otherwise, 
fall back to retaining (local build).
+                if (_realtimeTableDataManager.getPeerDownloadScheme() != null) 
{
+                  _segmentLogger.warn(
+                      "CompletionMode is DOWNLOAD but URL missing for segment: 
{}. Peer download enabled; "
+                          + "preferring download path via ONLINE transition.", 
_segmentNameStr);
+                  _state = State.DISCARDED; // ONLINE transition will download 
from peers or deep store
+                  break;
+                }
+                _segmentLogger.warn(
+                    "CompletionMode is DOWNLOAD but download URL is missing 
for segment: {} and peer download "
+                        + "is disabled. Falling back to RETAINING mode", 
_segmentNameStr);
               }
               _state = State.RETAINING;
               // Lock the segment to avoid multiple threads touching the same 
segment.
@@ -1460,58 +1479,64 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         case CATCHING_UP:
         case HOLDING:
         case INITIAL_CONSUMING:
-          switch (_segmentCompletionMode) {
-            case DOWNLOAD:
-              _segmentLogger.info("State {}. CompletionMode {}. Downloading to 
replace", _state.toString(),
-                  _segmentCompletionMode);
+          if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
+            // Check if download URL has been set by another replica
+            String downloadUrl = segmentZKMetadata.getDownloadUrl();
+            if (StringUtils.isNotEmpty(downloadUrl)) {
+              _segmentLogger.info("State {}. CompletionMode {}. Downloading to 
replace with fresh metadata. URL: {}",
+                  _state.toString(), _segmentCompletionMode, downloadUrl);
               downloadSegmentAndReplace(segmentZKMetadata);
               break;
-            case DEFAULT:
-              // Allow to catch up upto final offset, and then replace.
-              if (_currentOffset.compareTo(endOffset) > 0) {
-                // We moved ahead of the offset that is committed in ZK.
-                _segmentLogger
-                    .warn("Current offset {} ahead of the offset in zk {}. 
Downloading to replace", _currentOffset,
-                        endOffset);
-                downloadSegmentAndReplace(segmentZKMetadata);
-              } else if (_currentOffset.compareTo(endOffset) == 0) {
-                _segmentLogger
-                    .info("Current offset {} matches offset in zk {}. 
Replacing segment", _currentOffset, endOffset);
-                if (!buildSegmentAndReplace()) {
-                  _segmentLogger.warn("Failed to build the segment: {} and 
replace. Downloading to replace",
-                      _segmentNameStr);
-                  downloadSegmentAndReplace(segmentZKMetadata);
-                }
-              } else {
-                boolean success = false;
-                // Since online helix transition for a segment can arrive 
before segment's consumer acquires the
-                // semaphore, check _consumerSemaphoreAcquired before catching 
up.
-                // This is to avoid consuming in parallel to another segment's 
consumer.
-                if (_consumerSemaphoreAcquired.get()) {
-                  _segmentLogger.info("Attempting to catch up from offset {} 
to {} ", _currentOffset, endOffset);
-                  success = catchupToFinalOffset(endOffset,
-                      
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, 
TimeUnit.SECONDS));
-                } else {
-                  _segmentLogger.warn("Consumer semaphore was not acquired, 
Skipping catch up from offset {} to {} ",
-                      _currentOffset, endOffset);
-                }
+            }
+            if (_realtimeTableDataManager.getPeerDownloadScheme() != null) {
+              _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but 
URL missing; attempting download via peers "
+                  + "or deep store if available for segment: {}", 
_state.toString(), _segmentNameStr);
+              downloadSegmentAndReplace(segmentZKMetadata);
+              break;
+            }
+            _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but 
download URL is missing and peer download is "
+                + "disabled. Falling back to local build for segment: {}", 
_state.toString(), _segmentNameStr);
+          }
+          // Allow to catch up upto final offset, and then replace.
+          if (_currentOffset.compareTo(endOffset) > 0) {
+            // We moved ahead of the offset that is committed in ZK.
+            _segmentLogger.warn("Current offset {} ahead of the offset in zk 
{}. Downloading to replace",
+                _currentOffset, endOffset);
+            downloadSegmentAndReplace(segmentZKMetadata);
+          } else if (_currentOffset.compareTo(endOffset) == 0) {
+            _segmentLogger.info("Current offset {} matches offset in zk {}. 
Replacing segment", _currentOffset,
+                endOffset);
+            if (!buildSegmentAndReplace()) {
+              _segmentLogger.warn("Failed to build the segment: {} and 
replace. Downloading to replace",
+                  _segmentNameStr);
+              downloadSegmentAndReplace(segmentZKMetadata);
+            }
+          } else {
+            boolean success = false;
+            // Since online helix transition for a segment can arrive before 
segment's consumer acquires the
+            // semaphore, check _consumerSemaphoreAcquired before catching up.
+            // This is to avoid consuming in parallel to another segment's 
consumer.
+            if (_consumerSemaphoreAcquired.get()) {
+              _segmentLogger.info("Attempting to catch up from offset {} to {} 
", _currentOffset, endOffset);
+              success = catchupToFinalOffset(endOffset,
+                  
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS, 
TimeUnit.SECONDS));
+            } else {
+              _segmentLogger.warn("Consumer semaphore was not acquired, 
Skipping catch up from offset {} to {} ",
+                  _currentOffset, endOffset);
+            }
 
-                if (success) {
-                  _segmentLogger.info("Caught up to offset {}", 
_currentOffset);
-                  if (!buildSegmentAndReplace()) {
-                    _segmentLogger.warn("Failed to build the segment: {} after 
catchup. Downloading to replace",
-                        _segmentNameStr);
-                    downloadSegmentAndReplace(segmentZKMetadata);
-                  }
-                } else {
-                  _segmentLogger
-                      .info("Could not catch up to offset (current = {}). 
Downloading to replace", _currentOffset);
-                  downloadSegmentAndReplace(segmentZKMetadata);
-                }
+            if (success) {
+              _segmentLogger.info("Caught up to offset {}", _currentOffset);
+              if (!buildSegmentAndReplace()) {
+                _segmentLogger.warn("Failed to build the segment: {} after 
catchup. Downloading to replace",
+                    _segmentNameStr);
+                downloadSegmentAndReplace(segmentZKMetadata);
               }
-              break;
-            default:
-              break;
+            } else {
+              _segmentLogger
+                  .info("Could not catch up to offset (current = {}). 
Downloading to replace", _currentOffset);
+              downloadSegmentAndReplace(segmentZKMetadata);
+            }
           }
           break;
         default:
@@ -1788,7 +1813,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
           "Failed to initialize segment data manager", t));
       _segmentLogger.warn(
           "Scheduling task to call controller to mark the segment as OFFLINE 
in Ideal State due"
-           + " to initialization error: '{}'",
+              + " to initialization error: '{}'",
           t.getMessage());
       // Since we are going to throw exception from this thread (helix 
execution thread), the externalview
       // entry for this segment will be ERROR. We allow time for Helix to make 
this transition, and then
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ff0d428ca6e..c8fdb110f20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -581,8 +581,6 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       throws Exception {
     Status status = zkMetadata.getStatus();
     if (status.isCompleted()) {
-      // Segment is completed and ready to be downloaded either from deep 
storage or from a peer (if peer-to-peer
-      // download is enabled).
       return super.downloadSegment(zkMetadata);
     }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 59d25a32151..8107625b64b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -859,6 +859,174 @@ public class RealtimeSegmentDataManagerTest {
     }
   }
 
+  @Test
+  public void testCompletionModeDownloadWithUrlValidation()
+      throws Exception {
+    // Test the new validation logic for CompletionMode.DOWNLOAD
+
+    // Test Case 1: Valid download URL - should proceed with DOWNLOAD mode
+    testCompletionModeDownloadWithValidUrl();
+
+    // Test Case 2: Missing/null download URL - should fall back to RETAINING 
mode
+    testCompletionModeDownloadWithMissingUrl();
+
+    // Test Case 3: Empty download URL - should fall back to RETAINING mode
+    testCompletionModeDownloadWithEmptyUrl();
+  }
+
+  private void testCompletionModeDownloadWithValidUrl()
+      throws Exception {
+    String downloadUrl = "http://example.com/segment.tar.gz";
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.DISCARDED, false);
+
+    runCompletionModeDownloadTest(downloadUrl, downloadUrl, expectedResult);
+  }
+
+  private void testCompletionModeDownloadWithMissingUrl()
+      throws Exception {
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.RETAINED, true);
+
+    runCompletionModeDownloadTest(null, null, expectedResult);
+  }
+
+  private void testCompletionModeDownloadWithEmptyUrl()
+      throws Exception {
+    TestResult expectedResult = new TestResult(
+        RealtimeSegmentDataManager.State.RETAINED, true);
+
+    runCompletionModeDownloadTest("", "", expectedResult);
+  }
+
+  /**
+   * Helper method to run completion mode download tests with different 
download URL scenarios.
+   *
+   * @param freshMetadataDownloadUrl The download URL returned by 
fetchZKMetadata (fresh metadata)
+   * @param initialSegmentDownloadUrl The download URL in the initial segment 
metadata
+   * @param expectedResult The expected test result (state and 
buildAndReplaceCalled flag)
+   */
+  private void runCompletionModeDownloadTest(String freshMetadataDownloadUrl,
+      String initialSegmentDownloadUrl, TestResult expectedResult)
+      throws Exception {
+
+    // Create table config with DOWNLOAD completion mode
+    TableConfig tableConfig = createTableConfigWithDownloadCompletionMode();
+
+    // Create mock table data manager with the specified download URL
+    RealtimeTableDataManager mockTableDataManager = 
createMockTableDataManager(freshMetadataDownloadUrl);
+
+    // Create segment data manager with DOWNLOAD completion mode
+    SegmentZKMetadata segmentZKMetadata = createZkMetadata();
+    if (initialSegmentDownloadUrl != null) {
+      segmentZKMetadata.setDownloadUrl(initialSegmentDownloadUrl);
+    }
+
+    LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
+    Schema schema = Fixtures.createSchema();
+    ServerMetrics serverMetrics = new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+    try (FakeRealtimeSegmentDataManager segmentDataManager =
+        new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
mockTableDataManager,
+            new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema, 
llcSegmentName,
+            _partitionGroupIdToConsumerCoordinatorMap, serverMetrics, new 
TimeSupplier())) {
+
+      // Execute the consumer with KEEP response
+      executeConsumerWithKeepResponse(segmentDataManager);
+
+      // Verify the expected results
+      Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
+          expectedResult._expectedState);
+      Assert.assertEquals(segmentDataManager._buildAndReplaceCalled,
+          expectedResult._expectedBuildAndReplaceCalled);
+      Assert.assertTrue(segmentDataManager._responses.isEmpty());
+      Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
+    }
+  }
+
+  /**
+   * Creates a table config with DOWNLOAD completion mode.
+   */
+  private TableConfig createTableConfigWithDownloadCompletionMode()
+      throws Exception {
+    TableConfig tableConfig = createTableConfig();
+    tableConfig.getValidationConfig().setCompletionConfig(
+        new org.apache.pinot.spi.config.table.CompletionConfig("DOWNLOAD"));
+    return tableConfig;
+  }
+
+  /**
+   * Creates a mock table data manager that returns the specified download URL 
in fresh metadata.
+   */
+  private RealtimeTableDataManager createMockTableDataManager(String 
downloadUrl) {
+    RealtimeTableDataManager mockTableDataManager = 
mock(RealtimeTableDataManager.class);
+    when(mockTableDataManager.getInstanceId()).thenReturn("server-1");
+    
when(mockTableDataManager.getSegmentLock(any())).thenReturn(mock(Lock.class));
+
+    // Mock fetchZKMetadata to return metadata with the specified download URL
+    SegmentZKMetadata metadata = createZkMetadata();
+    metadata.setDownloadUrl(downloadUrl);
+    
when(mockTableDataManager.fetchZKMetadata(SEGMENT_NAME_STR)).thenReturn(metadata);
+
+    // Set up stats history and consumer directory
+    RealtimeSegmentStatsHistory statsHistory = 
mock(RealtimeSegmentStatsHistory.class);
+    when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
+    when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
+    when(mockTableDataManager.getStatsHistory()).thenReturn(statsHistory);
+    
when(mockTableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath()
 + "/consumerDir");
+
+    // Set up consumer coordinator in the map
+    _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID,
+        new ConsumerCoordinator(false, mockTableDataManager));
+
+    return mockTableDataManager;
+  }
+
+  /**
+   * Executes the consumer with a KEEP response to trigger the completion mode 
logic.
+   */
+  private void executeConsumerWithKeepResponse(FakeRealtimeSegmentDataManager 
segmentDataManager) {
+    RealtimeSegmentDataManager.PartitionConsumer consumer = 
segmentDataManager.createPartitionConsumer();
+    final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE + 
500);
+    segmentDataManager._consumeOffsets.add(endOffset);
+
+    final SegmentCompletionProtocol.Response response = new 
SegmentCompletionProtocol.Response(
+        new SegmentCompletionProtocol.Response.Params().withStatus(
+                SegmentCompletionProtocol.ControllerResponseStatus.KEEP)
+            .withStreamPartitionMsgOffset(endOffset.toString()));
+    segmentDataManager._responses.add(response);
+
+    consumer.run();
+  }
+
+  /**
+   * Helper class to encapsulate expected test results for segment state 
transitions.
+   * <p>
+   * This class holds the expected state of a {@link 
RealtimeSegmentDataManager} after a test,
+   * as well as whether the build-and-replace operation was expected to be 
called.
+   */
+  private static class TestResult {
+    /**
+     * The expected state of the {@link RealtimeSegmentDataManager} after the 
test execution.
+     */
+    final RealtimeSegmentDataManager.State _expectedState;
+    /**
+     * Whether the build-and-replace operation was expected to be called 
during the test.
+     */
+    final boolean _expectedBuildAndReplaceCalled;
+
+    /**
+     * Constructs a TestResult with the expected state and build-and-replace 
flag.
+     *
+     * @param expectedState The expected state of the segment manager after 
the test.
+     * @param expectedBuildAndReplaceCalled Whether build-and-replace was 
expected to be called.
+     */
+    TestResult(RealtimeSegmentDataManager.State expectedState, boolean 
expectedBuildAndReplaceCalled) {
+      _expectedState = expectedState;
+      _expectedBuildAndReplaceCalled = expectedBuildAndReplaceCalled;
+    }
+  }
+
   private static class TimeSupplier implements Supplier<Long> {
     protected final AtomicInteger _timeCheckCounter = new AtomicInteger();
     protected long _timeNow = System.currentTimeMillis();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to