This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch rebalance_llc
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 82cbaca278eea6571f4cd44a2eff694b3f8dc4f3
Author: Jackie (Xiaotian) Jiang <xaji...@linkedin.com>
AuthorDate: Tue Jun 18 16:53:58 2019 -0700

    Take OFFLINE segment into account for real-time rebalancer
    
    When a consuming segment encounters an error, its ideal state can be turned into OFFLINE.
    If the instance states for a segment are all OFFLINE, the segment is counted as OFFLINE and won't be rebalanced.
---
 ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 13 +++++++++----
 ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 13 +++++++++----
 .../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++-----
 ...imeBalanceNumSegmentAssignmentStrategyTest.java |  9 +++++++++
 ...eReplicaGroupSegmentAssignmentStrategyTest.java |  9 +++++++++
 5 files changed, 53 insertions(+), 13 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
index 72bdf29..1395294 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.InstancePartitionsType;
 import org.apache.pinot.common.utils.LLCSegmentName;
-import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy 
implements SegmentAssig
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
       Configuration config) {
-    CompletedConsumingSegmentAssignmentPair pair = new 
CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
+        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = 
pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     List<String> instancesForCompletedSegments = SegmentAssignmentUtils
         .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, 
_replication, InstancePartitionsType.COMPLETED);
     Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
@@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy 
implements SegmentAssig
             _replication);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = 
pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       List<String> instancesForConsumingSegments = SegmentAssignmentUtils
@@ -132,6 +134,9 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy 
implements SegmentAssig
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Keep the OFFLINE segments not moved
+    
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
index afd74be..3e80b44 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
@@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy 
implements SegmentAss
   @Override
   public Map<String, Map<String, String>> rebalanceTable(Map<String, 
Map<String, String>> currentAssignment,
       Configuration config) {
-    SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair =
-        new 
SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment);
+    SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment 
completedConsumingOfflineSegmentAssignment =
+        new 
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
 
     // Rebalance COMPLETED segments first
-    Map<String, Map<String, String>> completedSegmentAssignment = 
pair.getCompletedSegmentAssignment();
+    Map<String, Map<String, String>> completedSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
     InstancePartitions instancePartitionsForCompletedSegments = 
InstancePartitionsUtils
         .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, 
InstancePartitionsType.COMPLETED);
     Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
@@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy 
implements SegmentAss
             partitionIdToSegmentsMap);
 
     // Rebalance CONSUMING segments if needed
-    Map<String, Map<String, String>> consumingSegmentAssignment = 
pair.getConsumingSegmentAssignment();
+    Map<String, Map<String, String>> consumingSegmentAssignment =
+        
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
     if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
         RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
       InstancePartitions instancePartitionsForConsumingSegments = 
InstancePartitionsUtils
@@ -134,6 +136,9 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy 
implements SegmentAss
       newAssignment.putAll(consumingSegmentAssignment);
     }
 
+    // Keep the OFFLINE segments not moved
+    
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
     return newAssignment;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4333f91..0f3fe6f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -248,19 +248,27 @@ class SegmentAssignmentUtils {
   }
 
   /**
-   * Class that splits segment assignment into CONSUMING segments and 
COMPLETED segments.
+   * Class that splits segment assignment into COMPLETED, CONSUMING and 
OFFLINE segments.
    */
-  static class CompletedConsumingSegmentAssignmentPair {
+  static class CompletedConsumingOfflineSegmentAssignment {
     private final Map<String, Map<String, String>> _completedSegmentAssignment 
= new TreeMap<>();
     private final Map<String, Map<String, String>> _consumingSegmentAssignment 
= new TreeMap<>();
+    private final Map<String, Map<String, String>> _offlineSegmentAssignment = 
new TreeMap<>();
 
-    CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> 
segmentAssignment) {
+    // NOTE: split the segments based on the following criteria:
+    //       1. At least one instance ONLINE -> COMPLETED segment
+    //       2. At least one instance CONSUMING -> CONSUMING segment
+    //       3. All instances OFFLINE (all instances encountered error while 
consuming) -> OFFLINE segment
+    CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, 
String>> segmentAssignment) {
       for (Map.Entry<String, Map<String, String>> entry : 
segmentAssignment.entrySet()) {
+        String segmentName = entry.getKey();
         Map<String, String> instanceStateMap = entry.getValue();
         if 
(instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE))
 {
-          _completedSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _completedSegmentAssignment.put(segmentName, instanceStateMap);
+        } else if 
(instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
 {
+          _consumingSegmentAssignment.put(segmentName, instanceStateMap);
         } else {
-          _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap);
+          _offlineSegmentAssignment.put(segmentName, instanceStateMap);
         }
       }
     }
@@ -272,5 +280,9 @@ class SegmentAssignmentUtils {
     Map<String, Map<String, String>> getConsumingSegmentAssignment() {
       return _consumingSegmentAssignment;
     }
+
+    Map<String, Map<String, String>> getOfflineSegmentAssignment() {
+      return _offlineSegmentAssignment;
+    }
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
index 87ef462..0aa454c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
@@ -188,6 +188,15 @@ public class 
RealtimeBalanceNumSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", 
NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
index c4c9a82..6d83cc6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
@@ -210,6 +210,15 @@ public class 
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
     BaseConfiguration config = new BaseConfiguration();
     config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
     assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
+
+    // Rebalance should not change the assignment for the OFFLINE segments
+    String offlineSegmentName = "offlineSegment";
+    Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+        
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_", 
NUM_REPLICAS),
+            RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+    currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+    assertEquals(_strategy.rebalanceTable(currentAssignment, config), 
newAssignment);
   }
 
   private void addToAssignment(Map<String, Map<String, String>> 
currentAssignment, int segmentId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to