This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 99762cb [7338] Allow Reloading Segments with Multiple Threads (#7893) 99762cb is described below commit 99762cbb89e6bc3c2609667771a04b4c70e0056d Author: Prashant Pandey <84911643+suddend...@users.noreply.github.com> AuthorDate: Sat Dec 25 06:30:50 2021 +0530 [7338] Allow Reloading Segments with Multiple Threads (#7893) --- .../core/data/manager/InstanceDataManager.java | 7 ++- .../pinot/core/util/SegmentRefreshSemaphore.java | 60 ++++++++++++++++++++++ .../starter/helix/HelixInstanceDataManager.java | 40 ++++++++++----- .../helix/SegmentMessageHandlerFactory.java | 52 ++++++------------- 4 files changed, 107 insertions(+), 52 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 94546c0..14d98b3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -29,6 +29,7 @@ import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; +import org.apache.pinot.core.util.SegmentRefreshSemaphore; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.env.PinotConfiguration; @@ -93,9 +94,11 @@ public interface InstanceDataManager { throws Exception; /** - * Reloads all segments in a table. + * Reloads all segments of a table. 
+ * @param segmentRefreshSemaphore semaphore to control concurrent segment reloads/refresh */ - void reloadAllSegments(String tableNameWithType, boolean forceDownload) + void reloadAllSegments(String tableNameWithType, boolean forceDownload, + SegmentRefreshSemaphore segmentRefreshSemaphore) throws Exception; /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java new file mode 100644 index 0000000..c2ed11c --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.core.util; + +import java.util.concurrent.Semaphore; +import org.slf4j.Logger; + + +/** + * Wrapper class for semaphore used to control concurrent segment reload/refresh + */ +public class SegmentRefreshSemaphore { + + private final Semaphore _semaphore; + + public SegmentRefreshSemaphore(int permits, boolean fair) { + if (permits > 0) { + _semaphore = new Semaphore(permits, fair); + } else { + _semaphore = null; + } + } + + public void acquireSema(String segmentName, Logger logger) + throws InterruptedException { + if (_semaphore != null) { + long startTime = System.currentTimeMillis(); + logger.info("Waiting for lock to refresh : {}, queue-length: {}", segmentName, + _semaphore.getQueueLength()); + _semaphore.acquire(); + logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", segmentName, + System.currentTimeMillis() - startTime, _semaphore.getQueueLength()); + } else { + logger.info("Locking of refresh threads disabled (segment: {})", segmentName); + } + } + + public void releaseSema() { + if (_semaphore != null) { + _semaphore.release(); + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 947ab4f..9d1af9c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -27,7 +27,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import javax.annotation.Nullable; 
import javax.annotation.concurrent.ThreadSafe; @@ -44,6 +48,7 @@ import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; import org.apache.pinot.core.data.manager.realtime.SegmentUploader; +import org.apache.pinot.core.util.SegmentRefreshSemaphore; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; @@ -197,7 +202,8 @@ public class HelixInstanceDataManager implements InstanceDataManager { LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType); SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType, segmentName); if (segmentMetadata == null) { - LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", segmentName, tableNameWithType); + LOGGER.info("Segment metadata is null. 
Skip reloading segment: {} in table: {}", segmentName, + tableNameWithType); return; } @@ -212,33 +218,41 @@ public class HelixInstanceDataManager implements InstanceDataManager { } @Override - public void reloadAllSegments(String tableNameWithType, boolean forceDownload) { + public void reloadAllSegments(String tableNameWithType, boolean forceDownload, + SegmentRefreshSemaphore segmentRefreshSemaphore) + throws Exception { LOGGER.info("Reloading all segments in table: {}", tableNameWithType); TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkNotNull(tableConfig); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); - List<String> failedSegments = new ArrayList<>(); - Exception sampleException = null; List<SegmentMetadata> segmentsMetadata = getAllSegmentsMetadata(tableNameWithType); - for (SegmentMetadata segmentMetadata : segmentsMetadata) { + ExecutorService workers = Executors.newCachedThreadPool(); + final AtomicReference<Exception> sampleException = new AtomicReference<>(); + //calling thread hasn't acquired any permit so we don't reload any segments using it. 
+ CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata -> CompletableFuture.runAsync(() -> { + String segmentName = segmentMetadata.getName(); try { - reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload); + segmentRefreshSemaphore.acquireSema(segmentMetadata.getName(), LOGGER); + try { + reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload); + } finally { + segmentRefreshSemaphore.releaseSema(); + } } catch (Exception e) { - String segmentName = segmentMetadata.getName(); LOGGER.error("Caught exception while reloading segment: {} in table: {}", segmentName, tableNameWithType, e); failedSegments.add(segmentName); - sampleException = e; + sampleException.set(e); } - } + }, workers)).toArray(CompletableFuture[]::new)).get(); - if (sampleException != null) { + workers.shutdownNow(); + + if (sampleException.get() != null) { throw new RuntimeException( String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(), - segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException); + segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException.get()); } - LOGGER.info("Reloaded all segments in table: {}", tableNameWithType); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java index ad4a386..d00a558 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.server.starter.helix; -import java.util.concurrent.Semaphore; import org.apache.helix.NotificationContext; import org.apache.helix.messaging.handling.HelixTaskResult; import org.apache.helix.messaging.handling.MessageHandler; 
@@ -30,6 +29,7 @@ import org.apache.pinot.common.messages.SegmentReloadMessage; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.util.SegmentRefreshSemaphore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,39 +39,14 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { // We only allow limited number of segments refresh/reload happen at the same time // The reason for that is segment refresh/reload will temporarily use double-sized memory - private final Semaphore _refreshThreadSemaphore; private final InstanceDataManager _instanceDataManager; private final ServerMetrics _metrics; + private final SegmentRefreshSemaphore _segmentRefreshSemaphore; public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) { _instanceDataManager = instanceDataManager; _metrics = metrics; - int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads(); - if (maxParallelRefreshThreads > 0) { - _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true); - } else { - _refreshThreadSemaphore = null; - } - } - - private void acquireSema(String context, Logger logger) - throws InterruptedException { - if (_refreshThreadSemaphore != null) { - long startTime = System.currentTimeMillis(); - logger.info("Waiting for lock to refresh : {}, queue-length: {}", context, - _refreshThreadSemaphore.getQueueLength()); - _refreshThreadSemaphore.acquire(); - logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", context, - System.currentTimeMillis() - startTime, _refreshThreadSemaphore.getQueueLength()); - } else { - LOGGER.info("Locking of refresh threads disabled (segment: {})", context); - } - } - - private void releaseSema() { - if (_refreshThreadSemaphore != null) { - _refreshThreadSemaphore.release(); - } 
+ _segmentRefreshSemaphore = new SegmentRefreshSemaphore(instanceDataManager.getMaxParallelRefreshThreads(), true); } // Called each time a message is received. @@ -113,7 +88,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { HelixTaskResult result = new HelixTaskResult(); _logger.info("Handling message: {}", _message); try { - acquireSema(_segmentName, _logger); + _segmentRefreshSemaphore.acquireSema(_segmentName, _logger); // The number of retry times depends on the retry count in Constants. _instanceDataManager.addOrReplaceSegment(_tableNameWithType, _segmentName); result.setSuccess(true); @@ -121,7 +96,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1); Utils.rethrowException(e); } finally { - releaseSema(); + _segmentRefreshSemaphore.releaseSema(); } return result; } @@ -143,14 +118,19 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { _logger.info("Handling message: {}", _message); try { if (_segmentName.equals("")) { - acquireSema("ALL", _logger); // NOTE: the method aborts if any segment reload encounters an unhandled exception, - // and can lead to inconsistent state across segments - _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload); + // and can lead to inconsistent state across segments. 
+ //we don't acquire any permit here as they'll be acquired by worker threads later + _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload, + _segmentRefreshSemaphore); } else { // Reload one segment - acquireSema(_segmentName, _logger); - _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload); + _segmentRefreshSemaphore.acquireSema(_segmentName, _logger); + try { + _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload); + } finally { + _segmentRefreshSemaphore.releaseSema(); + } } helixTaskResult.setSuccess(true); } catch (Throwable e) { @@ -159,8 +139,6 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { // (without any corresponding logs to indicate failure!) in the callable path throw new RuntimeException( "Caught exception while reloading segment: " + _segmentName + " in table: " + _tableNameWithType, e); - } finally { - releaseSema(); } return helixTaskResult; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org