This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch rebalance_llc in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 82cbaca278eea6571f4cd44a2eff694b3f8dc4f3 Author: Jackie (Xiaotian) Jiang <xaji...@linkedin.com> AuthorDate: Tue Jun 18 16:53:58 2019 -0700 Take OFFLINE segment into account for real-time rebalancer When the consuming segment encounters error, the ideal state can be turned into OFFLINE If the instance states for a segment is all OFFLINE, the segment is counted OFFLINE and won't be rebalanced --- ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 13 +++++++++---- ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 13 +++++++++---- .../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++----- ...imeBalanceNumSegmentAssignmentStrategyTest.java | 9 +++++++++ ...eReplicaGroupSegmentAssignmentStrategyTest.java | 9 +++++++++ 5 files changed, 53 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java index 72bdf29..1395294 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java @@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.InstancePartitionsType; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair; import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig @Override public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, Configuration config) { - CompletedConsumingSegmentAssignmentPair pair = new CompletedConsumingSegmentAssignmentPair(currentAssignment); + SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = + new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment); // Rebalance COMPLETED segments first - Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment(); + Map<String, Map<String, String>> completedSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment(); List<String> instancesForCompletedSegments = SegmentAssignmentUtils .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED); Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils @@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig _replication); // Rebalance CONSUMING segments if needed - Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment(); + Map<String, Map<String, String>> consumingSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment(); if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { List<String> instancesForConsumingSegments = SegmentAssignmentUtils @@ -132,6 +134,9 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssig newAssignment.putAll(consumingSegmentAssignment); } + // Keep the OFFLINE segments not moved + newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment()); + return newAssignment; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java index afd74be..3e80b44 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java @@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss @Override public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, Configuration config) { - SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair = - new SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment); + SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = + new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment); // Rebalance COMPLETED segments first - Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment(); + Map<String, Map<String, String>> completedSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment(); InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED); Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); @@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss partitionIdToSegmentsMap); // Rebalance CONSUMING segments if needed - Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment(); + Map<String, Map<String, String>> consumingSegmentAssignment = + completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment(); if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils @@ -134,6 +136,9 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAss newAssignment.putAll(consumingSegmentAssignment); } + // Keep the OFFLINE segments not moved + newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment()); + return newAssignment; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 4333f91..0f3fe6f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -248,19 +248,27 @@ class SegmentAssignmentUtils { } /** - * Class that splits segment assignment into CONSUMING segments and COMPLETED segments. + * Class that splits segment assignment into COMPLETED, CONSUMING and OFFLINE segments. */ - static class CompletedConsumingSegmentAssignmentPair { + static class CompletedConsumingOfflineSegmentAssignment { private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>(); private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>(); + private final Map<String, Map<String, String>> _offlineSegmentAssignment = new TreeMap<>(); - CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> segmentAssignment) { + // NOTE: split the segments based on the following criteria: + // 1. At least one instance ONLINE -> COMPLETED segment + // 2. At least one instance CONSUMING -> CONSUMING segment + // 3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment + CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) { for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { - _completedSegmentAssignment.put(entry.getKey(), instanceStateMap); + _completedSegmentAssignment.put(segmentName, instanceStateMap); + } else if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING)) { + _consumingSegmentAssignment.put(segmentName, instanceStateMap); } else { - _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap); + _offlineSegmentAssignment.put(segmentName, instanceStateMap); } } } @@ -272,5 +280,9 @@ class SegmentAssignmentUtils { Map<String, Map<String, String>> getConsumingSegmentAssignment() { return _consumingSegmentAssignment; } + + Map<String, Map<String, String>> getOfflineSegmentAssignment() { + return _offlineSegmentAssignment; + } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java index 87ef462..0aa454c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java @@ -188,6 +188,15 @@ public class RealtimeBalanceNumSegmentAssignmentStrategyTest { BaseConfiguration config = new BaseConfiguration(); config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + + // Rebalance should not change the assignment for the OFFLINE segments + String offlineSegmentName = "offlineSegment"; + Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils + .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS), + RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); } private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java index c4c9a82..6d83cc6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java @@ -210,6 +210,15 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { BaseConfiguration config = new BaseConfiguration(); config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + + // Rebalance should not change the assignment for the OFFLINE segments + String offlineSegmentName = "offlineSegment"; + Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils + .getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", NUM_REPLICAS), + RealtimeSegmentOnlineOfflineStateModel.OFFLINE); + currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap); + assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); } private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org