This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bd61f05a7 SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run (#15681)
7bd61f05a7 is described below

commit 7bd61f05a710391d899484752b46117266736168
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Wed Apr 30 13:34:41 2025 -0700

    SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run (#15681)
    
    * SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run
---
 .../helix/core/relocation/SegmentRelocator.java    | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 9bc8015d1c..0b4ad7d7ba 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.Criteria;
@@ -80,6 +81,7 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
 
   private final Set<String> _waitingTables;
   private final BlockingQueue<String> _waitingQueue;
+  @Nullable private final Set<String> _tablesUndergoingRebalance;
 
   public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, ControllerConf config, 
ControllerMetrics controllerMetrics,
@@ -120,17 +122,24 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
           LOGGER.warn("Got interrupted while rebalancing tables sequentially", 
e);
         }
       });
+      _tablesUndergoingRebalance = null;
     } else {
       _waitingTables = null;
       _waitingQueue = null;
+      _tablesUndergoingRebalance = ConcurrentHashMap.newKeySet();
     }
   }
 
   @Override
   protected void processTable(String tableNameWithType) {
     if (_waitingTables == null) {
-      LOGGER.debug("Rebalance table: {} immediately", tableNameWithType);
-      _executorService.submit(() -> rebalanceTable(tableNameWithType));
+      assert _tablesUndergoingRebalance != null;
+      if (!_tablesUndergoingRebalance.contains(tableNameWithType)) {
+        LOGGER.debug("Rebalance table: {} immediately", tableNameWithType);
+        _executorService.submit(() -> rebalanceTable(tableNameWithType));
+      } else {
+        LOGGER.info("The previous rebalance has not yet completed, skip 
rebalancing table {}", tableNameWithType);
+      }
       return;
     }
     putTableToWait(tableNameWithType);
@@ -195,6 +204,14 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
     rebalanceConfig.setIncludeConsuming(_includeConsuming);
     rebalanceConfig.setMinimizeDataMovement(_minimizeDataMovement);
 
+    if (_tablesUndergoingRebalance != null) {
+      LOGGER.debug("Start rebalancing table: {}, adding to 
tablesUndergoingRebalance", tableNameWithType);
+      if (!_tablesUndergoingRebalance.add(tableNameWithType)) {
+        LOGGER.warn("Skip rebalancing table: {}, table already exists in 
tablesUndergoingRebalance, a rebalance "
+            + "must have already been started", tableNameWithType);
+        return;
+      }
+    }
     try {
       // Relocating segments to new tiers needs two sequential actions: table 
rebalance and local tier migration.
       // Table rebalance moves segments to the new ideal servers, which can 
change for a segment when its target
@@ -219,6 +236,11 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> {
       }
     } catch (Throwable t) {
       LOGGER.error("Caught exception/error while rebalancing table: {}", 
tableNameWithType, t);
+    } finally {
+      if (_tablesUndergoingRebalance != null) {
+        LOGGER.debug("Done rebalancing table: {}, removing from 
tablesUndergoingRebalance", tableNameWithType);
+        _tablesUndergoingRebalance.remove(tableNameWithType);
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to