This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ce33d0d9184 Add support to process segment refresh messages
asynchronously if enabled (#16931)
ce33d0d9184 is described below
commit ce33d0d918485e3aae6b42ee72c428d02672409e
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 9 13:54:43 2025 -0700
Add support to process segment refresh messages asynchronously if enabled
(#16931)
* Add support to process segment refresh messages asynchronously if enabled
* Add tests
* Address review comments
* Rename executor to ReloadRefresh
* Log exception on replace failure during enqueue
---
.../core/data/manager/BaseTableDataManager.java | 42 ++-
.../core/data/manager/InstanceDataManager.java | 5 +
.../provider/DefaultTableDataManagerProvider.java | 7 +-
.../manager/provider/TableDataManagerProvider.java | 6 +-
.../BaseTableDataManagerAcquireSegmentTest.java | 3 +-
...ableDataManagerEnqueueSegmentToReplaceTest.java | 336 +++++++++++++++++++++
.../data/manager/BaseTableDataManagerTest.java | 177 ++++++++++-
.../offline/DimensionTableDataManagerTest.java | 3 +-
.../FailureInjectingTableDataManagerProvider.java | 7 +-
.../perf/BenchmarkDimensionTableOverhead.java | 3 +-
.../local/data/manager/TableDataManager.java | 4 +-
.../local/utils/SegmentReloadSemaphore.java | 4 +-
.../starter/helix/HelixInstanceDataManager.java | 22 +-
.../helix/HelixInstanceDataManagerConfig.java | 10 +
.../apache/pinot/server/api/BaseResourceTest.java | 2 +-
.../config/instance/InstanceDataManagerConfig.java | 2 +
16 files changed, 604 insertions(+), 29 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0abd9bda1be..0536f9d6f30 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -129,7 +129,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected File _resourceTmpDir;
protected Logger _logger;
protected SegmentReloadSemaphore _segmentReloadSemaphore;
- protected ExecutorService _segmentReloadExecutor;
+ protected ExecutorService _segmentReloadRefreshExecutor;
@Nullable
protected ExecutorService _segmentPreloadExecutor;
protected AuthProvider _authProvider;
@@ -156,12 +156,14 @@ public abstract class BaseTableDataManager implements
TableDataManager {
protected volatile boolean _shutDown;
protected volatile boolean _isDeleted;
+ protected boolean _enableAsyncSegmentRefresh;
+
@Override
public void init(InstanceDataManagerConfig instanceDataManagerConfig,
HelixManager helixManager,
SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema,
SegmentReloadSemaphore segmentReloadSemaphore,
- ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
+ ExecutorService segmentReloadRefreshExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler, boolean
enableAsyncSegmentRefresh) {
LOGGER.info("Initializing table data manager for table: {}",
tableConfig.getTableName());
_instanceDataManagerConfig = instanceDataManagerConfig;
@@ -170,8 +172,9 @@ public abstract class BaseTableDataManager implements
TableDataManager {
_propertyStore = helixManager.getHelixPropertyStore();
_segmentLocks = segmentLocks;
_segmentReloadSemaphore = segmentReloadSemaphore;
- _segmentReloadExecutor = segmentReloadExecutor;
+ _segmentReloadRefreshExecutor = segmentReloadRefreshExecutor;
_segmentPreloadExecutor = segmentPreloadExecutor;
+ _enableAsyncSegmentRefresh = enableAsyncSegmentRefresh;
_authProvider =
AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(),
null);
_tableNameWithType = tableConfig.getTableName();
@@ -230,8 +233,11 @@ public abstract class BaseTableDataManager implements
TableDataManager {
} else {
_segmentDownloadSemaphore = null;
}
+
_logger = LoggerFactory.getLogger(_tableNameWithType + "-" +
getClass().getSimpleName());
+ _logger.info("Async segment refresh is {}!", _enableAsyncSegmentRefresh ?
"enabled" : "disabled");
+
doInit();
_logger.info("Initialized table data manager with data directory: {}",
_tableDataDir);
@@ -451,6 +457,32 @@ public abstract class BaseTableDataManager implements
TableDataManager {
@Override
public void replaceSegment(String segmentName)
throws Exception {
+ if (_enableAsyncSegmentRefresh) {
+ enqueueSegmentToReplace(segmentName);
+ } else {
+ replaceSegmentInternal(segmentName);
+ }
+ }
+
+ protected void enqueueSegmentToReplace(String segmentName) {
+ assert _enableAsyncSegmentRefresh;
+ if (_shutDown) {
+ _logger.warn("Shutdown in progress, skip enqueuing segment: {} to
replace", segmentName);
+ return;
+ }
+ _logger.info("Enqueuing segment: {} to be replaced", segmentName);
+ _segmentReloadRefreshExecutor.submit(() -> {
+ try {
+ replaceSegmentInternal(segmentName);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while replacing segment: {}",
segmentName, e);
+ _serverMetrics.addMeteredTableValue(_tableNameWithType,
ServerMeter.REFRESH_FAILURES, 1);
+ }
+ });
+ }
+
+ protected void replaceSegmentInternal(String segmentName)
+ throws Exception {
Preconditions.checkState(!_shutDown,
"Table data manager is already shut down, cannot replace segment: %s
in table: %s", segmentName,
_tableNameWithType);
@@ -760,7 +792,7 @@ public abstract class BaseTableDataManager implements
TableDataManager {
failedSegments.add(segmentName);
sampleException.set(t);
}
- }, _segmentReloadExecutor)).toArray(CompletableFuture[]::new)).get();
+ },
_segmentReloadRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
if (sampleException.get() != null) {
throw new RuntimeException(
String.format("Failed to reload %d/%d segments: %s in table: %s",
failedSegments.size(),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 15ef79f465a..ba5f93f5e6a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -169,6 +169,11 @@ public interface InstanceDataManager {
*/
int getMaxParallelRefreshThreads();
+ /**
+ * Returns true if background processing for SEGMENT_REFRESH is enabled,
false otherwise
+ */
+ boolean isAsyncSegmentRefreshEnabled();
+
/**
* Returns the Helix property store.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index b750016f74a..ba8ada89160 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -66,10 +66,10 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
@Override
public TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema,
- SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadExecutor,
+ SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries) {
+ Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
case OFFLINE:
@@ -93,7 +93,8 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
throw new IllegalStateException();
}
tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, schema,
- segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor,
errorCache, _segmentOperationsThrottler);
+ segmentReloadSemaphore, segmentReloadRefreshExecutor,
segmentPreloadExecutor, errorCache,
+ _segmentOperationsThrottler, enableAsyncSegmentRefresh);
return tableDataManager;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 63bfed0b0e6..1c558aaba6e 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -47,14 +47,14 @@ public interface TableDataManagerProvider {
@Nullable SegmentOperationsThrottler segmentOperationsThrottler);
TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
- SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService
segmentRefreshExecutor,
+ SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries);
+ Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh);
@VisibleForTesting
default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
return getTableDataManager(tableConfig, schema, new
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
- null, null, () -> true);
+ null, null, () -> true, false);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 16cceb3ebe6..4a953236f5e 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -134,7 +134,8 @@ public class BaseTableDataManagerAcquireSegmentTest {
new SegmentMultiColTextIndexPreprocessThrottler(4, 8, true));
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), tableConfig, schema,
- new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, segmentOperationsThrottler);
+ new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, segmentOperationsThrottler,
+ false);
tableDataManager.start();
Field segsMapField =
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
segsMapField.setAccessible(true);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
new file mode 100644
index 00000000000..fd9318f3ff0
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for async segment refresh enabled
+ */
+public class BaseTableDataManagerEnqueueSegmentToReplaceTest {
+ private static final File TEMP_DIR =
+ new File(FileUtils.getTempDirectory(),
"BaseTableDataManagerEnqueueSegmentToReplaceTest");
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ private static final TableConfig TABLE_CONFIG =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ private static final Schema SCHEMA = new
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+
+ private OfflineTableDataManager _tableDataManager;
+ private ExecutorService _segmentReloadRefreshExecutor;
+ private InstanceDataManagerConfig _instanceDataManagerConfig;
+ private HelixManager _helixManager;
+ private ServerMetrics _serverMetrics;
+
+ @BeforeClass
+ public void setUp() {
+ ServerMetrics.register(mock(ServerMetrics.class));
+ }
+
+ @BeforeMethod
+ public void setUpMethod() throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+
+ // Create a real ExecutorService for testing
+ _segmentReloadRefreshExecutor = Executors.newSingleThreadExecutor();
+
+ // Mock dependencies
+ _instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+
when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+
when(_instanceDataManagerConfig.getInstanceId()).thenReturn("testInstance");
+
+ _helixManager = mock(HelixManager.class);
+
+ @SuppressWarnings("unchecked")
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(_helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+ // Mock ServerMetrics for metric validation
+ _serverMetrics = mock(ServerMetrics.class);
+
+ // Create table data manager with segment refresh executor
+ _tableDataManager = spy(new OfflineTableDataManager());
+ _tableDataManager.init(_instanceDataManagerConfig, _helixManager, new
SegmentLocks(), TABLE_CONFIG,
+ SCHEMA, new SegmentReloadSemaphore(1), _segmentReloadRefreshExecutor,
null, null, null, true);
+
+ // Inject the mocked ServerMetrics into the table data manager using
reflection
+ try {
+ Field serverMetricsField =
BaseTableDataManager.class.getDeclaredField("_serverMetrics");
+ serverMetricsField.setAccessible(true);
+ serverMetricsField.set(_tableDataManager, _serverMetrics);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to inject mocked ServerMetrics", e);
+ }
+ }
+
+ @AfterMethod
+ public void tearDownMethod() throws Exception {
+ if (_segmentReloadRefreshExecutor != null &&
!_segmentReloadRefreshExecutor.isShutdown()) {
+ _segmentReloadRefreshExecutor.shutdownNow();
+ _segmentReloadRefreshExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ if (_tableDataManager != null) {
+ _tableDataManager.shutDown();
+ }
+ FileUtils.deleteDirectory(TEMP_DIR);
+ }
+
+ /**
+ * Test successful enqueueing and execution of segment replacement
+ */
+ @Test
+ public void testEnqueueSegmentToReplaceSuccess() throws Exception {
+ // Setup mocks for successful segment replacement
+ SegmentDataManager mockSegmentDataManager =
mock(ImmutableSegmentDataManager.class);
+ when(mockSegmentDataManager.getSegmentName()).thenReturn(SEGMENT_NAME);
+
+ ImmutableSegment mockSegment = mock(ImmutableSegment.class);
+ when(mockSegmentDataManager.getSegment()).thenReturn(mockSegment);
+
+ SegmentMetadata mockMetadata = mock(SegmentMetadata.class);
+ when(mockSegment.getSegmentMetadata()).thenReturn(mockMetadata);
+ when(mockMetadata.getCrc()).thenReturn("12345");
+
+ SegmentZKMetadata mockZkMetadata = mock(SegmentZKMetadata.class);
+ when(mockZkMetadata.getSegmentName()).thenReturn(SEGMENT_NAME);
+ when(mockZkMetadata.getCrc()).thenReturn(12345L);
+
+ // Add the segment to the manager's internal map
+ _tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
mockSegmentDataManager);
+
+ // Mock the methods that will be called during segment replacement
+ doAnswer(invocation ->
mockZkMetadata).when(_tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(_tableDataManager).fetchIndexLoadingConfig();
+
+ // Use CountDownLatch to wait for async execution
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean replaceSegmentIfCrcMismatchCalled = new AtomicBoolean(false);
+
+ doAnswer(invocation -> {
+ replaceSegmentIfCrcMismatchCalled.set(true);
+ latch.countDown();
+ return null;
+ }).when(_tableDataManager).replaceSegmentIfCrcMismatch(any(), any(),
any());
+
+ // Test the method
+ _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Wait for async execution to complete
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement with
CRC mismatch should be executed");
+ assertTrue(replaceSegmentIfCrcMismatchCalled.get(),
"replaceSegmentIfCrcMismatch should have been called");
+
+ // Verify that the segment replacement was attempted
+ verify(_tableDataManager, times(1)).enqueueSegmentToReplace(SEGMENT_NAME);
+ verify(_tableDataManager, times(1)).fetchZKMetadata(SEGMENT_NAME);
+ verify(_tableDataManager, times(1)).fetchIndexLoadingConfig();
+ }
+
+ /**
+ * Test behavior when table manager is shut down
+ */
+ @Test
+ public void testEnqueueSegmentToReplaceWhenShutDown() throws Exception {
+ // Shutdown the table data manager
+ _tableDataManager.shutDown();
+
+ // Mock replaceSegment to track if it's called
+ doAnswer(invocation -> {
+ throw new AssertionError("replaceSegment should not be called when shut
down");
+ }).when(_tableDataManager).replaceSegmentInternal(anyString());
+
+ // Test the method - it should return early without enqueueing
+ _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Give some time for any potential async execution
+ Thread.sleep(1000);
+
+ // Verify that replaceSegment was never called
+ verify(_tableDataManager, never()).replaceSegmentInternal(SEGMENT_NAME);
+ verify(_tableDataManager, never()).replaceSegmentIfCrcMismatch(any(),
any(), any());
+ }
+
+ /**
+ * Test exception handling during segment replacement and metric increment
+ */
+ @Test
+ public void testEnqueueSegmentToReplaceWithException() throws Exception {
+ // Setup mocks to throw exception during replacement
+ RuntimeException testException = new RuntimeException("Test exception
during replacement");
+
doThrow(testException).when(_tableDataManager).replaceSegmentInternal(SEGMENT_NAME);
+
+ // Use CountDownLatch to wait for async execution
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference<Exception> caughtException = new AtomicReference<>();
+
+ // Override the executor to capture exceptions
+ ExecutorService spyExecutor = spy(_segmentReloadRefreshExecutor);
+ doAnswer(invocation -> {
+ Runnable task = invocation.getArgument(0);
+ try {
+ task.run();
+ } catch (Exception e) {
+ caughtException.set(e);
+ } finally {
+ latch.countDown();
+ }
+ return null;
+ }).when(spyExecutor).submit(any(Runnable.class));
+
+ // Replace the executor in the table data manager
+ _tableDataManager._segmentReloadRefreshExecutor = spyExecutor;
+
+ // Test the method
+ _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Wait for async execution to complete
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Task should complete even
with exception");
+
+ // Verify that replaceSegment was called and exception was handled
+ verify(_tableDataManager, times(1)).enqueueSegmentToReplace(SEGMENT_NAME);
+ verify(_tableDataManager, times(1)).replaceSegmentInternal(SEGMENT_NAME);
+
+ // Verify that the REFRESH_FAILURES metric was incremented
+ verify(_serverMetrics, times(1)).addMeteredTableValue(
+ eq(RAW_TABLE_NAME + "_OFFLINE"), eq(ServerMeter.REFRESH_FAILURES),
eq(1L));
+
+ // The exception should be caught and handled by the implementation, not
propagated
+ assertNull(caughtException.get());
+ }
+
+ /**
+ * Test that executor is properly used for async execution
+ */
+ @Test
+ public void testExecutorUsage() throws Exception {
+ ExecutorService mockExecutor = mock(ExecutorService.class);
+ _tableDataManager._segmentReloadRefreshExecutor = mockExecutor;
+
+ // Test the method
+ _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Verify that the executor's submit method was called
+ verify(mockExecutor, times(1)).submit(any(Runnable.class));
+ }
+
+ /**
+ * Test with null segment refresh executor should trigger assertion error
+ */
+ @Test(expectedExceptions = AssertionError.class)
+ public void testEnqueueSegmentToReplaceWithAsyncSegmentRefreshDisabled() {
+ // Create a table data manager without segment refresh executor
+ OfflineTableDataManager tableDataManagerWithoutExecutor = new
OfflineTableDataManager();
+ tableDataManagerWithoutExecutor.init(_instanceDataManagerConfig,
_helixManager, new SegmentLocks(),
+ TABLE_CONFIG, SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(),
+ null, null, null, false);
+
+ // This should trigger the assertion error
+ tableDataManagerWithoutExecutor.enqueueSegmentToReplace(SEGMENT_NAME);
+ }
+
+ /**
+ * Test multiple concurrent enqueue operations
+ */
+ @Test
+ public void testConcurrentEnqueueOperations() throws Exception {
+ int numConcurrentOperations = 5;
+ CountDownLatch startLatch = new CountDownLatch(1);
+ CountDownLatch completionLatch = new
CountDownLatch(numConcurrentOperations);
+
+ // Mock replaceSegment to simulate some work and track calls
+ doAnswer(invocation -> {
+ Thread.sleep(50); // Simulate some work
+ completionLatch.countDown();
+ return null;
+ }).when(_tableDataManager).replaceSegmentInternal(anyString());
+
+ // Start multiple threads to enqueue operations
+ for (int i = 0; i < numConcurrentOperations; i++) {
+ final String segmentName = SEGMENT_NAME + "_" + i;
+ new Thread(() -> {
+ try {
+ startLatch.await();
+ _tableDataManager.replaceSegment(segmentName);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }).start();
+ }
+
+ // Start all operations
+ startLatch.countDown();
+
+ // Wait for all operations to complete
+ assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "All concurrent
operations should complete");
+
+ // Verify that replaceSegment was called for each segment
+ for (int i = 0; i < numConcurrentOperations; i++) {
+ verify(_tableDataManager, times(1)).replaceSegment(SEGMENT_NAME + "_" +
i);
+ }
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 2064d393129..74c59b3a2cf 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -27,7 +27,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -38,6 +40,7 @@ import
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -75,7 +78,10 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -394,6 +400,162 @@ public class BaseTableDataManagerTest {
assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
}
+ @Test
+ public void testReplaceSegmentNewDataWithAsyncSegmentRefresh()
+ throws Exception {
+ SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+
+ // Mock the case where segment is loaded but its CRC is different from
+ // the one in zk, thus raw segment is downloaded and loaded.
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(SEGMENT_NAME, 0);
+
+ BaseTableDataManager tableDataManager =
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+ File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+ assertFalse(dataDir.exists());
+
+ // Add the segment to the manager's internal map so replaceSegment can
find it
+ tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
+
+ // Mock the methods that will be called during segment replacement
+ doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
+ // Use CountDownLatch to wait for async execution
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Mock replaceSegmentIfCrcMismatch which is called by doReplaceSegment
+ doAnswer(invocation -> {
+ // Call the original replaceSegmentIfCrcMismatch method
+ invocation.callRealMethod();
+ latch.countDown();
+ return null;
+ }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+ any(SegmentDataManager.class), any(SegmentZKMetadata.class),
any(IndexLoadingConfig.class));
+
+ // Call replaceSegment which will execute asynchronously
+ tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Wait for async execution to complete
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should
complete");
+ assertTrue(dataDir.exists());
+ assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+ }
+
+ @Test
+ public void testReplaceSegmentNewDataNewTierWithAsyncSegmentRefresh()
+ throws Exception {
+ SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+ zkMetadata.setTier(TIER_NAME);
+
+ // Mock the case where segment is loaded but its CRC is different from
+ // the one in zk, thus raw segment is downloaded and loaded.
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(SEGMENT_NAME, 0);
+
+ // No dataDir for coolTier, thus stay on default tier.
+ BaseTableDataManager tableDataManager =
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+ File defaultDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+ assertFalse(defaultDataDir.exists());
+
+ // Add the segment to the manager's internal map so replaceSegment can
find it
+ tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
+
+ // Mock the methods that will be called during segment replacement
+ doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
+
+ // Use CountDownLatch to wait for async execution
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ // Call the original method
+ invocation.callRealMethod();
+ latch.countDown();
+ return null;
+ }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+ any(SegmentDataManager.class), any(SegmentZKMetadata.class),
any(IndexLoadingConfig.class));
+
+ // Call replaceSegment which will execute asynchronously
+ tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Wait for async execution to complete
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should
complete");
+ assertTrue(defaultDataDir.exists());
+ assertEquals(new SegmentMetadataImpl(defaultDataDir).getTotalDocs(), 5);
+
+ // Configured dataDir for coolTier, thus move to new dir.
+ tableDataManager = spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+
+ // Add the segment to the manager's internal map
+ tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME,
segmentDataManager);
+
+ // Mock the methods for the second part of the test
+ doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+ doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+ .when(tableDataManager).fetchIndexLoadingConfig();
+
+ CountDownLatch latch2 = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ // Call the original method
+ invocation.callRealMethod();
+ latch2.countDown();
+ return null;
+ }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+ any(SegmentDataManager.class), any(SegmentZKMetadata.class),
any(IndexLoadingConfig.class));
+
+ // Call replaceSegment which will execute asynchronously
+ tableDataManager.replaceSegment(SEGMENT_NAME);
+
+ // Wait for async execution to complete
+ assertTrue(latch2.await(10, TimeUnit.SECONDS), "Second segment replacement
should complete");
+
+ File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME,
TIER_NAME, TIER_TABLE_CONFIG);
+ assertTrue(tierDataDir.exists());
+ assertFalse(defaultDataDir.exists());
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(tierDataDir);
+ assertEquals(segmentMetadata.getTotalDocs(), 5);
+ assertEquals(segmentMetadata.getIndexDir(), tierDataDir);
+ }
+
+ @Test
+ public void testReplaceSegmentNoopWithAsyncSegmentRefresh()
+ throws Exception {
+ String segmentName = "seg01";
+ SegmentZKMetadata zkMetadata = mock(SegmentZKMetadata.class);
+ when(zkMetadata.getSegmentName()).thenReturn(segmentName);
+ when(zkMetadata.getCrc()).thenReturn(1024L);
+
+ ImmutableSegmentDataManager segmentDataManager =
createImmutableSegmentDataManager(segmentName, 1024L);
+
+ BaseTableDataManager tableDataManager =
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+ assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
+
+ // Add the segment to the manager's internal map so replaceSegment can
find it
+ tableDataManager._segmentDataManagerMap.put(segmentName,
segmentDataManager);
+
+ // Mock the methods that will be called during segment replacement
+ doAnswer(invocation ->
zkMetadata).when(tableDataManager).fetchZKMetadata(segmentName);
+ doAnswer(invocation -> new
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
+ // Use CountDownLatch to wait for async execution
+ CountDownLatch latch = new CountDownLatch(1);
+ doAnswer(invocation -> {
+ // Call the original method - this should be a no-op since CRCs match
+ invocation.callRealMethod();
+ latch.countDown();
+ return null;
+ }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+ any(SegmentDataManager.class), any(SegmentZKMetadata.class),
any(IndexLoadingConfig.class));
+
+ // Call replaceSegment which will execute asynchronously
+ tableDataManager.replaceSegment(segmentName);
+
+ // Wait for async execution to complete
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should
complete");
+
+ // As CRC is same, the index dir is left as is, so not get created by the
test.
+ assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
+ }
+
@Test
public void testAddNewSegmentUseLocalCopy()
throws Exception {
@@ -664,11 +826,24 @@ public class BaseTableDataManagerTest {
return createTableManager(createDefaultInstanceDataManagerConfig());
}
+ static OfflineTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled() {
+ return
createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig());
+ }
+
private static OfflineTableDataManager
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null, null,
- SEGMENT_OPERATIONS_THROTTLER);
+ SEGMENT_OPERATIONS_THROTTLER, false);
+ return tableDataManager;
+ }
+
+ private static OfflineTableDataManager
createTableManagerWithAsyncSegmentRefreshEnabled(
+ InstanceDataManagerConfig instanceDataManagerConfig) {
+ OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
+ tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class),
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+ SCHEMA, new SegmentReloadSemaphore(1),
Executors.newSingleThreadExecutor(), null, null,
+ SEGMENT_OPERATIONS_THROTTLER, true);
return tableDataManager;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 144b02f96b3..66337386eac 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -177,7 +177,8 @@ public class DimensionTableDataManagerTest {
DimensionTableDataManager tableDataManager =
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig, schema,
- new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER);
+ new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER,
+ false);
tableDataManager.start();
return tableDataManager;
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index dfa5cc32b43..e382e1ca2dc 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -73,10 +73,10 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
@Override
public TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema,
- SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadExecutor,
+ SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- Supplier<Boolean> isServerReadyToServeQueries) {
+ Supplier<Boolean> isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
case OFFLINE:
@@ -114,7 +114,8 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
throw new IllegalStateException();
}
tableDataManager.init(_instanceDataManagerConfig, _helixManager,
_segmentLocks, tableConfig, schema,
- segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor,
errorCache, null);
+ segmentReloadSemaphore, segmentReloadRefreshExecutor,
segmentPreloadExecutor, errorCache, null,
+ enableAsyncSegmentRefresh);
return tableDataManager;
}
}
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 0aff8f1415b..5d5f989d6a3 100644
---
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -187,7 +187,8 @@ public class BenchmarkDimensionTableOverhead extends
BaseQueriesTest {
String tableName = TABLE_NAME + "_" + _iteration;
_tableDataManager =
DimensionTableDataManager.createInstanceByTableName(tableName);
_tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig, SCHEMA,
- new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER);
+ new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, SEGMENT_OPERATIONS_THROTTLER,
+ false);
_tableDataManager.start();
for (int i = 0; i < _indexSegments.size(); i++) {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index c886a1f3085..96fd4827476 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -54,9 +54,9 @@ public interface TableDataManager {
*/
void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager
helixManager, SegmentLocks segmentLocks,
TableConfig tableConfig, Schema schema, SegmentReloadSemaphore
segmentReloadSemaphore,
- ExecutorService segmentReloadExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
+ ExecutorService segmentReloadRefreshExecutor, @Nullable ExecutorService
segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler, boolean
enableAsyncSegmentRefresh);
/**
* Returns the instance id of the server.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
index 16aaca6b419..cf8b0e05326 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
@@ -35,9 +35,9 @@ public class SegmentReloadSemaphore {
public void acquire(String segmentName, Logger logger)
throws InterruptedException {
long startTimeMs = System.currentTimeMillis();
- logger.info("Waiting for lock to reload: {}, queue-length: {}",
segmentName, _semaphore.getQueueLength());
+ logger.info("Waiting for lock to reload/refresh: {}, queue-length: {}",
segmentName, _semaphore.getQueueLength());
_semaphore.acquire();
- logger.info("Acquired lock to reload segment: {} (lock-time={}ms,
queue-length={})", segmentName,
+ logger.info("Acquired lock to reload/refresh segment: {} (lock-time={}ms,
queue-length={})", segmentName,
System.currentTimeMillis() - startTimeMs, _semaphore.getQueueLength());
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 3baee22a89d..d1e26a02de5 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -109,7 +109,9 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
protected Cache<String, Long> _recentlyDeletedTables;
private SegmentReloadSemaphore _segmentReloadSemaphore;
- private ExecutorService _segmentReloadExecutor;
+ private ExecutorService _segmentReloadRefreshExecutor;
+
+ private boolean _enableAsyncSegmentRefresh;
@Nullable
private ExecutorService _segmentPreloadExecutor;
@@ -149,9 +151,9 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
Preconditions.checkArgument(maxParallelRefreshThreads > 0,
"'pinot.server.instance.max.parallel.refresh.threads' must be
positive, got: " + maxParallelRefreshThreads);
_segmentReloadSemaphore = new
SegmentReloadSemaphore(maxParallelRefreshThreads);
- _segmentReloadExecutor =
Executors.newFixedThreadPool(maxParallelRefreshThreads,
- new
ThreadFactoryBuilder().setNameFormat("segment-reload-thread-%d").build());
- LOGGER.info("Created SegmentReloadExecutor with pool size: {}",
maxParallelRefreshThreads);
+ _segmentReloadRefreshExecutor =
Executors.newFixedThreadPool(maxParallelRefreshThreads,
+ new
ThreadFactoryBuilder().setNameFormat("segment-reload-refresh-thread-%d").build());
+ LOGGER.info("Created SegmentReloadRefreshExecutor with pool size: {}",
maxParallelRefreshThreads);
int maxSegmentPreloadThreads =
_instanceDataManagerConfig.getMaxSegmentPreloadThreads();
if (maxSegmentPreloadThreads > 0) {
_segmentPreloadExecutor =
Executors.newFixedThreadPool(maxSegmentPreloadThreads,
@@ -160,6 +162,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
} else {
LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}",
maxSegmentPreloadThreads);
}
+ _enableAsyncSegmentRefresh = isAsyncSegmentRefreshEnabled();
+ LOGGER.info("Segment refresh asynchronous handling is {}",
_enableAsyncSegmentRefresh ? "enabled" : "disabled");
LOGGER.info("Initialized Helix instance data manager");
// Initialize the error cache and recently deleted tables cache
@@ -241,7 +245,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
@Override
public synchronized void shutDown() {
- _segmentReloadExecutor.shutdownNow();
+ _segmentReloadRefreshExecutor.shutdownNow();
if (_segmentPreloadExecutor != null) {
_segmentPreloadExecutor.shutdownNow();
}
@@ -328,7 +332,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
TableDataManager tableDataManager =
_tableDataManagerProvider.getTableDataManager(tableConfig, schema,
_segmentReloadSemaphore,
- _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache,
_isServerReadyToServeQueries);
+ _segmentReloadRefreshExecutor, _segmentPreloadExecutor,
_errorCache, _isServerReadyToServeQueries,
+ _enableAsyncSegmentRefresh);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
@@ -512,6 +517,11 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
return _instanceDataManagerConfig.getMaxParallelRefreshThreads();
}
+ @Override
+ public boolean isAsyncSegmentRefreshEnabled() {
+ return _instanceDataManagerConfig.isAsyncSegmentRefreshEnabled();
+ }
+
@Override
public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
return _propertyStore;
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index cb24365d94d..89fbb644831 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -100,6 +100,11 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
//
public static final String MAX_PARALLEL_REFRESH_THREADS =
"max.parallel.refresh.threads";
+ // Whether to process SEGMENT_REFRESH in a synchronous or asynchronous
manner when the messaged is received.
+ // Defaults to false, meaning SEGMENT_REFRESH will be processed in a
synchronous manner.
+ public static final String ENABLE_ASYNC_SEGMENT_REFRESH =
"enable.async.segment.refresh";
+ private static final boolean DEFAULT_ENABLE_ASYNC_SEGMENT_REFRESH = false;
+
// To preload segments of table using upsert in parallel for fast upsert
metadata recovery.
private static final String MAX_SEGMENT_PRELOAD_THREADS =
"max.segment.preload.threads";
@@ -252,6 +257,11 @@ public class HelixInstanceDataManagerConfig implements
InstanceDataManagerConfig
return _serverConfig.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
}
+ @Override
+ public boolean isAsyncSegmentRefreshEnabled() {
+ return _serverConfig.getProperty(ENABLE_ASYNC_SEGMENT_REFRESH,
DEFAULT_ENABLE_ASYNC_SEGMENT_REFRESH);
+ }
+
@Override
public int getMaxSegmentPreloadThreads() {
return _serverConfig.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index b3c194954f7..e114119f005 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -226,7 +226,7 @@ public abstract class BaseResourceTest {
// more checks
TableDataManager tableDataManager = new OfflineTableDataManager();
tableDataManager.init(instanceDataManagerConfig, helixManager, new
SegmentLocks(), tableConfig, schema,
- new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, null);
+ new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
null, null, null, false);
tableDataManager.start();
_tableDataManagerMap.put(tableNameWithType, tableDataManager);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index fcd7c6b61de..2aa61d7a0fc 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -55,6 +55,8 @@ public interface InstanceDataManagerConfig {
int getMaxParallelRefreshThreads();
+ boolean isAsyncSegmentRefreshEnabled();
+
int getMaxSegmentPreloadThreads();
int getMaxParallelSegmentBuilds();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]