This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7bd61f05a7 SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run (#15681) 7bd61f05a7 is described below commit 7bd61f05a710391d899484752b46117266736168 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Wed Apr 30 13:34:41 2025 -0700 SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run (#15681) * SegmentRelocator should skip rebalancing tables that haven't completed rebalance since the last relocator run --- .../helix/core/relocation/SegmentRelocator.java | 26 ++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java index 9bc8015d1c..0b4ad7d7ba 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; +import javax.annotation.Nullable; import org.apache.hc.client5.http.io.HttpClientConnectionManager; import org.apache.helix.ClusterMessagingService; import org.apache.helix.Criteria; @@ -80,6 +81,7 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { private final Set<String> _waitingTables; private final BlockingQueue<String> _waitingQueue; + @Nullable private final Set<String> _tablesUndergoingRebalance; public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics, @@ -120,17 +122,24 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { LOGGER.warn("Got interrupted while rebalancing tables sequentially", e); } }); + _tablesUndergoingRebalance = null; } else { _waitingTables = null; _waitingQueue = null; + _tablesUndergoingRebalance = ConcurrentHashMap.newKeySet(); } } @Override protected void processTable(String tableNameWithType) { if (_waitingTables == null) { - LOGGER.debug("Rebalance table: {} immediately", tableNameWithType); - _executorService.submit(() -> rebalanceTable(tableNameWithType)); + assert _tablesUndergoingRebalance != null; + if (!_tablesUndergoingRebalance.contains(tableNameWithType)) { + LOGGER.debug("Rebalance table: {} immediately", tableNameWithType); + _executorService.submit(() -> rebalanceTable(tableNameWithType)); + } else { + LOGGER.info("The previous rebalance has not yet completed, skip rebalancing table {}", tableNameWithType); + } return; } putTableToWait(tableNameWithType); @@ -195,6 +204,14 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { rebalanceConfig.setIncludeConsuming(_includeConsuming); rebalanceConfig.setMinimizeDataMovement(_minimizeDataMovement); + if (_tablesUndergoingRebalance != null) { + LOGGER.debug("Start rebalancing table: {}, adding to tablesUndergoingRebalance", tableNameWithType); + if (!_tablesUndergoingRebalance.add(tableNameWithType)) { + LOGGER.warn("Skip rebalancing table: {}, table already exists in tablesUndergoingRebalance, a rebalance " + + "must have already been started", tableNameWithType); + return; + } + } try { // Relocating segments to new tiers needs two sequential actions: table rebalance and local tier migration. // Table rebalance moves segments to the new ideal servers, which can change for a segment when its target @@ -219,6 +236,11 @@ public class SegmentRelocator extends ControllerPeriodicTask<Void> { } } catch (Throwable t) { LOGGER.error("Caught exception/error while rebalancing table: {}", tableNameWithType, t); + } finally { + if (_tablesUndergoingRebalance != null) { + LOGGER.debug("Done rebalancing table: {}, removing from tablesUndergoingRebalance", tableNameWithType); + _tablesUndergoingRebalance.remove(tableNameWithType); + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org