This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 959548c75e3 allow fallback to retain mode if download url is missing
for CompletionMode.DOWNLOAD (#16810)
959548c75e3 is described below
commit 959548c75e3459a95ce4a42f9cab19e40a23444e
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Oct 23 17:59:25 2025 -0700
allow fallback to retain mode if download url is missing for
CompletionMode.DOWNLOAD (#16810)
* Add download URL validation for CompletionMode.DOWNLOAD with graceful
fallback
- Validate download URL availability before proceeding with DOWNLOAD
completion mode
- Fetch fresh segment metadata from ZooKeeper to handle race conditions
where other replicas may have set the download URL
- Gracefully fall back to RETAINING mode when the download URL is missing
(null or empty)
- Apply the validation in both the KEEP case and the goOnlineFromConsuming
method so that every completion path is covered
- Add a test suite with three test cases covering valid, missing, and
empty download URL scenarios
- Ensure segment completion is handled robustly when no download source
is available
This prevents failures in DOWNLOAD completion mode when segment metadata
lacks a download URL, improving reliability by degrading gracefully to
building the segment locally.
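* Support peer download: when the download URL is still missing but peer
download is enabled, take the download path (peers or deep store) instead
of falling back to a local build.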
---
.../core/data/manager/BaseTableDataManager.java | 8 +
.../realtime/RealtimeSegmentDataManager.java | 127 +++++++++-------
.../manager/realtime/RealtimeTableDataManager.java | 2 -
.../realtime/RealtimeSegmentDataManagerTest.java | 168 +++++++++++++++++++++
4 files changed, 252 insertions(+), 53 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index b70419e4f51..5963e963bfe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1656,4 +1656,12 @@ public abstract class BaseTableDataManager implements
TableDataManager {
}
}
}
+
+ /**
+ * Returns the configured peer download scheme if peer-to-peer download is
enabled; otherwise null.
+ */
+ @Nullable
+ public String getPeerDownloadScheme() {
+ return _peerDownloadScheme;
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 17336e89ff2..d3458f05944 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -850,8 +850,27 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
break;
case KEEP: {
if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
- _state = State.DISCARDED;
- break;
+ // Fetch fresh metadata from ZooKeeper to check if download
URL has been set by another replica
+ String downloadUrl =
_realtimeTableDataManager.fetchZKMetadata(_segmentNameStr).getDownloadUrl();
+ if (StringUtils.isNotEmpty(downloadUrl)) {
+ _segmentLogger.info(
+ "CompletionMode is DOWNLOAD and download URL is
available for segment: {}. URL: {}",
+ _segmentNameStr, downloadUrl);
+ _state = State.DISCARDED; // will trigger download on ONLINE
transition
+ break;
+ }
+ // No download URL yet. If peer download is enabled, prefer
download path by discarding and letting
+ // ONLINE transition perform the peer download. Otherwise,
fall back to retaining (local build).
+ if (_realtimeTableDataManager.getPeerDownloadScheme() != null)
{
+ _segmentLogger.warn(
+ "CompletionMode is DOWNLOAD but URL missing for segment:
{}. Peer download enabled; "
+ + "preferring download path via ONLINE transition.",
_segmentNameStr);
+ _state = State.DISCARDED; // ONLINE transition will download
from peers or deep store
+ break;
+ }
+ _segmentLogger.warn(
+ "CompletionMode is DOWNLOAD but download URL is missing
for segment: {} and peer download "
+ + "is disabled. Falling back to RETAINING mode",
_segmentNameStr);
}
_state = State.RETAINING;
// Lock the segment to avoid multiple threads touching the same
segment.
@@ -1460,58 +1479,64 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
case CATCHING_UP:
case HOLDING:
case INITIAL_CONSUMING:
- switch (_segmentCompletionMode) {
- case DOWNLOAD:
- _segmentLogger.info("State {}. CompletionMode {}. Downloading to
replace", _state.toString(),
- _segmentCompletionMode);
+ if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
+ // Check if download URL has been set by another replica
+ String downloadUrl = segmentZKMetadata.getDownloadUrl();
+ if (StringUtils.isNotEmpty(downloadUrl)) {
+ _segmentLogger.info("State {}. CompletionMode {}. Downloading to
replace with fresh metadata. URL: {}",
+ _state.toString(), _segmentCompletionMode, downloadUrl);
downloadSegmentAndReplace(segmentZKMetadata);
break;
- case DEFAULT:
- // Allow to catch up upto final offset, and then replace.
- if (_currentOffset.compareTo(endOffset) > 0) {
- // We moved ahead of the offset that is committed in ZK.
- _segmentLogger
- .warn("Current offset {} ahead of the offset in zk {}.
Downloading to replace", _currentOffset,
- endOffset);
- downloadSegmentAndReplace(segmentZKMetadata);
- } else if (_currentOffset.compareTo(endOffset) == 0) {
- _segmentLogger
- .info("Current offset {} matches offset in zk {}.
Replacing segment", _currentOffset, endOffset);
- if (!buildSegmentAndReplace()) {
- _segmentLogger.warn("Failed to build the segment: {} and
replace. Downloading to replace",
- _segmentNameStr);
- downloadSegmentAndReplace(segmentZKMetadata);
- }
- } else {
- boolean success = false;
- // Since online helix transition for a segment can arrive
before segment's consumer acquires the
- // semaphore, check _consumerSemaphoreAcquired before catching
up.
- // This is to avoid consuming in parallel to another segment's
consumer.
- if (_consumerSemaphoreAcquired.get()) {
- _segmentLogger.info("Attempting to catch up from offset {}
to {} ", _currentOffset, endOffset);
- success = catchupToFinalOffset(endOffset,
-
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS,
TimeUnit.SECONDS));
- } else {
- _segmentLogger.warn("Consumer semaphore was not acquired,
Skipping catch up from offset {} to {} ",
- _currentOffset, endOffset);
- }
+ }
+ if (_realtimeTableDataManager.getPeerDownloadScheme() != null) {
+ _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but
URL missing; attempting download via peers "
+ + "or deep store if available for segment: {}",
_state.toString(), _segmentNameStr);
+ downloadSegmentAndReplace(segmentZKMetadata);
+ break;
+ }
+ _segmentLogger.warn("State {}. CompletionMode is DOWNLOAD but
download URL is missing and peer download is "
+ + "disabled. Falling back to local build for segment: {}",
_state.toString(), _segmentNameStr);
+ }
+ // Allow to catch up upto final offset, and then replace.
+ if (_currentOffset.compareTo(endOffset) > 0) {
+ // We moved ahead of the offset that is committed in ZK.
+ _segmentLogger.warn("Current offset {} ahead of the offset in zk
{}. Downloading to replace",
+ _currentOffset, endOffset);
+ downloadSegmentAndReplace(segmentZKMetadata);
+ } else if (_currentOffset.compareTo(endOffset) == 0) {
+ _segmentLogger.info("Current offset {} matches offset in zk {}.
Replacing segment", _currentOffset,
+ endOffset);
+ if (!buildSegmentAndReplace()) {
+ _segmentLogger.warn("Failed to build the segment: {} and
replace. Downloading to replace",
+ _segmentNameStr);
+ downloadSegmentAndReplace(segmentZKMetadata);
+ }
+ } else {
+ boolean success = false;
+ // Since online helix transition for a segment can arrive before
segment's consumer acquires the
+ // semaphore, check _consumerSemaphoreAcquired before catching up.
+ // This is to avoid consuming in parallel to another segment's
consumer.
+ if (_consumerSemaphoreAcquired.get()) {
+ _segmentLogger.info("Attempting to catch up from offset {} to {}
", _currentOffset, endOffset);
+ success = catchupToFinalOffset(endOffset,
+
TimeUnit.MILLISECONDS.convert(MAX_TIME_FOR_CONSUMING_TO_ONLINE_IN_SECONDS,
TimeUnit.SECONDS));
+ } else {
+ _segmentLogger.warn("Consumer semaphore was not acquired,
Skipping catch up from offset {} to {} ",
+ _currentOffset, endOffset);
+ }
- if (success) {
- _segmentLogger.info("Caught up to offset {}",
_currentOffset);
- if (!buildSegmentAndReplace()) {
- _segmentLogger.warn("Failed to build the segment: {} after
catchup. Downloading to replace",
- _segmentNameStr);
- downloadSegmentAndReplace(segmentZKMetadata);
- }
- } else {
- _segmentLogger
- .info("Could not catch up to offset (current = {}).
Downloading to replace", _currentOffset);
- downloadSegmentAndReplace(segmentZKMetadata);
- }
+ if (success) {
+ _segmentLogger.info("Caught up to offset {}", _currentOffset);
+ if (!buildSegmentAndReplace()) {
+ _segmentLogger.warn("Failed to build the segment: {} after
catchup. Downloading to replace",
+ _segmentNameStr);
+ downloadSegmentAndReplace(segmentZKMetadata);
}
- break;
- default:
- break;
+ } else {
+ _segmentLogger
+ .info("Could not catch up to offset (current = {}).
Downloading to replace", _currentOffset);
+ downloadSegmentAndReplace(segmentZKMetadata);
+ }
}
break;
default:
@@ -1788,7 +1813,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
"Failed to initialize segment data manager", t));
_segmentLogger.warn(
"Scheduling task to call controller to mark the segment as OFFLINE
in Ideal State due"
- + " to initialization error: '{}'",
+ + " to initialization error: '{}'",
t.getMessage());
// Since we are going to throw exception from this thread (helix
execution thread), the externalview
// entry for this segment will be ERROR. We allow time for Helix to make
this transition, and then
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index ff0d428ca6e..c8fdb110f20 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -581,8 +581,6 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
throws Exception {
Status status = zkMetadata.getStatus();
if (status.isCompleted()) {
- // Segment is completed and ready to be downloaded either from deep
storage or from a peer (if peer-to-peer
- // download is enabled).
return super.downloadSegment(zkMetadata);
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 59d25a32151..8107625b64b 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -859,6 +859,174 @@ public class RealtimeSegmentDataManagerTest {
}
}
+ @Test
+ public void testCompletionModeDownloadWithUrlValidation()
+ throws Exception {
+ // Test the new validation logic for CompletionMode.DOWNLOAD
+
+ // Test Case 1: Valid download URL - should proceed with DOWNLOAD mode
+ testCompletionModeDownloadWithValidUrl();
+
+ // Test Case 2: Missing/null download URL - should fall back to RETAINING
mode
+ testCompletionModeDownloadWithMissingUrl();
+
+ // Test Case 3: Empty download URL - should fall back to RETAINING mode
+ testCompletionModeDownloadWithEmptyUrl();
+ }
+
+ private void testCompletionModeDownloadWithValidUrl()
+ throws Exception {
+ String downloadUrl = "http://example.com/segment.tar.gz";
+ TestResult expectedResult = new TestResult(
+ RealtimeSegmentDataManager.State.DISCARDED, false);
+
+ runCompletionModeDownloadTest(downloadUrl, downloadUrl, expectedResult);
+ }
+
+ private void testCompletionModeDownloadWithMissingUrl()
+ throws Exception {
+ TestResult expectedResult = new TestResult(
+ RealtimeSegmentDataManager.State.RETAINED, true);
+
+ runCompletionModeDownloadTest(null, null, expectedResult);
+ }
+
+ private void testCompletionModeDownloadWithEmptyUrl()
+ throws Exception {
+ TestResult expectedResult = new TestResult(
+ RealtimeSegmentDataManager.State.RETAINED, true);
+
+ runCompletionModeDownloadTest("", "", expectedResult);
+ }
+
+ /**
+ * Helper method to run completion mode download tests with different
download URL scenarios.
+ *
+ * @param freshMetadataDownloadUrl The download URL returned by
fetchZKMetadata (fresh metadata)
+ * @param initialSegmentDownloadUrl The download URL in the initial segment
metadata
+ * @param expectedResult The expected test result (state and
buildAndReplaceCalled flag)
+ */
+ private void runCompletionModeDownloadTest(String freshMetadataDownloadUrl,
+ String initialSegmentDownloadUrl, TestResult expectedResult)
+ throws Exception {
+
+ // Create table config with DOWNLOAD completion mode
+ TableConfig tableConfig = createTableConfigWithDownloadCompletionMode();
+
+ // Create mock table data manager with the specified download URL
+ RealtimeTableDataManager mockTableDataManager =
createMockTableDataManager(freshMetadataDownloadUrl);
+
+ // Create segment data manager with DOWNLOAD completion mode
+ SegmentZKMetadata segmentZKMetadata = createZkMetadata();
+ if (initialSegmentDownloadUrl != null) {
+ segmentZKMetadata.setDownloadUrl(initialSegmentDownloadUrl);
+ }
+
+ LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
+ Schema schema = Fixtures.createSchema();
+ ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+
+ try (FakeRealtimeSegmentDataManager segmentDataManager =
+ new FakeRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
mockTableDataManager,
+ new File(TEMP_DIR, REALTIME_TABLE_NAME).getAbsolutePath(), schema,
llcSegmentName,
+ _partitionGroupIdToConsumerCoordinatorMap, serverMetrics, new
TimeSupplier())) {
+
+ // Execute the consumer with KEEP response
+ executeConsumerWithKeepResponse(segmentDataManager);
+
+ // Verify the expected results
+ Assert.assertEquals(segmentDataManager._state.get(segmentDataManager),
+ expectedResult._expectedState);
+ Assert.assertEquals(segmentDataManager._buildAndReplaceCalled,
+ expectedResult._expectedBuildAndReplaceCalled);
+ Assert.assertTrue(segmentDataManager._responses.isEmpty());
+ Assert.assertTrue(segmentDataManager._consumeOffsets.isEmpty());
+ }
+ }
+
+ /**
+ * Creates a table config with DOWNLOAD completion mode.
+ */
+ private TableConfig createTableConfigWithDownloadCompletionMode()
+ throws Exception {
+ TableConfig tableConfig = createTableConfig();
+ tableConfig.getValidationConfig().setCompletionConfig(
+ new org.apache.pinot.spi.config.table.CompletionConfig("DOWNLOAD"));
+ return tableConfig;
+ }
+
+ /**
+ * Creates a mock table data manager that returns the specified download URL
in fresh metadata.
+ */
+ private RealtimeTableDataManager createMockTableDataManager(String
downloadUrl) {
+ RealtimeTableDataManager mockTableDataManager =
mock(RealtimeTableDataManager.class);
+ when(mockTableDataManager.getInstanceId()).thenReturn("server-1");
+
when(mockTableDataManager.getSegmentLock(any())).thenReturn(mock(Lock.class));
+
+ // Mock fetchZKMetadata to return metadata with the specified download URL
+ SegmentZKMetadata metadata = createZkMetadata();
+ metadata.setDownloadUrl(downloadUrl);
+
when(mockTableDataManager.fetchZKMetadata(SEGMENT_NAME_STR)).thenReturn(metadata);
+
+ // Set up stats history and consumer directory
+ RealtimeSegmentStatsHistory statsHistory =
mock(RealtimeSegmentStatsHistory.class);
+ when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
+ when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);
+ when(mockTableDataManager.getStatsHistory()).thenReturn(statsHistory);
+
when(mockTableDataManager.getConsumerDir()).thenReturn(TEMP_DIR.getAbsolutePath()
+ "/consumerDir");
+
+ // Set up consumer coordinator in the map
+ _partitionGroupIdToConsumerCoordinatorMap.putIfAbsent(PARTITION_GROUP_ID,
+ new ConsumerCoordinator(false, mockTableDataManager));
+
+ return mockTableDataManager;
+ }
+
+ /**
+ * Executes the consumer with a KEEP response to trigger the completion mode
logic.
+ */
+ private void executeConsumerWithKeepResponse(FakeRealtimeSegmentDataManager
segmentDataManager) {
+ RealtimeSegmentDataManager.PartitionConsumer consumer =
segmentDataManager.createPartitionConsumer();
+ final LongMsgOffset endOffset = new LongMsgOffset(START_OFFSET_VALUE +
500);
+ segmentDataManager._consumeOffsets.add(endOffset);
+
+ final SegmentCompletionProtocol.Response response = new
SegmentCompletionProtocol.Response(
+ new SegmentCompletionProtocol.Response.Params().withStatus(
+ SegmentCompletionProtocol.ControllerResponseStatus.KEEP)
+ .withStreamPartitionMsgOffset(endOffset.toString()));
+ segmentDataManager._responses.add(response);
+
+ consumer.run();
+ }
+
+ /**
+ * Helper class to encapsulate expected test results for segment state
transitions.
+ * <p>
+ * This class holds the expected state of a {@link
RealtimeSegmentDataManager} after a test,
+ * as well as whether the build-and-replace operation was expected to be
called.
+ */
+ private static class TestResult {
+ /**
+ * The expected state of the {@link RealtimeSegmentDataManager} after the
test execution.
+ */
+ final RealtimeSegmentDataManager.State _expectedState;
+ /**
+ * Whether the build-and-replace operation was expected to be called
during the test.
+ */
+ final boolean _expectedBuildAndReplaceCalled;
+
+ /**
+ * Constructs a TestResult with the expected state and build-and-replace
flag.
+ *
+ * @param expectedState The expected state of the segment manager after
the test.
+ * @param expectedBuildAndReplaceCalled Whether build-and-replace was
expected to be called.
+ */
+ TestResult(RealtimeSegmentDataManager.State expectedState, boolean
expectedBuildAndReplaceCalled) {
+ _expectedState = expectedState;
+ _expectedBuildAndReplaceCalled = expectedBuildAndReplaceCalled;
+ }
+ }
+
private static class TimeSupplier implements Supplier<Long> {
protected final AtomicInteger _timeCheckCounter = new AtomicInteger();
protected long _timeNow = System.currentTimeMillis();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]