This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 99762cb  [7338] Allow Reloading Segments with Multiple Threads (#7893)
99762cb is described below

commit 99762cbb89e6bc3c2609667771a04b4c70e0056d
Author: Prashant Pandey <84911643+suddend...@users.noreply.github.com>
AuthorDate: Sat Dec 25 06:30:50 2021 +0530

    [7338] Allow Reloading Segments with Multiple Threads (#7893)
---
 .../core/data/manager/InstanceDataManager.java     |  7 ++-
 .../pinot/core/util/SegmentRefreshSemaphore.java   | 60 ++++++++++++++++++++++
 .../starter/helix/HelixInstanceDataManager.java    | 40 ++++++++++-----
 .../helix/SegmentMessageHandlerFactory.java        | 52 ++++++-------------
 4 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 94546c0..14d98b3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -29,6 +29,7 @@ import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -93,9 +94,11 @@ public interface InstanceDataManager {
       throws Exception;
 
   /**
-   * Reloads all segments in a table.
+   * Reloads all segments of a table.
+   * @param segmentRefreshSemaphore semaphore to control concurrent segment reloads/refresh
    */
-  void reloadAllSegments(String tableNameWithType, boolean forceDownload)
+  void reloadAllSegments(String tableNameWithType, boolean forceDownload,
+      SegmentRefreshSemaphore segmentRefreshSemaphore)
       throws Exception;
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
new file mode 100644
index 0000000..c2ed11c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.util;
+
+import java.util.concurrent.Semaphore;
+import org.slf4j.Logger;
+
+
+/**
+ * Wrapper class for semaphore used to control concurrent segment reload/refresh
+ */
+public class SegmentRefreshSemaphore {
+
+  private final Semaphore _semaphore;
+
+  public SegmentRefreshSemaphore(int permits, boolean fair) {
+    if (permits > 0) {
+      _semaphore = new Semaphore(permits, fair);
+    } else {
+      _semaphore = null;
+    }
+  }
+
+  public void acquireSema(String segmentName, Logger logger)
+      throws InterruptedException {
+    if (_semaphore != null) {
+      long startTime = System.currentTimeMillis();
+      logger.info("Waiting for lock to refresh : {}, queue-length: {}", segmentName,
+          _semaphore.getQueueLength());
+      _semaphore.acquire();
+      logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", segmentName,
+          System.currentTimeMillis() - startTime, _semaphore.getQueueLength());
+    } else {
+      logger.info("Locking of refresh threads disabled (segment: {})", segmentName);
+    }
+  }
+
+  public void releaseSema() {
+    if (_semaphore != null) {
+      _semaphore.release();
+    }
+  }
+}
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 947ab4f..9d1af9c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -27,7 +27,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -44,6 +48,7 @@ import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
 import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
 import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -197,7 +202,8 @@ public class HelixInstanceDataManager implements InstanceDataManager {
     LOGGER.info("Reloading single segment: {} in table: {}", segmentName, tableNameWithType);
     SegmentMetadata segmentMetadata = getSegmentMetadata(tableNameWithType, segmentName);
     if (segmentMetadata == null) {
-      LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", segmentName, tableNameWithType);
+      LOGGER.info("Segment metadata is null. Skip reloading segment: {} in table: {}", segmentName,
+          tableNameWithType);
       return;
     }
 
@@ -212,33 +218,41 @@ public class HelixInstanceDataManager implements InstanceDataManager {
   }
 
   @Override
-  public void reloadAllSegments(String tableNameWithType, boolean forceDownload) {
+  public void reloadAllSegments(String tableNameWithType, boolean forceDownload,
+      SegmentRefreshSemaphore segmentRefreshSemaphore)
+      throws Exception {
     LOGGER.info("Reloading all segments in table: {}", tableNameWithType);
     TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
     Preconditions.checkNotNull(tableConfig);
-
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType);
-
     List<String> failedSegments = new ArrayList<>();
-    Exception sampleException = null;
     List<SegmentMetadata> segmentsMetadata = getAllSegmentsMetadata(tableNameWithType);
-    for (SegmentMetadata segmentMetadata : segmentsMetadata) {
+    ExecutorService workers = Executors.newCachedThreadPool();
+    final AtomicReference<Exception> sampleException = new AtomicReference<>();
+    //calling thread hasn't acquired any permit so we don't reload any segments using it.
+    CompletableFuture.allOf(segmentsMetadata.stream().map(segmentMetadata -> CompletableFuture.runAsync(() -> {
+      String segmentName = segmentMetadata.getName();
       try {
-        reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
+        segmentRefreshSemaphore.acquireSema(segmentMetadata.getName(), LOGGER);
+        try {
+          reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
+        } finally {
+          segmentRefreshSemaphore.releaseSema();
+        }
       } catch (Exception e) {
-        String segmentName = segmentMetadata.getName();
         LOGGER.error("Caught exception while reloading segment: {} in table: {}", segmentName, tableNameWithType, e);
         failedSegments.add(segmentName);
-        sampleException = e;
+        sampleException.set(e);
       }
-    }
+    }, workers)).toArray(CompletableFuture[]::new)).get();
 
-    if (sampleException != null) {
+    workers.shutdownNow();
+
+    if (sampleException.get() != null) {
       throw new RuntimeException(
           String.format("Failed to reload %d/%d segments: %s in table: %s", failedSegments.size(),
-              segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException);
+              segmentsMetadata.size(), failedSegments, tableNameWithType), sampleException.get());
     }
-
     LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ad4a386..d00a558 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.server.starter.helix;
 
-import java.util.concurrent.Semaphore;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.messaging.handling.HelixTaskResult;
 import org.apache.helix.messaging.handling.MessageHandler;
@@ -30,6 +29,7 @@ import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.util.SegmentRefreshSemaphore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,39 +39,14 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
 
   // We only allow limited number of segments refresh/reload happen at the same time
   // The reason for that is segment refresh/reload will temporarily use double-sized memory
-  private final Semaphore _refreshThreadSemaphore;
   private final InstanceDataManager _instanceDataManager;
   private final ServerMetrics _metrics;
+  private final SegmentRefreshSemaphore _segmentRefreshSemaphore;
 
   public SegmentMessageHandlerFactory(InstanceDataManager instanceDataManager, ServerMetrics metrics) {
     _instanceDataManager = instanceDataManager;
     _metrics = metrics;
-    int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads();
-    if (maxParallelRefreshThreads > 0) {
-      _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true);
-    } else {
-      _refreshThreadSemaphore = null;
-    }
-  }
-
-  private void acquireSema(String context, Logger logger)
-      throws InterruptedException {
-    if (_refreshThreadSemaphore != null) {
-      long startTime = System.currentTimeMillis();
-      logger.info("Waiting for lock to refresh : {}, queue-length: {}", context,
-          _refreshThreadSemaphore.getQueueLength());
-      _refreshThreadSemaphore.acquire();
-      logger.info("Acquired lock to refresh segment: {} (lock-time={}ms, queue-length={})", context,
-          System.currentTimeMillis() - startTime, _refreshThreadSemaphore.getQueueLength());
-    } else {
-      LOGGER.info("Locking of refresh threads disabled (segment: {})", context);
-    }
-  }
-
-  private void releaseSema() {
-    if (_refreshThreadSemaphore != null) {
-      _refreshThreadSemaphore.release();
-    }
+    _segmentRefreshSemaphore = new SegmentRefreshSemaphore(instanceDataManager.getMaxParallelRefreshThreads(), true);
   }
 
   // Called each time a message is received.
@@ -113,7 +88,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
       HelixTaskResult result = new HelixTaskResult();
       _logger.info("Handling message: {}", _message);
       try {
-        acquireSema(_segmentName, _logger);
+        _segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
         // The number of retry times depends on the retry count in Constants.
         _instanceDataManager.addOrReplaceSegment(_tableNameWithType, _segmentName);
         result.setSuccess(true);
@@ -121,7 +96,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1);
         Utils.rethrowException(e);
       } finally {
-        releaseSema();
+        _segmentRefreshSemaphore.releaseSema();
       }
       return result;
     }
@@ -143,14 +118,19 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
       _logger.info("Handling message: {}", _message);
       try {
         if (_segmentName.equals("")) {
-          acquireSema("ALL", _logger);
           // NOTE: the method aborts if any segment reload encounters an unhandled exception,
-          // and can lead to inconsistent state across segments
-          _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload);
+          // and can lead to inconsistent state across segments.
+          //we don't acquire any permit here as they'll be acquired by worked threads later
+          _instanceDataManager.reloadAllSegments(_tableNameWithType, _forceDownload,
+              _segmentRefreshSemaphore);
         } else {
           // Reload one segment
-          acquireSema(_segmentName, _logger);
-          _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload);
+          _segmentRefreshSemaphore.acquireSema(_segmentName, _logger);
+          try {
+            _instanceDataManager.reloadSegment(_tableNameWithType, _segmentName, _forceDownload);
+          } finally {
+            _segmentRefreshSemaphore.releaseSema();
+          }
         }
         helixTaskResult.setSuccess(true);
       } catch (Throwable e) {
@@ -159,8 +139,6 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         // (without any corresponding logs to indicate failure!) in the callable path
         throw new RuntimeException(
             "Caught exception while reloading segment: " + _segmentName + " in table: " + _tableNameWithType, e);
-      } finally {
-        releaseSema();
       }
       return helixTaskResult;
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to