Jackie-Jiang commented on a change in pull request #4269: Add interface and 
implementations for the new segment assignment
URL: https://github.com/apache/incubator-pinot/pull/4269#discussion_r292716220
 
 

 ##########
 File path: 
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
 ##########
 @@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.segment;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.pinot.common.config.TableConfig;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
+import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
+import org.apache.pinot.common.utils.InstancePartitionsType;
+import org.apache.pinot.common.utils.Pairs;
+import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
+import 
org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils;
+
+
+/**
+ * Utility class for segment assignment.
+ */
+class SegmentAssignmentUtils {
+  private SegmentAssignmentUtils() {
+  }
+
+  /**
+   * Returns the number of segments assigned to each instance.
+   */
+  static int[] getNumSegmentsAssigned(Map<String, Map<String, String>> 
segmentAssignment, List<String> instances) {
+    int[] numSegmentsPerInstance = new int[instances.size()];
+    Map<String, Integer> instanceNameToIdMap = 
getInstanceNameToIdMap(instances);
+    for (Map<String, String> instanceStateMep : segmentAssignment.values()) {
+      for (String instanceName : instanceStateMep.keySet()) {
+        Integer instanceId = instanceNameToIdMap.get(instanceName);
+        if (instanceId != null) {
+          numSegmentsPerInstance[instanceId]++;
+        }
+      }
+    }
+    return numSegmentsPerInstance;
+  }
+
+  private static Map<String, Integer> getInstanceNameToIdMap(List<String> 
instances) {
+    int numInstances = instances.size();
+    Map<String, Integer> instanceNameToIdMap = new HashMap<>();
+    for (int i = 0; i < numInstances; i++) {
+      instanceNameToIdMap.put(instances.get(i), i);
+    }
+    return instanceNameToIdMap;
+  }
+
+  /**
+   * Returns the instances for the balance number segment assignment strategy.
+   */
+  static List<String> getInstances(HelixManager helixManager, TableConfig 
tableConfig, int replication,
+      InstancePartitionsType instancePartitionsType) {
+    InstancePartitions instancePartitions =
+        InstancePartitionsUtils.fetchOrComputeInstancePartitions(helixManager, 
tableConfig, instancePartitionsType);
+    Preconditions.checkArgument(instancePartitions.getNumPartitions() == 1 && 
instancePartitions.getNumReplicas() == 1,
+        "The instance partitions: %s should contain only 1 partition and 1 
replica", instancePartitions.getName());
+    List<String> instances = instancePartitions.getInstances(0, 0);
+    Preconditions.checkState(instances.size() >= replication,
+        "There are less instances: %d than the replication: %d for table: %s", 
instances.size(), replication,
+        tableConfig.getTableName());
+    return instances;
+  }
+
+  /**
+   * Rebalances the table with Helix AutoRebalanceStrategy for the balance 
number segment assignment strategy.
+   */
+  static Map<String, Map<String, String>> 
rebalanceTableWithHelixAutoRebalanceStrategy(
+      Map<String, Map<String, String>> currentAssignment, List<String> 
instances, int replication) {
+    // Use Helix AutoRebalanceStrategy to rebalance the table
+    LinkedHashMap<String, Integer> states = new LinkedHashMap<>();
+    states.put(SegmentOnlineOfflineStateModel.ONLINE, replication);
+    AutoRebalanceStrategy autoRebalanceStrategy =
+        new AutoRebalanceStrategy(null, new 
ArrayList<>(currentAssignment.keySet()), states);
+    // Make a copy of the current assignment because this step might change 
the passed in assignment
+    Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>();
+    for (Map.Entry<String, Map<String, String>> entry : 
currentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap));
+    }
+    return autoRebalanceStrategy.computePartitionAssignment(instances, 
instances, currentAssignmentCopy, null)
+        .getMapFields();
+  }
+
+  /**
+   * Rebalances the table for the replica-group based segment assignment 
strategy.
+   * <p>The number of partitions for the segments can be different from the 
number of partitions in the instance
+   * partitions. Uniformly spray the segment partitions over the instance 
partitions.
+   */
+  static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable(
+      Map<String, Map<String, String>> currentAssignment, InstancePartitions 
instancePartitions,
+      Map<Integer, Set<String>> partitionIdToSegmentsMap) {
+    Map<String, Map<String, String>> newAssignment = new TreeMap<>();
+    int numPartitions = instancePartitions.getNumPartitions();
+    for (Map.Entry<Integer, Set<String>> entry : 
partitionIdToSegmentsMap.entrySet()) {
+      // Uniformly spray the segment partitions over the instance partitions
+      int partitionId = entry.getKey() % numPartitions;
+      SegmentAssignmentUtils
+          .rebalanceReplicaGroupBasedPartition(currentAssignment, 
instancePartitions, partitionId, entry.getValue(),
+              newAssignment);
+    }
+    return newAssignment;
+  }
+
+  /**
+   * Rebalances one partition of the table for the replica-group based segment 
assignment strategy.
+   * <ul>
+   *   <li>
+   *     1. Calculate the target number of segments on each server
+   *   </li>
+   *   <li>
+   *     2. Loop over all the segments and keep the assignment if target 
number of segments for the server has not been
+   *     reached and track the not assigned segments
+   *   </li>
+   *   <li>
+   *     3. Assign the left-over segments to the servers with the least 
segments, or the smallest index if there is a
+   *     tie
+   *   </li>
+   *   <li>
+   *     4. Mirror the assignment to other replicas
+   *   </li>
+   * </ul>
+   */
+  static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, 
String>> currentAssignment,
+      InstancePartitions instancePartitions, int partitionId, Set<String> 
segments,
+      Map<String, Map<String, String>> newAssignment) {
+    // Fetch instances in replica 0
+    List<String> instances = instancePartitions.getInstances(partitionId, 0);
+    Map<String, Integer> instanceNameToIdMap = 
SegmentAssignmentUtils.getInstanceNameToIdMap(instances);
+
+    // Calculate target number of segments per instance
+    int numInstances = instances.size();
+    int numSegments = segments.size();
+    int targetNumSegmentsPerInstance = (numSegments + numInstances - 1) / 
numInstances;
 
 Review comment:
   This is equivalent to Math.ceil() for integer division, e.g. for 7 segments and 3 
servers, we will target 3 segments per server. Will add a comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to