This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 87c089f8eb Release all segments of a table in releaseAndRemoveAllSegments method (#12297) 87c089f8eb is described below commit 87c089f8eb4ea019231ac5a5600aaf12f591a20d Author: Jialiang Li <j...@linkedin.com> AuthorDate: Fri Feb 2 10:11:45 2024 -0800 Release all segments of a table in releaseAndRemoveAllSegments method (#12297) --- .../pinot/core/data/manager/BaseTableDataManager.java | 19 +++++++++++++++++-- .../starter/helix/HelixInstanceDataManager.java | 19 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 119bede805..191ea04a0d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -223,8 +224,22 @@ public abstract class BaseTableDataManager implements TableDataManager { segmentDataManagers = new ArrayList<>(_segmentDataManagerMap.values()); _segmentDataManagerMap.clear(); } - for (SegmentDataManager segmentDataManager : segmentDataManagers) { - releaseSegment(segmentDataManager); + if (!segmentDataManagers.isEmpty()) { + int numThreads = Math.min(Runtime.getRuntime().availableProcessors(), segmentDataManagers.size()); + ExecutorService stopExecutorService = Executors.newFixedThreadPool(numThreads); + for (SegmentDataManager segmentDataManager : segmentDataManagers) { + stopExecutorService.submit(() -> 
releaseSegment(segmentDataManager)); + } + stopExecutorService.shutdown(); + try { + // Wait at most 10 minutes before exiting this method. + if (!stopExecutorService.awaitTermination(10, TimeUnit.MINUTES)) { + stopExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + stopExecutorService.shutdownNow(); + Thread.currentThread().interrupt(); + } } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 8e147cc74a..f097c4103c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.function.Supplier; @@ -203,8 +204,22 @@ public class HelixInstanceDataManager implements InstanceDataManager { if (_segmentPreloadExecutor != null) { _segmentPreloadExecutor.shutdownNow(); } - for (TableDataManager tableDataManager : _tableDataManagerMap.values()) { - tableDataManager.shutDown(); + if (!_tableDataManagerMap.isEmpty()) { + int numThreads = Math.min(Runtime.getRuntime().availableProcessors(), _tableDataManagerMap.size()); + ExecutorService stopExecutorService = Executors.newFixedThreadPool(numThreads); + for (TableDataManager tableDataManager : _tableDataManagerMap.values()) { + stopExecutorService.submit(tableDataManager::shutDown); + } + stopExecutorService.shutdown(); + try { + // Wait at most 10 minutes before exiting this method. 
+ if (!stopExecutorService.awaitTermination(10, TimeUnit.MINUTES)) { + stopExecutorService.shutdownNow(); + } + } catch (InterruptedException e) { + stopExecutorService.shutdownNow(); + Thread.currentThread().interrupt(); + } } SegmentBuildTimeLeaseExtender.shutdownExecutor(); LOGGER.info("Helix instance data manager shut down"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org