This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ce33d0d9184 Add support to process segment refresh messages 
asynchronously if enabled (#16931)
ce33d0d9184 is described below

commit ce33d0d918485e3aae6b42ee72c428d02672409e
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Oct 9 13:54:43 2025 -0700

    Add support to process segment refresh messages asynchronously if enabled 
(#16931)
    
    * Add support to process segment refresh messages asynchronously if enabled
    
    * Add tests
    
    * Address review comments
    
    * Rename executor to ReloadRefresh
    
    * Log exception on replace failure during enqueue
---
 .../core/data/manager/BaseTableDataManager.java    |  42 ++-
 .../core/data/manager/InstanceDataManager.java     |   5 +
 .../provider/DefaultTableDataManagerProvider.java  |   7 +-
 .../manager/provider/TableDataManagerProvider.java |   6 +-
 .../BaseTableDataManagerAcquireSegmentTest.java    |   3 +-
 ...ableDataManagerEnqueueSegmentToReplaceTest.java | 336 +++++++++++++++++++++
 .../data/manager/BaseTableDataManagerTest.java     | 177 ++++++++++-
 .../offline/DimensionTableDataManagerTest.java     |   3 +-
 .../FailureInjectingTableDataManagerProvider.java  |   7 +-
 .../perf/BenchmarkDimensionTableOverhead.java      |   3 +-
 .../local/data/manager/TableDataManager.java       |   4 +-
 .../local/utils/SegmentReloadSemaphore.java        |   4 +-
 .../starter/helix/HelixInstanceDataManager.java    |  22 +-
 .../helix/HelixInstanceDataManagerConfig.java      |  10 +
 .../apache/pinot/server/api/BaseResourceTest.java  |   2 +-
 .../config/instance/InstanceDataManagerConfig.java |   2 +
 16 files changed, 604 insertions(+), 29 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 0abd9bda1be..0536f9d6f30 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -129,7 +129,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected File _resourceTmpDir;
   protected Logger _logger;
   protected SegmentReloadSemaphore _segmentReloadSemaphore;
-  protected ExecutorService _segmentReloadExecutor;
+  protected ExecutorService _segmentReloadRefreshExecutor;
   @Nullable
   protected ExecutorService _segmentPreloadExecutor;
   protected AuthProvider _authProvider;
@@ -156,12 +156,14 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   protected volatile boolean _shutDown;
   protected volatile boolean _isDeleted;
 
+  protected boolean _enableAsyncSegmentRefresh;
+
   @Override
   public void init(InstanceDataManagerConfig instanceDataManagerConfig, 
HelixManager helixManager,
       SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema, 
SegmentReloadSemaphore segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+      ExecutorService segmentReloadRefreshExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, boolean 
enableAsyncSegmentRefresh) {
     LOGGER.info("Initializing table data manager for table: {}", 
tableConfig.getTableName());
 
     _instanceDataManagerConfig = instanceDataManagerConfig;
@@ -170,8 +172,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     _propertyStore = helixManager.getHelixPropertyStore();
     _segmentLocks = segmentLocks;
     _segmentReloadSemaphore = segmentReloadSemaphore;
-    _segmentReloadExecutor = segmentReloadExecutor;
+    _segmentReloadRefreshExecutor = segmentReloadRefreshExecutor;
     _segmentPreloadExecutor = segmentPreloadExecutor;
+    _enableAsyncSegmentRefresh = enableAsyncSegmentRefresh;
     _authProvider = 
AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(),
 null);
 
     _tableNameWithType = tableConfig.getTableName();
@@ -230,8 +233,11 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     } else {
       _segmentDownloadSemaphore = null;
     }
+
     _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
getClass().getSimpleName());
 
+    _logger.info("Async segment refresh is {}!", _enableAsyncSegmentRefresh ? 
"enabled" : "disabled");
+
     doInit();
 
     _logger.info("Initialized table data manager with data directory: {}", 
_tableDataDir);
@@ -451,6 +457,32 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   @Override
   public void replaceSegment(String segmentName)
       throws Exception {
+    if (_enableAsyncSegmentRefresh) {
+      enqueueSegmentToReplace(segmentName);
+    } else {
+      replaceSegmentInternal(segmentName);
+    }
+  }
+
+  protected void enqueueSegmentToReplace(String segmentName) {
+    assert _enableAsyncSegmentRefresh;
+    if (_shutDown) {
+      _logger.warn("Shutdown in progress, skip enqueuing segment: {} to 
replace", segmentName);
+      return;
+    }
+    _logger.info("Enqueuing segment: {} to be replaced", segmentName);
+    _segmentReloadRefreshExecutor.submit(() -> {
+      try {
+        replaceSegmentInternal(segmentName);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception while replacing segment: {}", 
segmentName, e);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REFRESH_FAILURES, 1);
+      }
+    });
+  }
+
+  protected void replaceSegmentInternal(String segmentName)
+      throws Exception {
     Preconditions.checkState(!_shutDown,
         "Table data manager is already shut down, cannot replace segment: %s 
in table: %s", segmentName,
         _tableNameWithType);
@@ -760,7 +792,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         failedSegments.add(segmentName);
         sampleException.set(t);
       }
-    }, _segmentReloadExecutor)).toArray(CompletableFuture[]::new)).get();
+    }, 
_segmentReloadRefreshExecutor)).toArray(CompletableFuture[]::new)).get();
     if (sampleException.get() != null) {
       throw new RuntimeException(
           String.format("Failed to reload %d/%d segments: %s in table: %s", 
failedSegments.size(),
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 15ef79f465a..ba5f93f5e6a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -169,6 +169,11 @@ public interface InstanceDataManager {
    */
   int getMaxParallelRefreshThreads();
 
+  /**
+   * Returns true if background processing for SEGMENT_REFRESH is enabled, 
false otherwise
+   */
+  boolean isAsyncSegmentRefreshEnabled();
+
   /**
    * Returns the Helix property store.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index b750016f74a..ba8ada89160 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -66,10 +66,10 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
 
   @Override
   public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
-      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries) {
+      Supplier<Boolean> isServerReadyToServeQueries, boolean 
enableAsyncSegmentRefresh) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
       case OFFLINE:
@@ -93,7 +93,8 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
         throw new IllegalStateException();
     }
     tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
-        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, _segmentOperationsThrottler);
+        segmentReloadSemaphore, segmentReloadRefreshExecutor, 
segmentPreloadExecutor, errorCache,
+        _segmentOperationsThrottler, enableAsyncSegmentRefresh);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index 63bfed0b0e6..1c558aaba6e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -47,14 +47,14 @@ public interface TableDataManagerProvider {
       @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
 
   TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema,
-      SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService 
segmentRefreshExecutor,
+      SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService 
segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries);
+      Supplier<Boolean> isServerReadyToServeQueries, boolean 
enableAsyncSegmentRefresh);
 
   @VisibleForTesting
   default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
     return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
-        null, null, () -> true);
+        null, null, () -> true, false);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
index 16cceb3ebe6..4a953236f5e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java
@@ -134,7 +134,8 @@ public class BaseTableDataManagerAcquireSegmentTest {
             new SegmentMultiColTextIndexPreprocessThrottler(4, 8, true));
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), tableConfig, schema,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, segmentOperationsThrottler);
+        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, segmentOperationsThrottler,
+        false);
     tableDataManager.start();
     Field segsMapField = 
BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
new file mode 100644
index 00000000000..fd9318f3ff0
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerEnqueueSegmentToReplaceTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixManager;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
+import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for async segment refresh enabled
+ */
+public class BaseTableDataManagerEnqueueSegmentToReplaceTest {
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), 
"BaseTableDataManagerEnqueueSegmentToReplaceTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final TableConfig TABLE_CONFIG =
+      new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+  private static final Schema SCHEMA = new 
Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build();
+
+  private OfflineTableDataManager _tableDataManager;
+  private ExecutorService _segmentReloadRefreshExecutor;
+  private InstanceDataManagerConfig _instanceDataManagerConfig;
+  private HelixManager _helixManager;
+  private ServerMetrics _serverMetrics;
+
+  @BeforeClass
+  public void setUp() {
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @BeforeMethod
+  public void setUpMethod() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
+
+    // Create a real ExecutorService for testing
+    _segmentReloadRefreshExecutor = Executors.newSingleThreadExecutor();
+
+    // Mock dependencies
+    _instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
+    
when(_instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
+    
when(_instanceDataManagerConfig.getInstanceId()).thenReturn("testInstance");
+
+    _helixManager = mock(HelixManager.class);
+
+    @SuppressWarnings("unchecked")
+    ZkHelixPropertyStore<ZNRecord> propertyStore = 
mock(ZkHelixPropertyStore.class);
+    when(_helixManager.getHelixPropertyStore()).thenReturn(propertyStore);
+
+    // Mock ServerMetrics for metric validation
+    _serverMetrics = mock(ServerMetrics.class);
+
+    // Create table data manager with segment refresh executor
+    _tableDataManager = spy(new OfflineTableDataManager());
+    _tableDataManager.init(_instanceDataManagerConfig, _helixManager, new 
SegmentLocks(), TABLE_CONFIG,
+        SCHEMA, new SegmentReloadSemaphore(1), _segmentReloadRefreshExecutor, 
null, null, null, true);
+
+    // Inject the mocked ServerMetrics into the table data manager using 
reflection
+    try {
+      Field serverMetricsField = 
BaseTableDataManager.class.getDeclaredField("_serverMetrics");
+      serverMetricsField.setAccessible(true);
+      serverMetricsField.set(_tableDataManager, _serverMetrics);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to inject mocked ServerMetrics", e);
+    }
+  }
+
+  @AfterMethod
+  public void tearDownMethod() throws Exception {
+    if (_segmentReloadRefreshExecutor != null && 
!_segmentReloadRefreshExecutor.isShutdown()) {
+      _segmentReloadRefreshExecutor.shutdownNow();
+      _segmentReloadRefreshExecutor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    if (_tableDataManager != null) {
+      _tableDataManager.shutDown();
+    }
+    FileUtils.deleteDirectory(TEMP_DIR);
+  }
+
+  /**
+   * Test successful enqueueing and execution of segment replacement
+   */
+  @Test
+  public void testEnqueueSegmentToReplaceSuccess() throws Exception {
+    // Setup mocks for successful segment replacement
+    SegmentDataManager mockSegmentDataManager = 
mock(ImmutableSegmentDataManager.class);
+    when(mockSegmentDataManager.getSegmentName()).thenReturn(SEGMENT_NAME);
+
+    ImmutableSegment mockSegment = mock(ImmutableSegment.class);
+    when(mockSegmentDataManager.getSegment()).thenReturn(mockSegment);
+
+    SegmentMetadata mockMetadata = mock(SegmentMetadata.class);
+    when(mockSegment.getSegmentMetadata()).thenReturn(mockMetadata);
+    when(mockMetadata.getCrc()).thenReturn("12345");
+
+    SegmentZKMetadata mockZkMetadata = mock(SegmentZKMetadata.class);
+    when(mockZkMetadata.getSegmentName()).thenReturn(SEGMENT_NAME);
+    when(mockZkMetadata.getCrc()).thenReturn(12345L);
+
+    // Add the segment to the manager's internal map
+    _tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
mockSegmentDataManager);
+
+    // Mock the methods that will be called during segment replacement
+    doAnswer(invocation -> 
mockZkMetadata).when(_tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(_tableDataManager).fetchIndexLoadingConfig();
+
+    // Use CountDownLatch to wait for async execution
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicBoolean replaceSegmentIfCrcMismatchCalled = new AtomicBoolean(false);
+
+    doAnswer(invocation -> {
+      replaceSegmentIfCrcMismatchCalled.set(true);
+      latch.countDown();
+      return null;
+    }).when(_tableDataManager).replaceSegmentIfCrcMismatch(any(), any(), 
any());
+
+    // Test the method
+    _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Wait for async execution to complete
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement with 
CRC mismatch should be executed");
+    assertTrue(replaceSegmentIfCrcMismatchCalled.get(), 
"replaceSegmentIfCrcMismatch should have been called");
+
+    // Verify that the segment replacement was attempted
+    verify(_tableDataManager, times(1)).enqueueSegmentToReplace(SEGMENT_NAME);
+    verify(_tableDataManager, times(1)).fetchZKMetadata(SEGMENT_NAME);
+    verify(_tableDataManager, times(1)).fetchIndexLoadingConfig();
+  }
+
+  /**
+   * Test behavior when table manager is shut down
+   */
+  @Test
+  public void testEnqueueSegmentToReplaceWhenShutDown() throws Exception {
+    // Shutdown the table data manager
+    _tableDataManager.shutDown();
+
+    // Mock replaceSegment to track if it's called
+    doAnswer(invocation -> {
+      throw new AssertionError("replaceSegment should not be called when shut 
down");
+    }).when(_tableDataManager).replaceSegmentInternal(anyString());
+
+    // Test the method - it should return early without enqueueing
+    _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Give some time for any potential async execution
+    Thread.sleep(1000);
+
+    // Verify that replaceSegment was never called
+    verify(_tableDataManager, never()).replaceSegmentInternal(SEGMENT_NAME);
+    verify(_tableDataManager, never()).replaceSegmentIfCrcMismatch(any(), 
any(), any());
+  }
+
+  /**
+   * Test exception handling during segment replacement and metric increment
+   */
+  @Test
+  public void testEnqueueSegmentToReplaceWithException() throws Exception {
+    // Setup mocks to throw exception during replacement
+    RuntimeException testException = new RuntimeException("Test exception 
during replacement");
+    
doThrow(testException).when(_tableDataManager).replaceSegmentInternal(SEGMENT_NAME);
+
+    // Use CountDownLatch to wait for async execution
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicReference<Exception> caughtException = new AtomicReference<>();
+
+    // Override the executor to capture exceptions
+    ExecutorService spyExecutor = spy(_segmentReloadRefreshExecutor);
+    doAnswer(invocation -> {
+      Runnable task = invocation.getArgument(0);
+      try {
+        task.run();
+      } catch (Exception e) {
+        caughtException.set(e);
+      } finally {
+        latch.countDown();
+      }
+      return null;
+    }).when(spyExecutor).submit(any(Runnable.class));
+
+    // Replace the executor in the table data manager
+    _tableDataManager._segmentReloadRefreshExecutor = spyExecutor;
+
+    // Test the method
+    _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Wait for async execution to complete
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Task should complete even 
with exception");
+
+    // Verify that replaceSegment was called and exception was handled
+    verify(_tableDataManager, times(1)).enqueueSegmentToReplace(SEGMENT_NAME);
+    verify(_tableDataManager, times(1)).replaceSegmentInternal(SEGMENT_NAME);
+
+    // Verify that the REFRESH_FAILURES metric was incremented
+    verify(_serverMetrics, times(1)).addMeteredTableValue(
+        eq(RAW_TABLE_NAME + "_OFFLINE"), eq(ServerMeter.REFRESH_FAILURES), 
eq(1L));
+
+    // The exception should be caught and handled by the implementation, not 
propagated
+    assertNull(caughtException.get());
+  }
+
+  /**
+   * Test that executor is properly used for async execution
+   */
+  @Test
+  public void testExecutorUsage() throws Exception {
+    ExecutorService mockExecutor = mock(ExecutorService.class);
+    _tableDataManager._segmentReloadRefreshExecutor = mockExecutor;
+
+    // Test the method
+    _tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Verify that the executor's submit method was called
+    verify(mockExecutor, times(1)).submit(any(Runnable.class));
+  }
+
+  /**
+   * Test with null segment refresh executor should trigger assertion error
+   */
+  @Test(expectedExceptions = AssertionError.class)
+  public void testEnqueueSegmentToReplaceWithAsyncSegmentRefreshDisabled() {
+    // Create a table data manager without segment refresh executor
+    OfflineTableDataManager tableDataManagerWithoutExecutor = new 
OfflineTableDataManager();
+    tableDataManagerWithoutExecutor.init(_instanceDataManagerConfig, 
_helixManager, new SegmentLocks(),
+        TABLE_CONFIG, SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(),
+        null, null, null, false);
+
+    // This should trigger the assertion error
+    tableDataManagerWithoutExecutor.enqueueSegmentToReplace(SEGMENT_NAME);
+  }
+
+  /**
+   * Test multiple concurrent enqueue operations
+   */
+  @Test
+  public void testConcurrentEnqueueOperations() throws Exception {
+    int numConcurrentOperations = 5;
+    CountDownLatch startLatch = new CountDownLatch(1);
+    CountDownLatch completionLatch = new 
CountDownLatch(numConcurrentOperations);
+
+    // Mock replaceSegment to simulate some work and track calls
+    doAnswer(invocation -> {
+      Thread.sleep(50); // Simulate some work
+      completionLatch.countDown();
+      return null;
+    }).when(_tableDataManager).replaceSegmentInternal(anyString());
+
+    // Start multiple threads to enqueue operations
+    for (int i = 0; i < numConcurrentOperations; i++) {
+      final String segmentName = SEGMENT_NAME + "_" + i;
+      new Thread(() -> {
+        try {
+          startLatch.await();
+          _tableDataManager.replaceSegment(segmentName);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }).start();
+    }
+
+    // Start all operations
+    startLatch.countDown();
+
+    // Wait for all operations to complete
+    assertTrue(completionLatch.await(10, TimeUnit.SECONDS), "All concurrent 
operations should complete");
+
+    // Verify that replaceSegment was called for each segment
+    for (int i = 0; i < numConcurrentOperations; i++) {
+      verify(_tableDataManager, times(1)).replaceSegment(SEGMENT_NAME + "_" + 
i);
+    }
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 2064d393129..74c59b3a2cf 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -27,7 +27,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -38,6 +40,7 @@ import 
org.apache.pinot.common.utils.fetcher.BaseSegmentFetcher;
 import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
@@ -75,7 +78,10 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
@@ -394,6 +400,162 @@ public class BaseTableDataManagerTest {
     assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
   }
 
+  @Test
+  public void testReplaceSegmentNewDataWithAsyncSegmentRefresh()
+      throws Exception {
+    SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+
+    // Mock the case where segment is loaded but its CRC is different from
+    // the one in zk, thus raw segment is downloaded and loaded.
+    ImmutableSegmentDataManager segmentDataManager = 
createImmutableSegmentDataManager(SEGMENT_NAME, 0);
+
+    BaseTableDataManager tableDataManager = 
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+    File dataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+    assertFalse(dataDir.exists());
+
+    // Add the segment to the manager's internal map so replaceSegment can 
find it
+    tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
+
+    // Mock the methods that will be called during segment replacement
+    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
+    // Use CountDownLatch to wait for async execution
+    CountDownLatch latch = new CountDownLatch(1);
+
+    // Mock replaceSegmentIfCrcMismatch which is called by doReplaceSegment
+    doAnswer(invocation -> {
+      // Call the original replaceSegmentIfCrcMismatch method
+      invocation.callRealMethod();
+      latch.countDown();
+      return null;
+    }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+        any(SegmentDataManager.class), any(SegmentZKMetadata.class), 
any(IndexLoadingConfig.class));
+
+    // Call replaceSegment which will execute asynchronously
+    tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Wait for async execution to complete
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should 
complete");
+    assertTrue(dataDir.exists());
+    assertEquals(new SegmentMetadataImpl(dataDir).getTotalDocs(), 5);
+  }
+
+  @Test
+  public void testReplaceSegmentNewDataNewTierWithAsyncSegmentRefresh()
+      throws Exception {
+    SegmentZKMetadata zkMetadata = createRawSegment(SegmentVersion.v3, 5);
+    zkMetadata.setTier(TIER_NAME);
+
+    // Mock the case where segment is loaded but its CRC is different from
+    // the one in zk, thus raw segment is downloaded and loaded.
+    ImmutableSegmentDataManager segmentDataManager = 
createImmutableSegmentDataManager(SEGMENT_NAME, 0);
+
+    // No dataDir for coolTier, thus stay on default tier.
+    BaseTableDataManager tableDataManager = 
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+    File defaultDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME);
+    assertFalse(defaultDataDir.exists());
+
+    // Add the segment to the manager's internal map so replaceSegment can 
find it
+    tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
+
+    // Mock the methods that will be called during segment replacement
+    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    doAnswer(invocation -> createTierIndexLoadingConfig(DEFAULT_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
+
+    // Use CountDownLatch to wait for async execution
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      // Call the original method
+      invocation.callRealMethod();
+      latch.countDown();
+      return null;
+    }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+        any(SegmentDataManager.class), any(SegmentZKMetadata.class), 
any(IndexLoadingConfig.class));
+
+    // Call replaceSegment which will execute asynchronously
+    tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Wait for async execution to complete
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should 
complete");
+    assertTrue(defaultDataDir.exists());
+    assertEquals(new SegmentMetadataImpl(defaultDataDir).getTotalDocs(), 5);
+
+    // Configured dataDir for coolTier, thus move to new dir.
+    tableDataManager = spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+
+    // Add the segment to the manager's internal map
+    tableDataManager._segmentDataManagerMap.put(SEGMENT_NAME, 
segmentDataManager);
+
+    // Mock the methods for the second part of the test
+    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(SEGMENT_NAME);
+    doAnswer(invocation -> createTierIndexLoadingConfig(TIER_TABLE_CONFIG))
+        .when(tableDataManager).fetchIndexLoadingConfig();
+
+    CountDownLatch latch2 = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      // Call the original method
+      invocation.callRealMethod();
+      latch2.countDown();
+      return null;
+    }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+        any(SegmentDataManager.class), any(SegmentZKMetadata.class), 
any(IndexLoadingConfig.class));
+
+    // Call replaceSegment which will execute asynchronously
+    tableDataManager.replaceSegment(SEGMENT_NAME);
+
+    // Wait for async execution to complete
+    assertTrue(latch2.await(10, TimeUnit.SECONDS), "Second segment replacement 
should complete");
+
+    File tierDataDir = tableDataManager.getSegmentDataDir(SEGMENT_NAME, 
TIER_NAME, TIER_TABLE_CONFIG);
+    assertTrue(tierDataDir.exists());
+    assertFalse(defaultDataDir.exists());
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(tierDataDir);
+    assertEquals(segmentMetadata.getTotalDocs(), 5);
+    assertEquals(segmentMetadata.getIndexDir(), tierDataDir);
+  }
+
+  @Test
+  public void testReplaceSegmentNoopWithAsyncSegmentRefresh()
+      throws Exception {
+    String segmentName = "seg01";
+    SegmentZKMetadata zkMetadata = mock(SegmentZKMetadata.class);
+    when(zkMetadata.getSegmentName()).thenReturn(segmentName);
+    when(zkMetadata.getCrc()).thenReturn(1024L);
+
+    ImmutableSegmentDataManager segmentDataManager = 
createImmutableSegmentDataManager(segmentName, 1024L);
+
+    BaseTableDataManager tableDataManager = 
spy(createTableManagerWithAsyncSegmentRefreshEnabled());
+    assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
+
+    // Add the segment to the manager's internal map so replaceSegment can 
find it
+    tableDataManager._segmentDataManagerMap.put(segmentName, 
segmentDataManager);
+
+    // Mock the methods that will be called during segment replacement
+    doAnswer(invocation -> 
zkMetadata).when(tableDataManager).fetchZKMetadata(segmentName);
+    doAnswer(invocation -> new 
IndexLoadingConfig()).when(tableDataManager).fetchIndexLoadingConfig();
+
+    // Use CountDownLatch to wait for async execution
+    CountDownLatch latch = new CountDownLatch(1);
+    doAnswer(invocation -> {
+      // Call the original method - this should be a no-op since CRCs match
+      invocation.callRealMethod();
+      latch.countDown();
+      return null;
+    }).when(tableDataManager).replaceSegmentIfCrcMismatch(
+        any(SegmentDataManager.class), any(SegmentZKMetadata.class), 
any(IndexLoadingConfig.class));
+
+    // Call replaceSegment which will execute asynchronously
+    tableDataManager.replaceSegment(segmentName);
+
+    // Wait for async execution to complete
+    assertTrue(latch.await(10, TimeUnit.SECONDS), "Segment replacement should 
complete");
+
+    // As CRC is same, the index dir is left as is, so not get created by the 
test.
+    assertFalse(tableDataManager.getSegmentDataDir(segmentName).exists());
+  }
+
   @Test
   public void testAddNewSegmentUseLocalCopy()
       throws Exception {
@@ -664,11 +826,24 @@ public class BaseTableDataManagerTest {
     return createTableManager(createDefaultInstanceDataManagerConfig());
   }
 
+  static OfflineTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled() {
+    return 
createTableManagerWithAsyncSegmentRefreshEnabled(createDefaultInstanceDataManagerConfig());
+  }
+
   private static OfflineTableDataManager 
createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) {
     OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
         SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
-        SEGMENT_OPERATIONS_THROTTLER);
+        SEGMENT_OPERATIONS_THROTTLER, false);
+    return tableDataManager;
+  }
+
+  private static OfflineTableDataManager 
createTableManagerWithAsyncSegmentRefreshEnabled(
+      InstanceDataManagerConfig instanceDataManagerConfig) {
+    OfflineTableDataManager tableDataManager = new OfflineTableDataManager();
+    tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), 
new SegmentLocks(), DEFAULT_TABLE_CONFIG,
+        SCHEMA, new SegmentReloadSemaphore(1), 
Executors.newSingleThreadExecutor(), null, null,
+        SEGMENT_OPERATIONS_THROTTLER, true);
     return tableDataManager;
   }
 
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 144b02f96b3..66337386eac 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -177,7 +177,8 @@ public class DimensionTableDataManagerTest {
     DimensionTableDataManager tableDataManager =
         
DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME);
     tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, schema,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
+        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER,
+        false);
     tableDataManager.start();
     return tableDataManager;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index dfa5cc32b43..e382e1ca2dc 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -73,10 +73,10 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
 
   @Override
   public TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema,
-      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadExecutor,
+      SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      Supplier<Boolean> isServerReadyToServeQueries) {
+      Supplier<Boolean> isServerReadyToServeQueries, boolean 
enableAsyncSegmentRefresh) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
       case OFFLINE:
@@ -114,7 +114,8 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
         throw new IllegalStateException();
     }
     tableDataManager.init(_instanceDataManagerConfig, _helixManager, 
_segmentLocks, tableConfig, schema,
-        segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, 
errorCache, null);
+        segmentReloadSemaphore, segmentReloadRefreshExecutor, 
segmentPreloadExecutor, errorCache, null,
+        enableAsyncSegmentRefresh);
     return tableDataManager;
   }
 }
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
index 0aff8f1415b..5d5f989d6a3 100644
--- 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java
@@ -187,7 +187,8 @@ public class BenchmarkDimensionTableOverhead extends 
BaseQueriesTest {
     String tableName = TABLE_NAME + "_" + _iteration;
     _tableDataManager = 
DimensionTableDataManager.createInstanceByTableName(tableName);
     _tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, SCHEMA,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER);
+        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, SEGMENT_OPERATIONS_THROTTLER,
+        false);
     _tableDataManager.start();
 
     for (int i = 0; i < _indexSegments.size(); i++) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index c886a1f3085..96fd4827476 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -54,9 +54,9 @@ public interface TableDataManager {
    */
   void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager 
helixManager, SegmentLocks segmentLocks,
       TableConfig tableConfig, Schema schema, SegmentReloadSemaphore 
segmentReloadSemaphore,
-      ExecutorService segmentReloadExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
+      ExecutorService segmentReloadRefreshExecutor, @Nullable ExecutorService 
segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      @Nullable SegmentOperationsThrottler segmentOperationsThrottler);
+      @Nullable SegmentOperationsThrottler segmentOperationsThrottler, boolean 
enableAsyncSegmentRefresh);
 
   /**
    * Returns the instance id of the server.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
index 16aaca6b419..cf8b0e05326 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentReloadSemaphore.java
@@ -35,9 +35,9 @@ public class SegmentReloadSemaphore {
   public void acquire(String segmentName, Logger logger)
       throws InterruptedException {
     long startTimeMs = System.currentTimeMillis();
-    logger.info("Waiting for lock to reload: {}, queue-length: {}", 
segmentName, _semaphore.getQueueLength());
+    logger.info("Waiting for lock to reload/refresh: {}, queue-length: {}", 
segmentName, _semaphore.getQueueLength());
     _semaphore.acquire();
-    logger.info("Acquired lock to reload segment: {} (lock-time={}ms, 
queue-length={})", segmentName,
+    logger.info("Acquired lock to reload/refresh segment: {} (lock-time={}ms, 
queue-length={})", segmentName,
         System.currentTimeMillis() - startTimeMs, _semaphore.getQueueLength());
   }
 
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 3baee22a89d..d1e26a02de5 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -109,7 +109,9 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   protected Cache<String, Long> _recentlyDeletedTables;
 
   private SegmentReloadSemaphore _segmentReloadSemaphore;
-  private ExecutorService _segmentReloadExecutor;
+  private ExecutorService _segmentReloadRefreshExecutor;
+
+  private boolean _enableAsyncSegmentRefresh;
 
   @Nullable
   private ExecutorService _segmentPreloadExecutor;
@@ -149,9 +151,9 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     Preconditions.checkArgument(maxParallelRefreshThreads > 0,
         "'pinot.server.instance.max.parallel.refresh.threads' must be 
positive, got: " + maxParallelRefreshThreads);
     _segmentReloadSemaphore = new 
SegmentReloadSemaphore(maxParallelRefreshThreads);
-    _segmentReloadExecutor = 
Executors.newFixedThreadPool(maxParallelRefreshThreads,
-        new 
ThreadFactoryBuilder().setNameFormat("segment-reload-thread-%d").build());
-    LOGGER.info("Created SegmentReloadExecutor with pool size: {}", 
maxParallelRefreshThreads);
+    _segmentReloadRefreshExecutor = 
Executors.newFixedThreadPool(maxParallelRefreshThreads,
+        new 
ThreadFactoryBuilder().setNameFormat("segment-reload-refresh-thread-%d").build());
+    LOGGER.info("Created SegmentReloadRefreshExecutor with pool size: {}", 
maxParallelRefreshThreads);
     int maxSegmentPreloadThreads = 
_instanceDataManagerConfig.getMaxSegmentPreloadThreads();
     if (maxSegmentPreloadThreads > 0) {
       _segmentPreloadExecutor = 
Executors.newFixedThreadPool(maxSegmentPreloadThreads,
@@ -160,6 +162,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     } else {
       LOGGER.info("SegmentPreloadExecutor was not created with pool size: {}", 
maxSegmentPreloadThreads);
     }
+    _enableAsyncSegmentRefresh = isAsyncSegmentRefreshEnabled();
+    LOGGER.info("Segment refresh asynchronous handling is {}", 
_enableAsyncSegmentRefresh ? "enabled" : "disabled");
     LOGGER.info("Initialized Helix instance data manager");
 
     // Initialize the error cache and recently deleted tables cache
@@ -241,7 +245,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
 
   @Override
   public synchronized void shutDown() {
-    _segmentReloadExecutor.shutdownNow();
+    _segmentReloadRefreshExecutor.shutdownNow();
     if (_segmentPreloadExecutor != null) {
       _segmentPreloadExecutor.shutdownNow();
     }
@@ -328,7 +332,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     TableDataManager tableDataManager =
         _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
-            _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);
+            _segmentReloadRefreshExecutor, _segmentPreloadExecutor, 
_errorCache, _isServerReadyToServeQueries,
+            _enableAsyncSegmentRefresh);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;
@@ -512,6 +517,11 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     return _instanceDataManagerConfig.getMaxParallelRefreshThreads();
   }
 
+  @Override
+  public boolean isAsyncSegmentRefreshEnabled() {
+    return _instanceDataManagerConfig.isAsyncSegmentRefreshEnabled();
+  }
+
   @Override
   public ZkHelixPropertyStore<ZNRecord> getPropertyStore() {
     return _propertyStore;
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index cb24365d94d..89fbb644831 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -100,6 +100,11 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
   //
   public static final String MAX_PARALLEL_REFRESH_THREADS = 
"max.parallel.refresh.threads";
 
+  // Whether to process SEGMENT_REFRESH in a synchronous or asynchronous 
manner when the messaged is received.
+  // Defaults to false, meaning SEGMENT_REFRESH will be processed in a 
synchronous manner.
+  public static final String ENABLE_ASYNC_SEGMENT_REFRESH = 
"enable.async.segment.refresh";
+  private static final boolean DEFAULT_ENABLE_ASYNC_SEGMENT_REFRESH = false;
+
   // To preload segments of table using upsert in parallel for fast upsert 
metadata recovery.
   private static final String MAX_SEGMENT_PRELOAD_THREADS = 
"max.segment.preload.threads";
 
@@ -252,6 +257,11 @@ public class HelixInstanceDataManagerConfig implements 
InstanceDataManagerConfig
     return _serverConfig.getProperty(MAX_PARALLEL_REFRESH_THREADS, 1);
   }
 
+  @Override
+  public boolean isAsyncSegmentRefreshEnabled() {
+    return _serverConfig.getProperty(ENABLE_ASYNC_SEGMENT_REFRESH, 
DEFAULT_ENABLE_ASYNC_SEGMENT_REFRESH);
+  }
+
   @Override
   public int getMaxSegmentPreloadThreads() {
     return _serverConfig.getProperty(MAX_SEGMENT_PRELOAD_THREADS, 0);
diff --git 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
index b3c194954f7..e114119f005 100644
--- 
a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
+++ 
b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java
@@ -226,7 +226,7 @@ public abstract class BaseResourceTest {
     //       more checks
     TableDataManager tableDataManager = new OfflineTableDataManager();
     tableDataManager.init(instanceDataManagerConfig, helixManager, new 
SegmentLocks(), tableConfig, schema,
-        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, null);
+        new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), 
null, null, null, false);
     tableDataManager.start();
     _tableDataManagerMap.put(tableNameWithType, tableDataManager);
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index fcd7c6b61de..2aa61d7a0fc 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -55,6 +55,8 @@ public interface InstanceDataManagerConfig {
 
   int getMaxParallelRefreshThreads();
 
+  boolean isAsyncSegmentRefreshEnabled();
+
   int getMaxSegmentPreloadThreads();
 
   int getMaxParallelSegmentBuilds();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to