This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch maintain-pool-selection-for-minimizeDataMovement in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 2a934ef00dcccb74a60312309300fcdbd013dd92 Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Thu Nov 16 21:30:54 2023 -0800 Address PR comments --- .../instance/InstanceTagPoolSelector.java | 24 ++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java index 755e7aa713..2062a75209 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java @@ -22,11 +22,11 @@ import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.assignment.InstancePartitions; @@ -50,8 +50,7 @@ public class InstanceTagPoolSelector { private final InstancePartitions _existingInstancePartitions; public InstanceTagPoolSelector(InstanceTagPoolConfig tagPoolConfig, String tableNameWithType, - boolean minimizeDataMovement, - @Nullable InstancePartitions existingInstancePartitions) { + boolean minimizeDataMovement, @Nullable InstancePartitions existingInstancePartitions) { _tagPoolConfig = tagPoolConfig; _tableNameWithType = tableNameWithType; _minimizeDataMovement = minimizeDataMovement; @@ -124,7 +123,7 @@ public class InstanceTagPoolSelector { poolsToSelect = new ArrayList<>(numPoolsToSelect); if (_minimizeDataMovement && _existingInstancePartitions != null) { - Set<Integer> existingPools = new 
HashSet<>(numPoolsToSelect); + Set<Integer> existingPools = new TreeSet<>(); // Keep the same pool if it's already been used for the table. int existingNumPartitions = _existingInstancePartitions.getNumPartitions(); int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups(); @@ -135,9 +134,10 @@ public class InstanceTagPoolSelector { List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId); for (String existingInstance : existingInstances) { Integer existingPool = instanceToPoolMap.get(existingInstance); - if (existingPool != null & pools.contains(existingPool)) { - poolsToSelect.add(existingPool); - existingPools.add(existingPool); + if (existingPool != null) { + if (existingPools.add(existingPool)) { + poolsToSelect.add(existingPool); + } foundExistingPoolForReplicaGroup = true; break; } @@ -147,12 +147,10 @@ public class InstanceTagPoolSelector { LOGGER.info("Keep the same pool: {} for table: {}", existingPools, _tableNameWithType); // Pick a pool from remainingPools that isn't used before. List<Integer> remainingPools = new ArrayList<>(pools); - remainingPools.retainAll(existingPools); - // Skip selecting the existing pool. - for (int i = 0; i < numPoolsToSelect; i++) { - if (existingPools.contains(i)) { - continue; - } + remainingPools.removeAll(existingPools); + // Select from the remaining pools. + int remainingNumPoolsToSelect = numPoolsToSelect - poolsToSelect.size(); + for (int i = 0; i < remainingNumPoolsToSelect; i++) { poolsToSelect.add(remainingPools.remove(i % remainingPools.size())); } } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org