This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2e543e9 Add interface and implementations for the new segment assignment (#4269) 2e543e9 is described below commit 2e543e992f842918e2405c276b041d03b0958bf6 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jun 14 13:46:24 2019 -0700 Add interface and implementations for the new segment assignment (#4269) Added InstancePartitions implementations Added InstancePartitionsUtils to fetch/persist it to ZK and also handle backward-compatibility if it does not exist in ZK Added SegmentAssignmentStrategy to handle both segment assignment and table rebalance Implemented the balance-number, replica-group segment assignment strategy for both OFFLINE and REALTIME tables that works on InstancePartitions There is no change to the persistent state --- .../pinot/common/metadata/ZKMetadataProvider.java | 4 +- .../pinot/common/utils/InstancePartitionsType.java | 37 ++ .../java/org/apache/pinot/common/utils/Pairs.java | 37 +- .../helix/core/assignment/InstancePartitions.java | 99 +++++ .../core/assignment/InstancePartitionsUtils.java | 97 +++++ ...OfflineBalanceNumSegmentAssignmentStrategy.java | 96 +++++ ...flineReplicaGroupSegmentAssignmentStrategy.java | 200 ++++++++++ ...ealtimeBalanceNumSegmentAssignmentStrategy.java | 155 ++++++++ ...ltimeReplicaGroupSegmentAssignmentStrategy.java | 159 ++++++++ .../segment/SegmentAssignmentStrategy.java | 59 +++ .../segment/SegmentAssignmentStrategyFactory.java | 57 +++ .../assignment/segment/SegmentAssignmentUtils.java | 276 +++++++++++++ ...ineBalanceNumSegmentAssignmentStrategyTest.java | 137 +++++++ ...eReplicaGroupSegmentAssignmentStrategyTest.java | 289 ++++++++++++++ ...imeBalanceNumSegmentAssignmentStrategyTest.java | 208 ++++++++++ ...eReplicaGroupSegmentAssignmentStrategyTest.java | 230 +++++++++++ .../segment/SegmentAssignmentTestUtils.java | 39 ++ .../segment/SegmentAssignmentUtilsTest.java | 434 +++++++++++++++++++++ 18 files changed, 2594 insertions(+), 19 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 0915119..7cf7448 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -94,8 +94,8 @@ public class ZKMetadataProvider { return StringUtil.join("/", PROPERTYSTORE_SCHEMAS_PREFIX, schemaName); } - public static String constructPropertyStorePathForInstancePartitions(String tableNameWithType) { - return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, tableNameWithType); + public static String constructPropertyStorePathForInstancePartitions(String instancePartitionsName) { + return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName); } public static String constructPropertyStorePathForResource(String resourceName) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/InstancePartitionsType.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/InstancePartitionsType.java new file mode 100644 index 0000000..90e42a0 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/InstancePartitionsType.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +/** + * The type of the instance partitions. + * <p> + * The instance partitions name will be of the format {@code <rawTableName>_<type>}, e.g. {@code table_OFFLINE}, + * {@code table_CONSUMING}, {@code table_COMPLETED}. + */ +public enum InstancePartitionsType { + OFFLINE, // For segments from offline table// For consuming segments from real-time table + CONSUMING, // For consuming segments from real-time table + COMPLETED; // For completed segments from real-time table + + public static final char TYPE_SUFFIX_SEPARATOR = '_'; + + public String getInstancePartitionsName(String rawTableName) { + return rawTableName + TYPE_SUFFIX_SEPARATOR + name(); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/Pairs.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/Pairs.java index c67bca3..adb95e7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/Pairs.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/Pairs.java @@ -32,46 +32,45 @@ public class Pairs { } public static class IntPair { - int a; + private int _left; + private int _right; - int b; - - public IntPair(int a, int b) { - this.a = a; - this.b = b; + public IntPair(int left, int right) { + _left = left; + _right = right; } public int getLeft() { - return a; + return _left; } public int getRight() { - return b; + return _right; } - public void setLeft(int a) { - this.a = a; + public void setLeft(int left) { + _left = left; } - public void setRight(int b) { - this.b = b; + public void setRight(int right) { + _right = right; } @Override public String toString() { - return "[" + a + "," + b + "]"; + return "[" + _left + "," + _right + "]"; } @Override public int hashCode() { - return toString().hashCode(); + return 37 * _left + _right; } @Override public boolean equals(Object obj) { if (obj instanceof IntPair) { IntPair that = (IntPair) obj; - return obj != null && a == (that.a) && b == that.b; + return _left == that._left && _right == that._right; } return false; } @@ -80,8 +79,12 @@ public class Pairs { public static class AscendingIntPairComparator implements Comparator<IntPair> { @Override - public int compare(IntPair o1, IntPair o2) { - return Integer.compare(o1.a, o2.a); + public int compare(IntPair pair1, IntPair pair2) { + if (pair1._left != pair2._left) { + return Integer.compare(pair1._left, pair2._left); + } else { + return Integer.compare(pair1._right, pair2._right); + } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java new file mode 100644 index 0000000..6d9a35f --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitions.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.ZNRecord; + + +/** + * Instance (server) partitions for the table. + * + * <p>The instance partitions is stored as a map from partition of the format: {@code <partitionId>_<replicaId>} to + * list of server instances, and is persisted under the ZK path: {@code <cluster>/PROPERTYSTORE/INSTANCE_PARTITIONS}. + * <p>The segment assignment will be based on the instance partitions of the table. + */ +public class InstancePartitions { + private static final char PARTITION_REPLICA_SEPARATOR = '_'; + + // Name will be of the format "<rawTableName>_<type>", e.g. "table_OFFLINE", "table_CONSUMING", "table_COMPLETED" + private final String _name; + private final Map<String, List<String>> _partitionToInstancesMap; + private int _numPartitions; + private int _numReplicas; + + public InstancePartitions(String name) { + _name = name; + _partitionToInstancesMap = new TreeMap<>(); + } + + private InstancePartitions(String name, Map<String, List<String>> partitionToInstancesMap) { + _name = name; + _partitionToInstancesMap = partitionToInstancesMap; + for (String key : partitionToInstancesMap.keySet()) { + int splitterIndex = key.indexOf(PARTITION_REPLICA_SEPARATOR); + int partition = Integer.parseInt(key.substring(0, splitterIndex)); + int replica = Integer.parseInt(key.substring(splitterIndex + 1)); + _numPartitions = Integer.max(_numPartitions, partition + 1); + _numReplicas = Integer.max(_numReplicas, replica + 1); + } + } + + public String getName() { + return _name; + } + + public Map<String, List<String>> getPartitionToInstancesMap() { + return _partitionToInstancesMap; + } + + @JsonIgnore + public int getNumPartitions() { + return _numPartitions; + } + + @JsonIgnore + public int getNumReplicas() { + return _numReplicas; + } + + public List<String> getInstances(int partitionId, int replicaId) { + return _partitionToInstancesMap.get(Integer.toString(partitionId) + PARTITION_REPLICA_SEPARATOR + replicaId); + } + + public void setInstances(int partitionId, int replicaId, List<String> instances) { + String key = Integer.toString(partitionId) + PARTITION_REPLICA_SEPARATOR + replicaId; + _partitionToInstancesMap.put(key, instances); + _numPartitions = Integer.max(_numPartitions, partitionId + 1); + _numReplicas = Integer.max(_numReplicas, replicaId + 1); + } + + public static InstancePartitions fromZNRecord(ZNRecord znRecord) { + return new InstancePartitions(znRecord.getId(), znRecord.getListFields()); + } + + public ZNRecord toZNRecord() { + ZNRecord znRecord = new ZNRecord(_name); + znRecord.setListFields(_partitionToInstancesMap); + return znRecord; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java new file mode 100644 index 0000000..4121246 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/InstancePartitionsUtils.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment; + +import java.util.Collections; +import java.util.List; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.store.HelixPropertyStore; +import org.apache.pinot.common.config.OfflineTagConfig; +import org.apache.pinot.common.config.RealtimeTagConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.helix.HelixHelper; + + +/** + * Utility class for instance partitions. + */ +public class InstancePartitionsUtils { + private InstancePartitionsUtils() { + } + + /** + * Fetches the instance partitions from Helix property store if exists, or computes it for backward compatibility. + */ + public static InstancePartitions fetchOrComputeInstancePartitions(HelixManager helixManager, TableConfig tableConfig, + InstancePartitionsType instancePartitionsType) { + String tableNameWithType = tableConfig.getTableName(); + String instancePartitionsName = + instancePartitionsType.getInstancePartitionsName(TableNameBuilder.extractRawTableName(tableNameWithType)); + + // Fetch the instance partitions from property store if exists + String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitionsName); + ZNRecord znRecord = helixManager.getHelixPropertyStore().get(path, null, AccessOption.PERSISTENT); + if (znRecord != null) { + return InstancePartitions.fromZNRecord(znRecord); + } + + // Compute the instance partitions (for backward compatible) + // Sort all enabled instances with the server tag, rotate the list based on the table name to prevent creating + // hotspot servers + InstancePartitions instancePartitions = new InstancePartitions(instancePartitionsName); + List<InstanceConfig> instanceConfigs = HelixHelper.getInstanceConfigs(helixManager); + String serverTag; + switch (instancePartitionsType) { + case OFFLINE: + serverTag = new OfflineTagConfig(tableConfig).getOfflineServerTag(); + break; + case CONSUMING: + serverTag = new RealtimeTagConfig(tableConfig).getConsumingServerTag(); + break; + case COMPLETED: + serverTag = new RealtimeTagConfig(tableConfig).getCompletedServerTag(); + break; + default: + throw new IllegalArgumentException(); + } + List<String> instances = HelixHelper.getEnabledInstancesWithTag(instanceConfigs, serverTag); + instances.sort(null); + int numInstances = instances.size(); + Collections.rotate(instances, -(Math.abs(tableNameWithType.hashCode()) % numInstances)); + instancePartitions.setInstances(0, 0, instances); + return instancePartitions; + } + + /** + * Persists the instance partitions to Helix property store. + * + * @return true if the instance partitions was successfully persisted, false otherwise + */ + public static boolean persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, + InstancePartitions instancePartitions) { + String path = ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(instancePartitions.getName()); + return propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java new file mode 100644 index 0000000..4bd34b5 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategy.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.Pairs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment strategy for offline segments that assigns segment to the instance with the least number of + * segments. In case of a tie, assigns to the instance with the smallest index in the list. The strategy ensures that + * replicas of the same segment are not assigned to the same server. + * <p>To rebalance a table, use Helix AutoRebalanceStrategy. + */ +public class OfflineBalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { + private static final Logger LOGGER = LoggerFactory.getLogger(OfflineBalanceNumSegmentAssignmentStrategy.class); + + private HelixManager _helixManager; + private TableConfig _tableConfig; + private String _tableNameWithType; + private int _replication; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _helixManager = helixManager; + _tableConfig = tableConfig; + _tableNameWithType = tableConfig.getTableName(); + _replication = tableConfig.getValidationConfig().getReplicationNumber(); + + LOGGER.info("Initialized OfflineBalanceNumSegmentAssignmentStrategy for table: {} with replication: {}", + _tableNameWithType, _replication); + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { + List<String> instances = SegmentAssignmentUtils + .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.OFFLINE); + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + + // Assign the segment to the instance with the least segments, or the smallest id if there is a tie + int numInstances = numSegmentsAssignedPerInstance.length; + PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator()); + for (int instanceId = 0; instanceId < numInstances; instanceId++) { + heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId)); + } + List<String> instancesAssigned = new ArrayList<>(_replication); + for (int i = 0; i < _replication; i++) { + instancesAssigned.add(instances.get(heap.remove().getRight())); + } + + LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, + _tableNameWithType); + return instancesAssigned; + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Configuration config) { + List<String> instances = SegmentAssignmentUtils + .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.OFFLINE); + Map<String, Map<String, String>> newAssignment = + SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, _replication); + + LOGGER.info( + "Rebalanced {} segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instance: {}", + currentAssignment.size(), instances, _tableNameWithType, _replication, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + return newAssignment; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java new file mode 100644 index 0000000..8124d30 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategy.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.ReplicaGroupStrategyConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; +import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment strategy for offline segments that assigns segment to the instance in the replica with the least + * number of segments. + * <p>Among multiple replicas, always mirror the assignment (pick the same index of the instance). + * <p>Inside each partition, assign the segment to the servers with the least segments already assigned. In case of a + * tie, assign to the server with the smallest index in the list. Do this for one replica and mirror the assignment to + * other replicas. + * <p>To rebalance a table, inside each partition, first calculate the number of segments on each server, loop over all + * the segments and keep the assignment if number of segments for the server has not been reached and track the not + * assigned segments, then assign the left-over segments to the servers with the least segments, or the smallest index + * if there is a tie. Repeat the process for all the partitions in one replica, and mirror the assignment to other + * replicas. With this greedy algorithm, the result is deterministic and with minimum segment moves. + */ +public class OfflineReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy { + private static final Logger LOGGER = LoggerFactory.getLogger(OfflineReplicaGroupSegmentAssignmentStrategy.class); + + private HelixManager _helixManager; + private TableConfig _tableConfig; + private String _tableNameWithType; + private String _partitionColumn; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _helixManager = helixManager; + _tableConfig = tableConfig; + _tableNameWithType = tableConfig.getTableName(); + ReplicaGroupStrategyConfig strategyConfig = tableConfig.getValidationConfig().getReplicaGroupStrategyConfig(); + _partitionColumn = strategyConfig != null ? strategyConfig.getPartitionColumn() : null; + + if (_partitionColumn == null) { + LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy for table: {} without partition column", + _tableNameWithType); + } else { + LOGGER.info("Initialized OfflineReplicaGroupSegmentAssignmentStrategy for table: {} with partition column: {}", + _tableNameWithType, _partitionColumn); + } + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { + InstancePartitions instancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.OFFLINE); + + // Fetch partition id from segment ZK metadata if partition column is configured + int partitionId = 0; + if (_partitionColumn == null) { + Preconditions.checkState(instancePartitions.getNumPartitions() == 1, + "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); + } else { + OfflineSegmentZKMetadata segmentZKMetadata = ZKMetadataProvider + .getOfflineSegmentZKMetadata(_helixManager.getHelixPropertyStore(), _tableNameWithType, segmentName); + Preconditions + .checkState(segmentZKMetadata != null, "Failed to fetch segment ZK metadata for table: %s, segment: %s", + _tableNameWithType, segmentName); + // Uniformly spray the segment partitions over the instance partitions + partitionId = getPartitionId(segmentZKMetadata) % instancePartitions.getNumPartitions(); + } + + // First assign the segment to replica 0 + List<String> instances = instancePartitions.getInstances(partitionId, 0); + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + int minNumSegmentsAssigned = numSegmentsAssignedPerInstance[0]; + int instanceIdWithLeastSegmentsAssigned = 0; + int numInstances = numSegmentsAssignedPerInstance.length; + for (int instanceId = 1; instanceId < numInstances; instanceId++) { + if (numSegmentsAssignedPerInstance[instanceId] < minNumSegmentsAssigned) { + minNumSegmentsAssigned = numSegmentsAssignedPerInstance[instanceId]; + instanceIdWithLeastSegmentsAssigned = instanceId; + } + } + + // Mirror the assignment to all replicas + int numReplicas = instancePartitions.getNumReplicas(); + List<String> instancesAssigned = new ArrayList<>(numReplicas); + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + instancesAssigned + .add(instancePartitions.getInstances(partitionId, replicaId).get(instanceIdWithLeastSegmentsAssigned)); + } + + if (_partitionColumn == null) { + LOGGER.info("Assigned segment: {} to instances: {} for table: {}", segmentName, instancesAssigned, + _tableNameWithType); + } else { + LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, + instancesAssigned, _tableNameWithType); + } + return instancesAssigned; + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Configuration config) { + InstancePartitions instancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.OFFLINE); + if (_partitionColumn == null) { + return rebalanceTableWithoutPartition(currentAssignment, instancePartitions); + } else { + return rebalanceTableWithPartition(currentAssignment, instancePartitions); + } + } + + private Map<String, Map<String, String>> rebalanceTableWithoutPartition( + Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) { + Preconditions.checkState(instancePartitions.getNumPartitions() == 1, + "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); + + Map<String, Map<String, String>> newAssignment = new TreeMap<>(); + SegmentAssignmentUtils + .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, 0, currentAssignment.keySet(), + newAssignment); + + LOGGER.info( + "Rebalanced {} segments with instance partitions: {} for table: {} without partition column, number of segments to be moved to each instance: {}", + currentAssignment.size(), instancePartitions.getPartitionToInstancesMap(), _tableNameWithType, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + return newAssignment; + } + + private Map<String, Map<String, String>> rebalanceTableWithPartition( + Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions) { + // Fetch partition id from segment ZK metadata + List<OfflineSegmentZKMetadata> segmentZKMetadataList = ZKMetadataProvider + .getOfflineSegmentZKMetadataListForTable(_helixManager.getHelixPropertyStore(), _tableNameWithType); + Map<String, OfflineSegmentZKMetadata> segmentZKMetadataMap = new HashMap<>(); + for (OfflineSegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) { + segmentZKMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata); + } + Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); + for (String segmentName : currentAssignment.keySet()) { + int partitionId = getPartitionId(segmentZKMetadataMap.get(segmentName)); + partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); + } + + Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap); + + LOGGER.info( + "Rebalanced {} segments with instance partitions: {} for table: {} with partition column: {}, number of segments to be moved to each instance: {}", + currentAssignment.size(), instancePartitions.getPartitionToInstancesMap(), _tableNameWithType, _partitionColumn, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + return newAssignment; + } + + private int getPartitionId(OfflineSegmentZKMetadata segmentZKMetadata) { + String segmentName = segmentZKMetadata.getSegmentName(); + ColumnPartitionMetadata partitionMetadata = + segmentZKMetadata.getPartitionMetadata().getColumnPartitionMap().get(_partitionColumn); + Preconditions.checkState(partitionMetadata != null, + "Segment ZK metadata for table: %s, segment: %s does not contain partition metadata for column: %s", + _tableNameWithType, segmentName, _partitionColumn); + Set<Integer> partitions = partitionMetadata.getPartitions(); + Preconditions.checkState(partitions.size() == 1, + "Segment ZK metadata for table: %s, segment: %s contains multiple partitions for column: %s", + _tableNameWithType, segmentName, _partitionColumn); + return partitions.iterator().next(); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java new file mode 100644 index 0000000..72bdf29 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment strategy for LLC real-time segments without replica-group. + * <ul> + * <li> + * For the CONSUMING segments, it is very similar to replica-group based segment assignment with the following + * differences: + * <ul> + * <li> + * 1. Within a replica, all segments of a partition (steam partition) always exist in exactly one one server + * </li> + * <li> + * 2. Partition id for an instance is derived from the index of the instance, instead of explicitly stored in + * the instance partitions + * </li> + * <li> + * 3. In addition to the ONLINE segments, there are also CONSUMING segments to be assigned + * </li> + * </ul> + * Since within a replica, each partition contains only one server, we can directly assign or rebalance the + * CONSUMING segments to the servers based on the partition id. + * <p>The strategy does not minimize segment movements for CONSUMING segments because within a replica, the server + * is fixed for each partition. The instance assignment is responsible for keeping minimum changes to the instance + * partitions to reduce the number of segments need to be moved. + * </li> + * <li> + * For the COMPLETED segments, rebalance segments the same way as OfflineBalanceNumSegmentAssignmentStrategy. + * </li> + * </ul> + */ +public class RealtimeBalanceNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeBalanceNumSegmentAssignmentStrategy.class); + + private HelixManager _helixManager; + private TableConfig _tableConfig; + private String _tableNameWithType; + private int _replication; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _helixManager = helixManager; + _tableConfig = tableConfig; + _tableNameWithType = tableConfig.getTableName(); + _replication = tableConfig.getValidationConfig().getReplicasPerPartitionNumber(); + + LOGGER.info("Initialized RealtimeBalanceNumSegmentAssignmentStrategy for table: {} with replication: {}", + _tableNameWithType, _replication); + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { + List<String> instances = SegmentAssignmentUtils + .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.CONSUMING); + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + List<String> instancesAssigned = getInstances(instances, partitionId); + LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, + instancesAssigned, _tableNameWithType); + return instancesAssigned; + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Configuration config) { + CompletedConsumingSegmentAssignmentPair pair = new CompletedConsumingSegmentAssignmentPair(currentAssignment); + + // Rebalance COMPLETED segments first + Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment(); + List<String> instancesForCompletedSegments = SegmentAssignmentUtils + .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, InstancePartitionsType.COMPLETED); + Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(completedSegmentAssignment, instancesForCompletedSegments, + _replication); + + // Rebalance CONSUMING segments if needed + Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment(); + if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, + RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { + List<String> instancesForConsumingSegments = SegmentAssignmentUtils + .getInstancesForBalanceNumStrategy(_helixManager, _tableConfig, _replication, + InstancePartitionsType.CONSUMING); + for (String segmentName : consumingSegmentAssignment.keySet()) { + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + List<String> instancesAssigned = getInstances(instancesForConsumingSegments, partitionId); + Map<String, String> instanceStateMap = SegmentAssignmentUtils + .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + newAssignment.put(segmentName, instanceStateMap); + } + LOGGER.info( + "Rebalanced {} COMPLETED segments to instances: {} and {} CONSUMING segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instances: {}", + completedSegmentAssignment.size(), instancesForCompletedSegments, consumingSegmentAssignment.size(), + instancesForConsumingSegments, _tableNameWithType, _replication, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + } else { + LOGGER.info( + "Rebalanced {} COMPLETED segments to instances: {} for table: {} with replication: {}, number of segments to be moved to each instance: {}", + completedSegmentAssignment.size(), instancesForCompletedSegments, _tableNameWithType, _replication, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment, newAssignment)); + newAssignment.putAll(consumingSegmentAssignment); + } + + return newAssignment; + } + + /** + * Returns the instances for the given partition id for CONSUMING segments. + * <p>Uniformly spray the partitions and replicas across the instances. + * <p>E.g. (6 servers, 3 partitions, 4 replicas) + * <pre> + * "0_0": [i0, i1, i2, i3, i4, i5 ] + * p0r0, p0r1, p0r2, p1r3, p1r0, p1r1 + * p1r2, p1r3, p2r0, p2r1, p2r2, p2r3 + * </pre> + */ + private List<String> getInstances(List<String> instances, int partitionId) { + List<String> instancesAssigned = new ArrayList<>(_replication); + for (int replicaId = 0; replicaId < _replication; replicaId++) { + instancesAssigned.add(instances.get((partitionId * _replication + replicaId) % instances.size())); + } + return instancesAssigned; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java new file mode 100644 index 0000000..afd74be --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Segment assignment strategy for LLC real-time segments (both consuming and completed). + * <p>It is very similar to replica-group based segment assignment with the following differences: + * <ul> + * <li>1. Inside one replica, each partition (stream partition) always contains one server</li> + * <li> + * 2. Partition id for an instance is derived from the index of the instance in the replica group, instead of + * explicitly stored in the instance partitions + * </li> + * <li>3. In addition to the ONLINE segments, there are also CONSUMING segments to be assigned</li> + * </ul> + * <p> + * Since each partition contains only one server (in one replica), we can directly assign or rebalance segments to the + * servers based on the partition id. + * <p> + * The real-time segment assignment does not minimize segment moves because the server is fixed for each partition in + * each replica. The instance assignment is responsible for keeping minimum changes to the instance partitions to + * reduce the number of segments need to be moved. + */ +public class RealtimeReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy { + private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeReplicaGroupSegmentAssignmentStrategy.class); + + private HelixManager _helixManager; + private TableConfig _tableConfig; + private String _tableNameWithType; + + @Override + public void init(HelixManager helixManager, TableConfig tableConfig) { + _helixManager = helixManager; + _tableConfig = tableConfig; + _tableNameWithType = tableConfig.getTableName(); + + LOGGER.info("Initialized RealtimeReplicaGroupSegmentAssignmentStrategy for table: {}", _tableNameWithType); + } + + @Override + public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment) { + InstancePartitions instancePartitions = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.CONSUMING); + Preconditions.checkState(instancePartitions.getNumPartitions() == 1, + "The instance partitions: %s should contain only 1 partition", instancePartitions.getName()); + + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + List<String> instancesAssigned = getInstances(instancePartitions, partitionId); + LOGGER.info("Assigned segment: {} with partition id: {} to instances: {} for table: {}", segmentName, partitionId, + instancesAssigned, _tableNameWithType); + return instancesAssigned; + } + + @Override + public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Configuration config) { + SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair = + new SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment); + + // Rebalance COMPLETED segments first + Map<String, Map<String, String>> completedSegmentAssignment = pair.getCompletedSegmentAssignment(); + InstancePartitions instancePartitionsForCompletedSegments = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.COMPLETED); + Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>(); + for (String segmentName : completedSegmentAssignment.keySet()) { + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new HashSet<>()).add(segmentName); + } + Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(completedSegmentAssignment, instancePartitionsForCompletedSegments, + partitionIdToSegmentsMap); + + // Rebalance CONSUMING segments if needed + Map<String, Map<String, String>> consumingSegmentAssignment = pair.getConsumingSegmentAssignment(); + if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING, + RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) { + InstancePartitions instancePartitionsForConsumingSegments = InstancePartitionsUtils + .fetchOrComputeInstancePartitions(_helixManager, _tableConfig, InstancePartitionsType.CONSUMING); + for (String segmentName : consumingSegmentAssignment.keySet()) { + int partitionId = new LLCSegmentName(segmentName).getPartitionId(); + List<String> instancesAssigned = getInstances(instancePartitionsForConsumingSegments, partitionId); + Map<String, String> instanceStateMap = SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, + CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + newAssignment.put(segmentName, instanceStateMap); + } + LOGGER.info( + "Rebalanced {} COMPLETED segments with instance partitions: {} and {} CONSUMING segments with instance partitions: {} for table: {}, number of segments to be moved to each instances: {}", + completedSegmentAssignment.size(), instancePartitionsForCompletedSegments.getPartitionToInstancesMap(), + consumingSegmentAssignment.size(), instancePartitionsForConsumingSegments.getPartitionToInstancesMap(), + _tableNameWithType, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment)); + } else { + LOGGER.info( + "Rebalanced {} COMPLETED segments with instance partitions: {} for table: {}, number of segments to be moved to each instance: {}", + completedSegmentAssignment.size(), instancePartitionsForCompletedSegments.getPartitionToInstancesMap(), + _tableNameWithType, + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(completedSegmentAssignment, newAssignment)); + newAssignment.putAll(consumingSegmentAssignment); + } + + return newAssignment; + } + + /** + * Returns the instances for the given partition id for CONSUMING segments. + * <p>Within a replica, uniformly spray the partitions across the instances. + * <p>E.g. (within a replica, 3 servers, 6 partitions) + * <pre> + * "0_0": [i0, i1, i2] + * p0, p1, p2 + * p3, p4, p5 + * </pre> + */ + private List<String> getInstances(InstancePartitions instancePartitions, int partitionId) { + int numReplicas = instancePartitions.getNumReplicas(); + List<String> instancesAssigned = new ArrayList<>(numReplicas); + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + List<String> instances = instancePartitions.getInstances(0, replicaId); + instancesAssigned.add(instances.get(partitionId % instances.size())); + } + return instancesAssigned; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java new file mode 100644 index 0000000..de2ea0d --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategy.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.List; +import java.util.Map; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; + + +/** + * Strategy to assign segment to instances or rebalance all segments in a table. + */ +public interface SegmentAssignmentStrategy { + + /** + * Initializes the segment assignment strategy. + * + * @param helixManager Helix manager + * @param tableConfig Table config + */ + void init(HelixManager helixManager, TableConfig tableConfig); + + /** + * Assigns a new segment. + * + * @param segmentName Name of the segment to be assigned + * @param currentAssignment Current segment assignment of the table (map from segment name to instance state map) + * @return List of servers to assign the segment to + */ + List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment); + + /** + * Rebalances the segments for a table. + * + * @param currentAssignment Current segment assignment of the table (map from segment name to instance state map) + * @param config Configuration for the rebalance + * @return the rebalanced assignment for the segments + */ + Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, String>> currentAssignment, + Configuration config); +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java new file mode 100644 index 0000000..d71b1d9 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentStrategyFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import org.apache.helix.HelixManager; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; + + +/** + * Factory for the {@link SegmentAssignmentStrategy}. + */ +public class SegmentAssignmentStrategyFactory { + private SegmentAssignmentStrategyFactory() { + } + + public enum Strategy { + BalanceNumSegmentAssignmentStrategy, ReplicaGroupSegmentAssignmentStrategy + } + + public static SegmentAssignmentStrategy getSegmentAssignmentStrategy(HelixManager helixManager, + TableConfig tableConfig) { + SegmentAssignmentStrategy segmentAssignmentStrategy; + if (Strategy.ReplicaGroupSegmentAssignmentStrategy.name() + .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentAssignmentStrategy())) { + if (tableConfig.getTableType() == TableType.OFFLINE) { + segmentAssignmentStrategy = new OfflineReplicaGroupSegmentAssignmentStrategy(); + } else { + segmentAssignmentStrategy = new RealtimeReplicaGroupSegmentAssignmentStrategy(); + } + } else { + if (tableConfig.getTableType() == TableType.OFFLINE) { + segmentAssignmentStrategy = new OfflineBalanceNumSegmentAssignmentStrategy(); + } else { + segmentAssignmentStrategy = new RealtimeBalanceNumSegmentAssignmentStrategy(); + } + } + segmentAssignmentStrategy.init(helixManager, tableConfig); + return segmentAssignmentStrategy; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java new file mode 100644 index 0000000..4333f91 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -0,0 +1,276 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeMap; +import org.apache.helix.HelixManager; +import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.Pairs; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitionsUtils; + + +/** + * Utility class for segment assignment. + */ +class SegmentAssignmentUtils { + private SegmentAssignmentUtils() { + } + + /** + * Returns the number of segments assigned to each instance. + */ + static int[] getNumSegmentsAssignedPerInstance(Map<String, Map<String, String>> segmentAssignment, + List<String> instances) { + int[] numSegmentsPerInstance = new int[instances.size()]; + Map<String, Integer> instanceNameToIdMap = getInstanceNameToIdMap(instances); + for (Map<String, String> instanceStateMep : segmentAssignment.values()) { + for (String instanceName : instanceStateMep.keySet()) { + Integer instanceId = instanceNameToIdMap.get(instanceName); + if (instanceId != null) { + numSegmentsPerInstance[instanceId]++; + } + } + } + return numSegmentsPerInstance; + } + + private static Map<String, Integer> getInstanceNameToIdMap(List<String> instances) { + int numInstances = instances.size(); + Map<String, Integer> instanceNameToIdMap = new HashMap<>(); + for (int i = 0; i < numInstances; i++) { + instanceNameToIdMap.put(instances.get(i), i); + } + return instanceNameToIdMap; + } + + /** + * Returns the instances for the balance number segment assignment strategy. + */ + static List<String> getInstancesForBalanceNumStrategy(HelixManager helixManager, TableConfig tableConfig, + int replication, InstancePartitionsType instancePartitionsType) { + InstancePartitions instancePartitions = + InstancePartitionsUtils.fetchOrComputeInstancePartitions(helixManager, tableConfig, instancePartitionsType); + Preconditions.checkArgument(instancePartitions.getNumPartitions() == 1 && instancePartitions.getNumReplicas() == 1, + "The instance partitions: %s should contain only 1 partition and 1 replica", instancePartitions.getName()); + List<String> instances = instancePartitions.getInstances(0, 0); + Preconditions.checkState(instances.size() >= replication, + "There are less instances: %d than the replication: %d for table: %s", instances.size(), replication, + tableConfig.getTableName()); + return instances; + } + + /** + * Rebalances the table with Helix AutoRebalanceStrategy for the balance number segment assignment strategy. + */ + static Map<String, Map<String, String>> rebalanceTableWithHelixAutoRebalanceStrategy( + Map<String, Map<String, String>> currentAssignment, List<String> instances, int replication) { + // Use Helix AutoRebalanceStrategy to rebalance the table + LinkedHashMap<String, Integer> states = new LinkedHashMap<>(); + states.put(SegmentOnlineOfflineStateModel.ONLINE, replication); + AutoRebalanceStrategy autoRebalanceStrategy = + new AutoRebalanceStrategy(null, new ArrayList<>(currentAssignment.keySet()), states); + // Make a copy of the current assignment because this step might change the passed in assignment + Map<String, Map<String, String>> currentAssignmentCopy = new TreeMap<>(); + for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { + String segmentName = entry.getKey(); + Map<String, String> instanceStateMap = entry.getValue(); + currentAssignmentCopy.put(segmentName, new TreeMap<>(instanceStateMap)); + } + return autoRebalanceStrategy.computePartitionAssignment(instances, instances, currentAssignmentCopy, null) + .getMapFields(); + } + + /** + * Rebalances the table for the replica-group based segment assignment strategy. + * <p>The number of partitions for the segments can be different from the number of partitions in the instance + * partitions. Uniformly spray the segment partitions over the instance partitions. + */ + static Map<String, Map<String, String>> rebalanceReplicaGroupBasedTable( + Map<String, Map<String, String>> currentAssignment, InstancePartitions instancePartitions, + Map<Integer, Set<String>> partitionIdToSegmentsMap) { + Map<String, Map<String, String>> newAssignment = new TreeMap<>(); + int numPartitions = instancePartitions.getNumPartitions(); + for (Map.Entry<Integer, Set<String>> entry : partitionIdToSegmentsMap.entrySet()) { + // Uniformly spray the segment partitions over the instance partitions + int partitionId = entry.getKey() % numPartitions; + SegmentAssignmentUtils + .rebalanceReplicaGroupBasedPartition(currentAssignment, instancePartitions, partitionId, entry.getValue(), + newAssignment); + } + return newAssignment; + } + + /** + * Rebalances one partition of the table for the replica-group based segment assignment strategy. + * <ul> + * <li> + * 1. Calculate the target number of segments on each server + * </li> + * <li> + * 2. Loop over all the segments and keep the assignment if target number of segments for the server has not been + * reached and track the not assigned segments + * </li> + * <li> + * 3. Assign the left-over segments to the servers with the least segments, or the smallest index if there is a + * tie + * </li> + * <li> + * 4. Mirror the assignment to other replicas + * </li> + * </ul> + */ + static void rebalanceReplicaGroupBasedPartition(Map<String, Map<String, String>> currentAssignment, + InstancePartitions instancePartitions, int partitionId, Set<String> segments, + Map<String, Map<String, String>> newAssignment) { + // Fetch instances in replica 0 + List<String> instances = instancePartitions.getInstances(partitionId, 0); + Map<String, Integer> instanceNameToIdMap = SegmentAssignmentUtils.getInstanceNameToIdMap(instances); + + // Calculate target number of segments per instance + // NOTE: in order to minimize the segment movements, use the ceiling of the quotient + int numInstances = instances.size(); + int numSegments = segments.size(); + int targetNumSegmentsPerInstance = (numSegments + numInstances - 1) / numInstances; + + // Do not move segment if target number of segments is not reached, track the segments need to be moved + int[] numSegmentsAssignedPerInstance = new int[numInstances]; + List<String> segmentsNotAssigned = new ArrayList<>(); + for (Map.Entry<String, Map<String, String>> entry : currentAssignment.entrySet()) { + String segmentName = entry.getKey(); + // Skip segments not in the partition + if (!segments.contains(segmentName)) { + continue; + } + boolean segmentAssigned = false; + for (String instanceName : entry.getValue().keySet()) { + Integer instanceId = instanceNameToIdMap.get(instanceName); + if (instanceId != null && numSegmentsAssignedPerInstance[instanceId] < targetNumSegmentsPerInstance) { + newAssignment + .put(segmentName, getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId, instanceId)); + numSegmentsAssignedPerInstance[instanceId]++; + segmentAssigned = true; + break; + } + } + if (!segmentAssigned) { + segmentsNotAssigned.add(segmentName); + } + } + + // Assign each not assigned segment to the instance with the least segments, or the smallest id if there is a tie + PriorityQueue<Pairs.IntPair> heap = new PriorityQueue<>(numInstances, Pairs.intPairComparator()); + for (int instanceId = 0; instanceId < numInstances; instanceId++) { + heap.add(new Pairs.IntPair(numSegmentsAssignedPerInstance[instanceId], instanceId)); + } + for (String segmentName : segmentsNotAssigned) { + Pairs.IntPair intPair = heap.remove(); + int instanceId = intPair.getRight(); + newAssignment.put(segmentName, getReplicaGroupBasedInstanceStateMap(instancePartitions, partitionId, instanceId)); + intPair.setLeft(intPair.getLeft() + 1); + heap.add(intPair); + } + } + + /** + * Returns the map from instance name to Helix partition state for the replica-group based segment assignment + * strategy, which can be put into the segment assignment. The instances are picked from the instance partitions by + * the given partition id and instance id. + */ + private static Map<String, String> getReplicaGroupBasedInstanceStateMap(InstancePartitions instancePartitions, + int partitionId, int instanceId) { + Map<String, String> instanceStateMap = new TreeMap<>(); + int numReplicas = instancePartitions.getNumReplicas(); + for (int replicaId = 0; replicaId < numReplicas; replicaId++) { + instanceStateMap.put(instancePartitions.getInstances(partitionId, replicaId).get(instanceId), + SegmentOnlineOfflineStateModel.ONLINE); + } + return instanceStateMap; + } + + /** + * Returns the map from instance name to Helix partition state, which can be put into the segment assignment. + */ + static Map<String, String> getInstanceStateMap(List<String> instances, String state) { + Map<String, String> instanceStateMap = new TreeMap<>(); + for (String instanceName : instances) { + instanceStateMap.put(instanceName, state); + } + return instanceStateMap; + } + + /** + * Returns a map from instance name to number of segments to be moved to it. + */ + static Map<String, Integer> getNumSegmentsToBeMovedPerInstance(Map<String, Map<String, String>> oldAssignment, + Map<String, Map<String, String>> newAssignment) { + Map<String, Integer> numSegmentsToBeMovedPerInstance = new TreeMap<>(); + for (Map.Entry<String, Map<String, String>> entry : newAssignment.entrySet()) { + String segmentName = entry.getKey(); + Set<String> newInstancesAssigned = entry.getValue().keySet(); + Set<String> oldInstancesAssigned = oldAssignment.get(segmentName).keySet(); + // For each new assigned instance, check if the segment needs to be moved to it + for (String instanceName : newInstancesAssigned) { + if (!oldInstancesAssigned.contains(instanceName)) { + numSegmentsToBeMovedPerInstance.merge(instanceName, 1, Integer::sum); + } + } + } + return numSegmentsToBeMovedPerInstance; + } + + /** + * Class that splits segment assignment into CONSUMING segments and COMPLETED segments. + */ + static class CompletedConsumingSegmentAssignmentPair { + private final Map<String, Map<String, String>> _completedSegmentAssignment = new TreeMap<>(); + private final Map<String, Map<String, String>> _consumingSegmentAssignment = new TreeMap<>(); + + CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>> segmentAssignment) { + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + Map<String, String> instanceStateMap = entry.getValue(); + if (instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE)) { + _completedSegmentAssignment.put(entry.getKey(), instanceStateMap); + } else { + _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap); + } + } + } + + Map<String, Map<String, String>> getCompletedSegmentAssignment() { + return _completedSegmentAssignment; + } + + Map<String, Map<String, String>> getConsumingSegmentAssignment() { + return _consumingSegmentAssignment; + } + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java new file mode 100644 index 0000000..762773b --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineBalanceNumSegmentAssignmentStrategyTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class OfflineBalanceNumSegmentAssignmentStrategyTest { + private static final int NUM_REPLICAS = 3; + private static final String SEGMENT_NAME_PREFIX = "segment_"; + private static final int NUM_SEGMENTS = 100; + private static final List<String> SEGMENTS = + SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS); + private static final String INSTANCE_NAME_PREFIX = "instance_"; + private static final int NUM_INSTANCES = 10; + private static final List<String> INSTANCES = + SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String INSTANCE_PARTITIONS_NAME = + InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME); + + private SegmentAssignmentStrategy _strategy; + + @BeforeClass + public void setUp() { + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] + // } + InstancePartitions instancePartitions = new InstancePartitions(INSTANCE_PARTITIONS_NAME); + instancePartitions.setInstances(0, 0, INSTANCES); + + // Mock HelixManager + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore + .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME)), any(), + anyInt())).thenReturn(instancePartitions.toZNRecord()); + HelixManager helixManager = mock(HelixManager.class); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + + TableConfig tableConfig = + new TableConfig.Builder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); + _strategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + } + + @Test + public void testFactory() { + assertTrue(_strategy instanceof OfflineBalanceNumSegmentAssignmentStrategy); + } + + @Test + public void testAssignSegment() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + + // Segment 0 should be assigned to instance 0, 1, 2 + // Segment 1 should be assigned to instance 3, 4, 5 + // Segment 2 should be assigned to instance 6, 7, 8 + // Segment 3 should be assigned to instance 9, 0, 1 + // Segment 4 should be assigned to instance 2, 3, 4 + // ... + int expectedAssignedInstanceId = 0; + for (String segmentName : SEGMENTS) { + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + assertEquals(instancesAssigned.size(), NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); + expectedAssignedInstanceId = (expectedAssignedInstanceId + 1) % NUM_INSTANCES; + } + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + } + + @Test + public void testTableBalanced() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (String segmentName : SEGMENTS) { + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + + // There should be 100 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES); + int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; + int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Current assignment should already be balanced + assertEquals(_strategy.rebalanceTable(currentAssignment, null), currentAssignment); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java new file mode 100644 index 0000000..47ecdd8 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/OfflineReplicaGroupSegmentAssignmentStrategyTest.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.config.ReplicaGroupStrategyConfig; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.config.TableNameBuilder; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata; +import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata; +import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class OfflineReplicaGroupSegmentAssignmentStrategyTest { + private static final int NUM_REPLICAS = 3; + private static final String SEGMENT_NAME_PREFIX = "segment_"; + private static final int NUM_SEGMENTS = 90; + private static final List<String> SEGMENTS = + SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, NUM_SEGMENTS); + private static final String INSTANCE_NAME_PREFIX = "instance_"; + private static final int NUM_INSTANCES = 18; + private static final List<String> INSTANCES = + SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, NUM_INSTANCES); + private static final String RAW_TABLE_NAME_WITHOUT_PARTITION = "testTableWithoutPartition"; + private static final String INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION = + InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME_WITHOUT_PARTITION); + private static final String RAW_TABLE_NAME_WITH_PARTITION = "testTableWithPartition"; + private static final String OFFLINE_TABLE_NAME_WITH_PARTITION = + TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME_WITH_PARTITION); + private static final String INSTANCE_PARTITIONS_NAME_WITH_PARTITION = + InstancePartitionsType.OFFLINE.getInstancePartitionsName(RAW_TABLE_NAME_WITH_PARTITION); + private static final String PARTITION_COLUMN = "partitionColumn"; + private static final int NUM_PARTITIONS = 3; + + private SegmentAssignmentStrategy _strategyWithoutPartition; + private SegmentAssignmentStrategy _strategyWithPartition; + + @BeforeClass + public void setUp() { + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5], + // 0_1=[instance_6, instance_7, instance_8, instance_9, instance_10, instance_11], + // 0_2=[instance_12, instance_13, instance_14, instance_15, instance_16, instance_17] + // } + InstancePartitions instancePartitionsWithoutPartition = + new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION); + int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + int instanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); + for (int i = 0; i < numInstancesPerReplica; i++) { + instancesForReplica.add(INSTANCES.get(instanceIdToAdd++)); + } + instancePartitionsWithoutPartition.setInstances(0, replicaId, instancesForReplica); + } + + // Mock HelixManager + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> propertyStoreWithoutPartitions = mock(ZkHelixPropertyStore.class); + when(propertyStoreWithoutPartitions.get(eq(ZKMetadataProvider + .constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITHOUT_PARTITION)), any(), anyInt())) + .thenReturn(instancePartitionsWithoutPartition.toZNRecord()); + HelixManager helixManagerWithoutPartitions = mock(HelixManager.class); + when(helixManagerWithoutPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithoutPartitions); + + TableConfig tableConfigWithoutPartitions = + new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITHOUT_PARTITION) + .setNumReplicas(NUM_REPLICAS).setSegmentAssignmentStrategy( + SegmentAssignmentStrategyFactory.Strategy.ReplicaGroupSegmentAssignmentStrategy.name()).build(); + _strategyWithoutPartition = SegmentAssignmentStrategyFactory + .getSegmentAssignmentStrategy(helixManagerWithoutPartitions, tableConfigWithoutPartitions); + + // { + // 0_0=[instance_0, instance_1], + // 0_1=[instance_6, instance_7], + // 0_2=[instance_12, instance_13], + // 1_0=[instance_2, instance_3], + // 1_1=[instance_8, instance_9], + // 1_2=[instance_14, instance_15], + // 2_0=[instance_4, instance_5], + // 2_1=[instance_10, instance_11], + // 2_2=[instance_16, instance_17] + // } + InstancePartitions instancePartitionsWithPartition = + new InstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION); + int numInstancesPerPartition = numInstancesPerReplica / NUM_REPLICAS; + instanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + for (int partitionId = 0; partitionId < NUM_PARTITIONS; partitionId++) { + List<String> instancesForPartition = new ArrayList<>(numInstancesPerPartition); + for (int i = 0; i < numInstancesPerPartition; i++) { + instancesForPartition.add(INSTANCES.get(instanceIdToAdd++)); + } + instancePartitionsWithPartition.setInstances(partitionId, replicaId, instancesForPartition); + } + } + + // Mock HelixManager + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> propertyStoreWithPartitions = mock(ZkHelixPropertyStore.class); + when(propertyStoreWithPartitions.get( + eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(INSTANCE_PARTITIONS_NAME_WITH_PARTITION)), + any(), anyInt())).thenReturn(instancePartitionsWithPartition.toZNRecord()); + List<ZNRecord> segmentZKMetadataZNRecords = new ArrayList<>(NUM_SEGMENTS); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = SEGMENTS.get(segmentId); + OfflineSegmentZKMetadata segmentZKMetadata = new OfflineSegmentZKMetadata(); + segmentZKMetadata.setSegmentName(segmentName); + int partitionId = segmentId % NUM_PARTITIONS; + segmentZKMetadata.setPartitionMetadata(new SegmentPartitionMetadata(Collections.singletonMap(PARTITION_COLUMN, + new ColumnPartitionMetadata(null, NUM_PARTITIONS, Collections.singleton(partitionId))))); + ZNRecord segmentZKMetadataZNRecord = segmentZKMetadata.toZNRecord(); + when(propertyStoreWithPartitions.get( + eq(ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME_WITH_PARTITION, segmentName)), + any(), anyInt())).thenReturn(segmentZKMetadataZNRecord); + segmentZKMetadataZNRecords.add(segmentZKMetadataZNRecord); + } + when(propertyStoreWithPartitions + .getChildren(eq(ZKMetadataProvider.constructPropertyStorePathForResource(OFFLINE_TABLE_NAME_WITH_PARTITION)), + any(), anyInt())).thenReturn(segmentZKMetadataZNRecords); + HelixManager helixManagerWithPartitions = mock(HelixManager.class); + when(helixManagerWithPartitions.getHelixPropertyStore()).thenReturn(propertyStoreWithPartitions); + + ReplicaGroupStrategyConfig strategyConfig = new ReplicaGroupStrategyConfig(); + strategyConfig.setPartitionColumn(PARTITION_COLUMN); + TableConfig tableConfigWithPartitions = + new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE).setTableName(RAW_TABLE_NAME_WITH_PARTITION) + .setNumReplicas(NUM_REPLICAS).setSegmentAssignmentStrategy( + SegmentAssignmentStrategyFactory.Strategy.ReplicaGroupSegmentAssignmentStrategy.name()).build(); + tableConfigWithPartitions.getValidationConfig().setReplicaGroupStrategyConfig(strategyConfig); + _strategyWithPartition = SegmentAssignmentStrategyFactory + .getSegmentAssignmentStrategy(helixManagerWithPartitions, tableConfigWithPartitions); + } + + @Test + public void testFactory() { + assertTrue(_strategyWithoutPartition instanceof OfflineReplicaGroupSegmentAssignmentStrategy); + assertTrue(_strategyWithPartition instanceof OfflineReplicaGroupSegmentAssignmentStrategy); + } + + @Test + public void testAssignSegmentWithoutPartition() { + int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = SEGMENTS.get(segmentId); + List<String> instancesAssigned = _strategyWithoutPartition.assignSegment(segmentName, currentAssignment); + assertEquals(instancesAssigned.size(), NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + + // Segment 0 should be assigned to instance 0, 6, 12 + // Segment 1 should be assigned to instance 1, 7, 13 + // Segment 2 should be assigned to instance 2, 8, 14 + // Segment 3 should be assigned to instance 3, 9, 15 + // Segment 4 should be assigned to instance 4, 10, 16 + // Segment 5 should be assigned to instance 5, 11, 17 + // Segment 6 should be assigned to instance 0, 6, 12 + // Segment 7 should be assigned to instance 1, 7, 13 + // ... + int expectedAssignedInstanceId = segmentId % numInstancesPerReplica + replicaId * numInstancesPerReplica; + assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); + } + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + } + + @Test + public void testAssignSegmentWithPartition() { + int numInstancesPerReplica = NUM_INSTANCES / NUM_REPLICAS; + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + int numInstancesPerPartition = numInstancesPerReplica / NUM_PARTITIONS; + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = SEGMENTS.get(segmentId); + List<String> instancesAssigned = _strategyWithPartition.assignSegment(segmentName, currentAssignment); + assertEquals(instancesAssigned.size(), NUM_REPLICAS); + int partitionId = segmentId % NUM_PARTITIONS; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + + // Segment 0 (partition 0) should be assigned to instance 0, 6, 12 + // Segment 1 (partition 1) should be assigned to instance 2, 8, 14 + // Segment 2 (partition 2) should be assigned to instance 4, 10, 16 + // Segment 3 (partition 0) should be assigned to instance 1, 7, 13 + // Segment 4 (partition 1) should be assigned to instance 3, 9, 15 + // Segment 5 (partition 2) should be assigned to instance 5, 11, 17 + // Segment 6 (partition 0) should be assigned to instance 0, 6, 12 + // Segment 7 (partition 1) should be assigned to instance 2, 8, 14 + // ... + int expectedAssignedInstanceId = + (segmentId % numInstancesPerReplica) / NUM_PARTITIONS + partitionId * numInstancesPerPartition + + replicaId * numInstancesPerReplica; + assertEquals(instancesAssigned.get(replicaId), INSTANCES.get(expectedAssignedInstanceId)); + } + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + } + + @Test + public void testTableBalancedWithoutPartition() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (String segmentName : SEGMENTS) { + List<String> instancesAssigned = _strategyWithoutPartition.assignSegment(segmentName, currentAssignment); + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + + // There should be 90 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 15 segments assigned + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES); + int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; + int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Current assignment should already be balanced + assertEquals(_strategyWithoutPartition.rebalanceTable(currentAssignment, null), currentAssignment); + } + + @Test + public void testTableBalancedWithPartition() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (String segmentName : SEGMENTS) { + List<String> instancesAssigned = _strategyWithPartition.assignSegment(segmentName, currentAssignment); + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + + // There should be 90 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 15 segments assigned + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, INSTANCES); + int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_INSTANCES]; + int numSegmentsPerInstance = NUM_SEGMENTS * NUM_REPLICAS / NUM_INSTANCES; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Current assignment should already be balanced + assertEquals(_strategyWithPartition.rebalanceTable(currentAssignment, null), currentAssignment); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java new file mode 100644 index 0000000..87ef462 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class RealtimeBalanceNumSegmentAssignmentStrategyTest { + private static final int NUM_REPLICAS = 3; + private static final int NUM_PARTITIONS = 4; + private static final int NUM_SEGMENTS = 100; + private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_"; + private static final int NUM_CONSUMING_INSTANCES = 9; + private static final List<String> CONSUMING_INSTANCES = + SegmentAssignmentTestUtils.getNameList(CONSUMING_INSTANCE_NAME_PREFIX, NUM_CONSUMING_INSTANCES); + private static final String COMPLETED_INSTANCE_NAME_PREFIX = "completedInstance_"; + private static final int NUM_COMPLETED_INSTANCES = 10; + private static final List<String> COMPLETED_INSTANCES = + SegmentAssignmentTestUtils.getNameList(COMPLETED_INSTANCE_NAME_PREFIX, NUM_COMPLETED_INSTANCES); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String CONSUMING_INSTANCE_PARTITIONS_NAME = + InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME); + private static final String COMPLETED_INSTANCE_PARTITIONS_NAME = + InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME); + + private List<String> _segments; + private SegmentAssignmentStrategy _strategy; + + @BeforeClass + public void setUp() { + _segments = new ArrayList<>(NUM_SEGMENTS); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % NUM_PARTITIONS, segmentId / NUM_PARTITIONS, + System.currentTimeMillis()).getSegmentName()); + } + + // Consuming instances: + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8] + // } + // p0r0 p0r1 p0r2 p1r0 p1r1 p1r2 p2r0 p2r1 p2r2 + // p3r0 p3r1 p3r2 + InstancePartitions consumingInstancePartitions = new InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME); + consumingInstancePartitions.setInstances(0, 0, CONSUMING_INSTANCES); + + // Completed instances: + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] + // } + InstancePartitions completedInstancePartitions = new InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME); + completedInstancePartitions.setInstances(0, 0, COMPLETED_INSTANCES); + + // Mock HelixManager + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore + .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)), + any(), anyInt())).thenReturn(consumingInstancePartitions.toZNRecord()); + when(propertyStore + .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)), + any(), anyInt())).thenReturn(completedInstancePartitions.toZNRecord()); + HelixManager helixManager = mock(HelixManager.class); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + + TableConfig tableConfig = + new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS).setLLC(true).build(); + _strategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + } + + @Test + public void testFactory() { + assertTrue(_strategy instanceof RealtimeBalanceNumSegmentAssignmentStrategy); + } + + @Test + public void testAssignSegment() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = _segments.get(segmentId); + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + assertEquals(instancesAssigned.size(), NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + + // Segment 0 (partition 0) should be assigned to instance 0, 1, 2 + // Segment 1 (partition 1) should be assigned to instance 3, 4, 5 + // Segment 2 (partition 2) should be assigned to instance 6, 7, 8 + // Segment 3 (partition 3) should be assigned to instance 0, 1, 2 + // Segment 4 (partition 0) should be assigned to instance 0, 1, 2 + // Segment 5 (partition 1) should be assigned to instance 3, 4, 5 + // ... + int partitionId = segmentId % NUM_PARTITIONS; + int expectedAssignedInstanceId = (partitionId * NUM_REPLICAS + replicaId) % NUM_CONSUMING_INSTANCES; + assertEquals(instancesAssigned.get(replicaId), CONSUMING_INSTANCES.get(expectedAssignedInstanceId)); + } + addToAssignment(currentAssignment, segmentId, instancesAssigned); + } + } + + @Test + public void testRelocateCompletedSegments() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = _segments.get(segmentId); + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + addToAssignment(currentAssignment, segmentId, instancesAssigned); + } + + // There should be 100 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + + // Rebalance should relocate all completed (ONLINE) segments to the completed instances + Map<String, Map<String, String>> newAssignment = + _strategy.rebalanceTable(currentAssignment, new BaseConfiguration()); + assertEquals(newAssignment.size(), NUM_SEGMENTS); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { + // Completed (ONLINE) segments + Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + } + } else { + // Consuming (CONSUMING) segments + Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + } + } + } + // Relocated segments should be balanced (each instance should have at least 28 segments assigned) + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, COMPLETED_INSTANCES); + assertEquals(numSegmentsAssignedPerInstance.length, NUM_COMPLETED_INSTANCES); + int expectedMinNumSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES; + for (int i = 0; i < NUM_COMPLETED_INSTANCES; i++) { + assertTrue(numSegmentsAssignedPerInstance[i] >= expectedMinNumSegmentsPerInstance); + } + + // Rebalance all segments (both completed and consuming) should give the same assignment + BaseConfiguration config = new BaseConfiguration(); + config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); + assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + } + + private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, + List<String> instancesAssigned) { + // Change the state of the last segment in the same partition from CONSUMING to ONLINE if exists + if (segmentId >= NUM_PARTITIONS) { + String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS); + Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition); + currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils + .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), + RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + } + + // Add the new segment into the assignment as CONSUMING + currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils + .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java new file mode 100644 index 0000000..c4c9a82 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.pinot.common.config.TableConfig; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel; +import org.apache.pinot.common.utils.InstancePartitionsType; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class RealtimeReplicaGroupSegmentAssignmentStrategyTest { + private static final int NUM_REPLICAS = 3; + private static final int NUM_PARTITIONS = 4; + private static final int NUM_SEGMENTS = 100; + private static final String CONSUMING_INSTANCE_NAME_PREFIX = "consumingInstance_"; + private static final int NUM_CONSUMING_INSTANCES = 9; + private static final List<String> CONSUMING_INSTANCES = + SegmentAssignmentTestUtils.getNameList(CONSUMING_INSTANCE_NAME_PREFIX, NUM_CONSUMING_INSTANCES); + private static final String COMPLETED_INSTANCE_NAME_PREFIX = "completedInstance_"; + private static final int NUM_COMPLETED_INSTANCES = 12; + private static final List<String> COMPLETED_INSTANCES = + SegmentAssignmentTestUtils.getNameList(COMPLETED_INSTANCE_NAME_PREFIX, NUM_COMPLETED_INSTANCES); + private static final String RAW_TABLE_NAME = "testTable"; + private static final String CONSUMING_INSTANCE_PARTITIONS_NAME = + InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME); + private static final String COMPLETED_INSTANCE_PARTITIONS_NAME = + InstancePartitionsType.COMPLETED.getInstancePartitionsName(RAW_TABLE_NAME); + + private List<String> _segments; + private SegmentAssignmentStrategy _strategy; + + @BeforeClass + public void setUp() { + _segments = new ArrayList<>(NUM_SEGMENTS); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + _segments.add(new LLCSegmentName(RAW_TABLE_NAME, segmentId % NUM_PARTITIONS, segmentId / NUM_PARTITIONS, + System.currentTimeMillis()).getSegmentName()); + } + + // Consuming instances: + // { + // 0_0=[instance_0, instance_1, instance_2], + // 0_1=[instance_3, instance_4, instance_5], + // 0_2=[instance_6, instance_7, instance_8] + // } + // p0 p1 p2 + // p3 + InstancePartitions consumingInstancePartitions = new InstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME); + int numConsumingInstancesPerReplica = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; + int consumingInstanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> consumingInstancesForReplica = new ArrayList<>(numConsumingInstancesPerReplica); + for (int i = 0; i < numConsumingInstancesPerReplica; i++) { + consumingInstancesForReplica.add(CONSUMING_INSTANCES.get(consumingInstanceIdToAdd++)); + } + consumingInstancePartitions.setInstances(0, replicaId, consumingInstancesForReplica); + } + + // Completed instances: + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3], + // 0_1=[instance_4, instance_5, instance_6, instance_7], + // 0_2=[instance_8, instance_9, instance_10, instance_11] + // } + InstancePartitions completedInstancePartitions = new InstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME); + int numCompletedInstancesPerReplica = NUM_COMPLETED_INSTANCES / NUM_REPLICAS; + int completedInstanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> completedInstancesForReplica = new ArrayList<>(numCompletedInstancesPerReplica); + for (int i = 0; i < numCompletedInstancesPerReplica; i++) { + completedInstancesForReplica.add(COMPLETED_INSTANCES.get(completedInstanceIdToAdd++)); + } + completedInstancePartitions.setInstances(0, replicaId, completedInstancesForReplica); + } + + // Mock HelixManager + @SuppressWarnings("unchecked") + ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore + .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(CONSUMING_INSTANCE_PARTITIONS_NAME)), + any(), anyInt())).thenReturn(consumingInstancePartitions.toZNRecord()); + when(propertyStore + .get(eq(ZKMetadataProvider.constructPropertyStorePathForInstancePartitions(COMPLETED_INSTANCE_PARTITIONS_NAME)), + any(), anyInt())).thenReturn(completedInstancePartitions.toZNRecord()); + HelixManager helixManager = mock(HelixManager.class); + when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); + + TableConfig tableConfig = + new TableConfig.Builder(CommonConstants.Helix.TableType.REALTIME).setTableName(RAW_TABLE_NAME) + .setNumReplicas(NUM_REPLICAS).setLLC(true).setSegmentAssignmentStrategy( + SegmentAssignmentStrategyFactory.Strategy.ReplicaGroupSegmentAssignmentStrategy.name()).build(); + _strategy = SegmentAssignmentStrategyFactory.getSegmentAssignmentStrategy(helixManager, tableConfig); + } + + @Test + public void testFactory() { + assertTrue(_strategy instanceof RealtimeReplicaGroupSegmentAssignmentStrategy); + } + + @Test + public void testAssignSegment() { + int numInstancesPerReplica = NUM_CONSUMING_INSTANCES / NUM_REPLICAS; + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = _segments.get(segmentId); + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + assertEquals(instancesAssigned.size(), NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + + // Segment 0 (partition 0) should be assigned to instance 0, 3, 6 + // Segment 1 (partition 1) should be assigned to instance 1, 4, 7 + // Segment 2 (partition 2) should be assigned to instance 2, 5, 8 + // Segment 3 (partition 3) should be assigned to instance 0, 3, 6 + // Segment 4 (partition 0) should be assigned to instance 0, 3, 6 + // Segment 5 (partition 1) should be assigned to instance 1, 4, 7 + // ... + int partitionId = segmentId % NUM_PARTITIONS; + int expectedAssignedInstanceId = partitionId % numInstancesPerReplica + replicaId * numInstancesPerReplica; + assertEquals(instancesAssigned.get(replicaId), CONSUMING_INSTANCES.get(expectedAssignedInstanceId)); + } + addToAssignment(currentAssignment, segmentId, instancesAssigned); + } + } + + @Test + public void testRelocateCompletedSegments() { + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + String segmentName = _segments.get(segmentId); + List<String> instancesAssigned = _strategy.assignSegment(segmentName, currentAssignment); + addToAssignment(currentAssignment, segmentId, instancesAssigned); + } + + // There should be 100 segments assigned + assertEquals(currentAssignment.size(), NUM_SEGMENTS); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + + // Rebalance should relocate all completed (ONLINE) segments to the completed instances + Map<String, Map<String, String>> newAssignment = + _strategy.rebalanceTable(currentAssignment, new BaseConfiguration()); + assertEquals(newAssignment.size(), NUM_SEGMENTS); + for (int segmentId = 0; segmentId < NUM_SEGMENTS; segmentId++) { + if (segmentId < NUM_SEGMENTS - NUM_PARTITIONS) { + // Completed (ONLINE) segments + Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + assertTrue(entry.getKey().startsWith(COMPLETED_INSTANCE_NAME_PREFIX)); + assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.ONLINE); + } + } else { + // Consuming (CONSUMING) segments + Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); + for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { + assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); + assertEquals(entry.getValue(), RealtimeSegmentOnlineOfflineStateModel.CONSUMING); + } + } + } + // Relocated segments should be balanced (each instance should have 24 segments assigned) + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, COMPLETED_INSTANCES); + int[] expectedNumSegmentsAssignedPerInstance = new int[NUM_COMPLETED_INSTANCES]; + int numSegmentsPerInstance = (NUM_SEGMENTS - NUM_PARTITIONS) * NUM_REPLICAS / NUM_COMPLETED_INSTANCES; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + + // Rebalance all segments (both completed and consuming) should give the same assignment + BaseConfiguration config = new BaseConfiguration(); + config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true); + assertEquals(_strategy.rebalanceTable(currentAssignment, config), newAssignment); + } + + private void addToAssignment(Map<String, Map<String, String>> currentAssignment, int segmentId, + List<String> instancesAssigned) { + // Change the state of the last segment in the same partition from CONSUMING to ONLINE if exists + if (segmentId >= NUM_PARTITIONS) { + String lastSegmentInPartition = _segments.get(segmentId - NUM_PARTITIONS); + Map<String, String> instanceStateMap = currentAssignment.get(lastSegmentInPartition); + currentAssignment.put(lastSegmentInPartition, SegmentAssignmentUtils + .getInstanceStateMap(new ArrayList<>(instanceStateMap.keySet()), + RealtimeSegmentOnlineOfflineStateModel.ONLINE)); + } + + // Add the new segment into the assignment as CONSUMING + currentAssignment.put(_segments.get(segmentId), SegmentAssignmentUtils + .getInstanceStateMap(instancesAssigned, RealtimeSegmentOnlineOfflineStateModel.CONSUMING)); + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentTestUtils.java new file mode 100644 index 0000000..c9a7ae0 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentTestUtils.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.List; + + +public class SegmentAssignmentTestUtils { + private SegmentAssignmentTestUtils() { + } + + /** + * Returns a list of names. + */ + public static List<String> getNameList(String namePrefix, int numNames) { + List<String> names = new ArrayList<>(numNames); + for (int nameId = 0; nameId < numNames; nameId++) { + names.add(namePrefix + nameId); + } + return names; + } +} diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java new file mode 100644 index 0000000..86cfac6 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtilsTest.java @@ -0,0 +1,434 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.assignment.segment; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; +import org.apache.pinot.controller.helix.core.assignment.InstancePartitions; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class SegmentAssignmentUtilsTest { + private static final int NUM_REPLICAS = 3; + private static final String SEGMENT_NAME_PREFIX = "segment_"; + private static final String INSTANCE_NAME_PREFIX = "instance_"; + + @Test + public void testRebalanceTableWithHelixAutoRebalanceStrategy() { + int numSegments = 100; + List<String> segments = SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, numSegments); + int numInstances = 10; + List<String> instances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances); + + // Uniformly spray segments to the instances: + // [instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] + // segment_0(r0) segment_0(r1) segment_0(r2) segment_1(r0) segment_1(r1) segment_1(r2) segment_2(r0) segment_2(r1) segment_2(r2) segment_3(r0) + // segment_3(r1) segment_3(r2) segment_4(r0) segment_4(r1) segment_4(r2) segment_5(r0) segment_5(r1) segment_5(r2) segment_6(r0) segment_6(r1) + // ... + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + int assignedInstanceId = 0; + for (String segmentName : segments) { + List<String> instancesAssigned = new ArrayList<>(NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + instancesAssigned.add(instances.get(assignedInstanceId)); + assignedInstanceId = (assignedInstanceId + 1) % numInstances; + } + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + + // There should be 100 segments assigned + assertEquals(currentAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + int[] expectedNumSegmentsAssignedPerInstance = new int[numInstances]; + int numSegmentsPerInstance = numSegments * NUM_REPLICAS / numInstances; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Current assignment should already be balanced + assertEquals( + SegmentAssignmentUtils.rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, instances, NUM_REPLICAS), + currentAssignment); + + // Replace instance_0 with instance_10 + // { + // 0_0=[instance_10, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9] + // } + List<String> newInstances = new ArrayList<>(instances); + String newInstanceName = INSTANCE_NAME_PREFIX + 10; + newInstances.set(0, newInstanceName); + Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, NUM_REPLICAS); + // There should be 100 segments assigned + assertEquals(currentAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // All segments on instance_0 should be moved to instance_10 + Map<String, Integer> numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), 1); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstanceName), numSegmentsPerInstance); + String oldInstanceName = INSTANCE_NAME_PREFIX + 0; + for (String segmentName : segments) { + if (currentAssignment.get(segmentName).containsKey(oldInstanceName)) { + assertTrue(newAssignment.get(segmentName).containsKey(newInstanceName)); + } + } + + // Remove 5 instances + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4] + // } + int newNumInstances = numInstances - 5; + newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); + newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, NUM_REPLICAS); + // There should be 100 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // The segments are not perfectly balanced, but should be deterministic + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + assertEquals(numSegmentsAssignedPerInstance[0], 56); + assertEquals(numSegmentsAssignedPerInstance[1], 60); + assertEquals(numSegmentsAssignedPerInstance[2], 60); + assertEquals(numSegmentsAssignedPerInstance[3], 60); + assertEquals(numSegmentsAssignedPerInstance[4], 64); + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(0)), 26); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(1)), 30); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(2)), 30); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(3)), 30); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(4)), 34); + + // Add 5 instances + // { + // 0_0=[instance_0, instance_1, instance_2, instance_3, instance_4, instance_5, instance_6, instance_7, instance_8, instance_9, instance_10, instance_11, instance_12, instance_13, instance_14] + // } + newNumInstances = numInstances + 5; + newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); + newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, NUM_REPLICAS); + // There should be 100 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 20 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + expectedNumSegmentsAssignedPerInstance = new int[newNumInstances]; + int newNumSegmentsPerInstance = numSegments * NUM_REPLICAS / newNumInstances; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, newNumSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Each new added instance should have 20 segments to be moved to it + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), 5); + for (int instanceId = numInstances; instanceId < newNumInstances; instanceId++) { + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)), newNumSegmentsPerInstance); + } + + // Change all instances + // { + // 0_0=[i_0, i_1, i_2, i_3, i_4, i_5, i_6, i_7, i_8, i_9] + // } + String newInstanceNamePrefix = "i_"; + newInstances = SegmentAssignmentTestUtils.getNameList(newInstanceNamePrefix, numInstances); + newAssignment = SegmentAssignmentUtils + .rebalanceTableWithHelixAutoRebalanceStrategy(currentAssignment, newInstances, NUM_REPLICAS); + // There should be 100 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + expectedNumSegmentsAssignedPerInstance = new int[numInstances]; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Each instance should have 30 segments to be moved to it + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances); + for (String instanceName : newInstances) { + assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName), numSegmentsPerInstance); + } + } + + @Test + public void testRebalanceReplicaGroupBasedTable() { + // Table is rebalanced on a per partition basis, so testing rebalancing one partition is enough + + int numSegments = 90; + List<String> segments = SegmentAssignmentTestUtils.getNameList(SEGMENT_NAME_PREFIX, numSegments); + Map<Integer, Set<String>> partitionIdToSegmentsMap = Collections.singletonMap(0, new HashSet<>(segments)); + int numInstances = 9; + List<String> instances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, numInstances); + + // { + // 0_0=[instance_0, instance_1, instance_2], + // 0_1=[instance_3, instance_4, instance_5], + // 0_2=[instance_6, instance_7, instance_8] + // } + InstancePartitions instancePartitions = new InstancePartitions(null); + int numInstancesPerReplica = numInstances / NUM_REPLICAS; + int instanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); + for (int i = 0; i < numInstancesPerReplica; i++) { + instancesForReplica.add(instances.get(instanceIdToAdd++)); + } + instancePartitions.setInstances(0, replicaId, instancesForReplica); + } + + // Uniformly spray segments to the instances: + // Replica group 0: [instance_0, instance_1, instance_2], + // Replica group 1: [instance_3, instance_4, instance_5], + // Replica group 2: [instance_6, instance_7, instance_8] + // segment_0 segment_1 segment_2 + // segment_3 segment_4 segment_5 + // ... + Map<String, Map<String, String>> currentAssignment = new TreeMap<>(); + for (int segmentId = 0; segmentId < numSegments; segmentId++) { + List<String> instancesAssigned = new ArrayList<>(NUM_REPLICAS); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + int assignedInstanceId = segmentId % numInstancesPerReplica + replicaId * numInstancesPerReplica; + instancesAssigned.add(instances.get(assignedInstanceId)); + } + currentAssignment.put(segments.get(segmentId), + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentOnlineOfflineStateModel.ONLINE)); + } + + // There should be 90 segments assigned + assertEquals(currentAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : currentAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + int[] numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances); + int[] expectedNumSegmentsAssignedPerInstance = new int[numInstances]; + int numSegmentsPerInstance = numSegments * NUM_REPLICAS / numInstances; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Current assignment should already be balanced + assertEquals(SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionIdToSegmentsMap), + currentAssignment); + + // Replace instance_0 with instance_9, instance_4 with instance_10 + // { + // 0_0=[instance_9, instance_1, instance_2], + // 0_1=[instance_3, instance_10, instance_5], + // 0_2=[instance_6, instance_7, instance_8] + // } + List<String> newInstances = new ArrayList<>(numInstances); + List<String> newReplica0Instances = new ArrayList<>(instancePartitions.getInstances(0, 0)); + String newReplica0Instance = INSTANCE_NAME_PREFIX + 9; + newReplica0Instances.set(0, newReplica0Instance); + newInstances.addAll(newReplica0Instances); + List<String> newReplica1Instances = new ArrayList<>(instancePartitions.getInstances(0, 1)); + String newReplica1Instance = INSTANCE_NAME_PREFIX + 10; + newReplica1Instances.set(1, newReplica1Instance); + newInstances.addAll(newReplica1Instances); + List<String> newReplica2Instances = instancePartitions.getInstances(0, 2); + newInstances.addAll(newReplica2Instances); + InstancePartitions newInstancePartitions = new InstancePartitions(null); + newInstancePartitions.setInstances(0, 0, newReplica0Instances); + newInstancePartitions.setInstances(0, 1, newReplica1Instances); + newInstancePartitions.setInstances(0, 2, newReplica2Instances); + Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); + // There should be 90 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // All segments on instance_0 should be moved to instance_9, all segments on instance_4 should be moved to + // instance_10 + Map<String, Integer> numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), 2); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplica0Instance), numSegmentsPerInstance); + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newReplica1Instance), numSegmentsPerInstance); + String replica0OldInstanceName = INSTANCE_NAME_PREFIX + 0; + String replica1OldInstanceName = INSTANCE_NAME_PREFIX + 4; + for (String segmentName : segments) { + Map<String, String> oldInstanceStateMap = currentAssignment.get(segmentName); + if (oldInstanceStateMap.containsKey(replica0OldInstanceName)) { + assertTrue(newAssignment.get(segmentName).containsKey(newReplica0Instance)); + } + if (oldInstanceStateMap.containsKey(replica1OldInstanceName)) { + assertTrue(newAssignment.get(segmentName).containsKey(newReplica1Instance)); + } + } + + // Remove 3 instances (1 from each replica) + // { + // 0_0=[instance_0, instance_1], + // 0_1=[instance_3, instance_4], + // 0_2=[instance_6, instance_7] + // } + int newNumInstances = numInstances - 3; + int newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS; + newInstances = new ArrayList<>(newNumInstances); + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> newInstancesForReplica = + instancePartitions.getInstances(0, replicaId).subList(0, newNumInstancesPerReplica); + newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica); + newInstances.addAll(newInstancesForReplica); + } + newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); + // There should be 90 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 45 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + expectedNumSegmentsAssignedPerInstance = new int[newNumInstances]; + int newNumSegmentsPerInstance = numSegments * NUM_REPLICAS / newNumInstances; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, newNumSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Each instance should have 15 segments to be moved to it + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), newNumInstances); + for (String instanceName : newInstances) { + assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName), + newNumSegmentsPerInstance - numSegmentsPerInstance); + } + + // Add 6 instances (2 to each replica) + // { + // 0_0=[instance_0, instance_1, instance_2, instance_9, instance_10], + // 0_1=[instance_3, instance_4, instance_5, instance_11, instance_12], + // 0_2=[instance_6, instance_7, instance_8, instance_13, instance_14] + // } + newNumInstances = numInstances + 6; + newNumInstancesPerReplica = newNumInstances / NUM_REPLICAS; + newInstances = SegmentAssignmentTestUtils.getNameList(INSTANCE_NAME_PREFIX, newNumInstances); + instanceIdToAdd = numInstances; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> newInstancesForReplica = new ArrayList<>(instancePartitions.getInstances(0, replicaId)); + for (int i = 0; i < newNumInstancesPerReplica - numInstancesPerReplica; i++) { + newInstancesForReplica.add(newInstances.get(instanceIdToAdd++)); + } + newInstancePartitions.setInstances(0, replicaId, newInstancesForReplica); + } + newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); + // There should be 90 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 18 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + expectedNumSegmentsAssignedPerInstance = new int[newNumInstances]; + newNumSegmentsPerInstance = numSegments * NUM_REPLICAS / newNumInstances; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, newNumSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Each new added instance should have 18 segments to be moved to it + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), 6); + for (int instanceId = numInstances; instanceId < newNumInstances; instanceId++) { + assertEquals((int) numSegmentsToBeMovedPerInstance.get(newInstances.get(instanceId)), newNumSegmentsPerInstance); + } + + // Change all instances + // { + // 0_0=[i_0, i_1, i_2], + // 0_1=[i_3, i_4, i_5], + // 0_2=[i_6, i_7, i_8] + // } + newInstances = SegmentAssignmentTestUtils.getNameList("i_", numInstances); + instanceIdToAdd = 0; + for (int replicaId = 0; replicaId < NUM_REPLICAS; replicaId++) { + List<String> instancesForReplica = new ArrayList<>(numInstancesPerReplica); + for (int i = 0; i < numInstancesPerReplica; i++) { + instancesForReplica.add(newInstances.get(instanceIdToAdd++)); + } + newInstancePartitions.setInstances(0, replicaId, instancesForReplica); + } + newAssignment = SegmentAssignmentUtils + .rebalanceReplicaGroupBasedTable(currentAssignment, newInstancePartitions, partitionIdToSegmentsMap); + // There should be 90 segments assigned + assertEquals(newAssignment.size(), numSegments); + // Each segment should have 3 replicas + for (Map<String, String> instanceStateMap : newAssignment.values()) { + assertEquals(instanceStateMap.size(), NUM_REPLICAS); + } + // Each instance should have 30 segments assigned + numSegmentsAssignedPerInstance = + SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(newAssignment, newInstances); + expectedNumSegmentsAssignedPerInstance = new int[numInstances]; + Arrays.fill(expectedNumSegmentsAssignedPerInstance, numSegmentsPerInstance); + assertEquals(numSegmentsAssignedPerInstance, expectedNumSegmentsAssignedPerInstance); + // Each instance should have 30 segments to be moved to it + numSegmentsToBeMovedPerInstance = + SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, newAssignment); + assertEquals(numSegmentsToBeMovedPerInstance.size(), numInstances); + for (String instanceName : newInstances) { + assertEquals((int) numSegmentsToBeMovedPerInstance.get(instanceName), numSegmentsPerInstance); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org