This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new e5029b6 Minor improvements as encountered while studying replica groups (#4180) e5029b6 is described below commit e5029b6506ef18953dfa8b390ca07c3eb9ea8a6d Author: Neha Pawar <npa...@linkedin.com> AuthorDate: Wed May 1 10:36:59 2019 -0700 Minor improvements as encountered while studying replica groups (#4180) --- .../broker/routing/RoutingTableBuilderFactory.java | 2 +- .../builder/GeneratorBasedRoutingTableBuilder.java | 22 +++++----------------- .../PartitionAwareOfflineRoutingTableBuilder.java | 9 +++------ .../common/partition/PartitionAssignment.java | 4 ++-- .../partition/ReplicaGroupPartitionAssignment.java | 4 ++-- .../org/apache/pinot/common/utils/LLCUtils.java | 11 ++--------- .../ReplicaGroupRebalanceSegmentStrategy.java | 4 ++-- .../ReplicaGroupSegmentAssignmentStrategy.java | 2 +- .../controller/utils/ReplicaGroupTestUtils.java | 4 ++-- 9 files changed, 20 insertions(+), 42 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java index 8fb0ccf..574ccf5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingTableBuilderFactory.java @@ -116,7 +116,7 @@ public class RoutingTableBuilderFactory { == CommonConstants.Helix.DataSource.SegmentAssignmentStrategyType.ReplicaGroupSegmentAssignmentStrategy); // Check that replica group strategy config is correctly set - boolean hasReplicaGroupStrategyConfig = (validationConfig != null); + boolean hasReplicaGroupStrategyConfig = (validationConfig.getReplicaGroupStrategyConfig() != null); // Check that the table push type is not 'refresh'. boolean isNotRefreshPush = diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java index 23e1773..592bd6e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/GeneratorBasedRoutingTableBuilder.java @@ -73,7 +73,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable return new ImmutablePair<>(routingTable, variance); } - Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) { + private Map<String, List<String>> generateRoutingTable(Map<String, List<String>> segmentToServersMap) { Map<String, List<String>> routingTable = new HashMap<>(); @@ -86,11 +86,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) { List<String> servers = entry.getValue(); for (String serverName : servers) { - List<String> segmentsForServer = serverToSegmentsMap.get(serverName); - if (segmentsForServer == null) { - segmentsForServer = new ArrayList<>(); - serverToSegmentsMap.put(serverName, segmentsForServer); - } + List<String> segmentsForServer = serverToSegmentsMap.computeIfAbsent(serverName, k -> new ArrayList<>()); segmentsForServer.add(entry.getKey()); } } @@ -134,12 +130,7 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable // Sort all the segments to be used during assignment in ascending order of replicas PriorityQueue<Pair<String, List<String>>> segmentToReplicaSetQueue = - new PriorityQueue<>(numSegments, new Comparator<Pair<String, List<String>>>() { - @Override - public int compare(Pair<String, List<String>> firstPair, Pair<String, List<String>> secondPair) { - return Integer.compare(firstPair.getRight().size(), secondPair.getRight().size()); - } - }); + new PriorityQueue<>(numSegments, Comparator.comparingInt(pair -> pair.getRight().size())); for (Map.Entry<String, List<String>> entry : segmentToServersMap.entrySet()) { // Servers for the segment is the intersection of all servers for this segment and the servers that we have in @@ -157,11 +148,8 @@ public abstract class GeneratorBasedRoutingTableBuilder extends BaseRoutingTable List<String> serversForSegment = segmentServersPair.getRight(); String serverWithLeastSegmentsAssigned = getServerWithLeastSegmentsAssigned(serversForSegment, routingTable); - List<String> segmentsAssignedToServer = routingTable.get(serverWithLeastSegmentsAssigned); - if (segmentsAssignedToServer == null) { - segmentsAssignedToServer = new ArrayList<>(); - routingTable.put(serverWithLeastSegmentsAssigned, segmentsAssignedToServer); - } + List<String> segmentsAssignedToServer = + routingTable.computeIfAbsent(serverWithLeastSegmentsAssigned, k -> new ArrayList<>()); segmentsAssignedToServer.add(segmentName); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java index 8ecf14d..19305ea 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilder.java @@ -122,13 +122,10 @@ public class PartitionAwareOfflineRoutingTableBuilder extends BasePartitionAware for (Integer partitionId : partitionIds) { for (int replicaId = 0; replicaId < _numReplicas; replicaId++) { List<String> serversForPartitionAndReplica = - partitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId); + partitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId); for (String serverName : serversForPartitionAndReplica) { - Map<String, Integer> serverToReplicaMap = partitionToServerToReplicaMap.get(partitionId); - if (serverToReplicaMap == null) { - serverToReplicaMap = new HashMap<>(); - partitionToServerToReplicaMap.put(partitionId, serverToReplicaMap); - } + Map<String, Integer> serverToReplicaMap = + partitionToServerToReplicaMap.computeIfAbsent(partitionId, k -> new HashMap<>()); serverToReplicaMap.put(serverName, replicaId); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java index 1f9522c..ae5a95c 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/PartitionAssignment.java @@ -32,8 +32,8 @@ import org.apache.pinot.common.utils.EqualityUtils; */ public class PartitionAssignment { - protected String _tableName; - protected Map<String, List<String>> _partitionToInstances; + private String _tableName; + private Map<String, List<String>> _partitionToInstances; public PartitionAssignment(String tableName) { _tableName = tableName; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java index 42e695e..b065793 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/partition/ReplicaGroupPartitionAssignment.java @@ -79,7 +79,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment { public void addInstanceToReplicaGroup(int partition, int replicaGroup, String instanceName) { String key = createMappingKey(partition, replicaGroup); if (!getPartitionToInstances().containsKey(key)) { - addPartition(key, new ArrayList<String>()); + addPartition(key, new ArrayList<>()); } getInstancesListForPartition(key).add(instanceName); } @@ -91,7 +91,7 @@ public class ReplicaGroupPartitionAssignment extends PartitionAssignment { * @param replicaGroup Replica group number * @return List of instances belongs to the given partition and replica group */ - public List<String> getInstancesfromReplicaGroup(int partition, int replicaGroup) { + public List<String> getInstancesFromReplicaGroup(int partition, int replicaGroup) { String key = createMappingKey(partition, replicaGroup); if (!getPartitionToInstances().containsKey(key)) { throw new NoSuchElementException(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java index 2189653..3adbb90 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java @@ -42,15 +42,8 @@ public class LLCUtils { final LLCSegmentName segmentName = new LLCSegmentName(segment); String streamPartitionId = segmentName.getPartitionRange(); - SortedSet<SegmentName> segmentsForPartition = sortedSegmentsByStreamPartition.get(streamPartitionId); - - // Create sorted set if necessary - if (segmentsForPartition == null) { - segmentsForPartition = new TreeSet<>(); - - sortedSegmentsByStreamPartition.put(streamPartitionId, segmentsForPartition); - } - + SortedSet<SegmentName> segmentsForPartition = + sortedSegmentsByStreamPartition.computeIfAbsent(streamPartitionId, k -> new TreeSet<>()); segmentsForPartition.add(segmentName); } return sortedSegmentsByStreamPartition; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java index b472df3..a75f334 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ReplicaGroupRebalanceSegmentStrategy.java @@ -206,7 +206,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr int currentNewReplicaGroupId = 0; for (int groupId = 0; groupId < oldNumReplicaGroup; groupId++) { List<String> oldReplicaGroup = - oldReplicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, groupId); + oldReplicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, groupId); List<String> newReplicaGroup = new ArrayList<>(); boolean removeGroup = false; @@ -373,7 +373,7 @@ public class ReplicaGroupRebalanceSegmentStrategy implements RebalanceSegmentStr List<String> referenceReplicaGroup = new ArrayList<>(); for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { List<String> serversInReplicaGroup = - replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionId, replicaId); + replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionId, replicaId); if (replicaId == 0) { // We need to keep the first replica group in case of mirroring. referenceReplicaGroup.addAll(serversInReplicaGroup); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java index 577e0f7..cd604f5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/sharding/ReplicaGroupSegmentAssignmentStrategy.java @@ -84,7 +84,7 @@ public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentS int index = 0; for (int groupId = 0; groupId < numReplicas; groupId++) { List<String> instancesInReplicaGroup = - replicaGroupPartitionAssignment.getInstancesfromReplicaGroup(partitionNumber, groupId); + replicaGroupPartitionAssignment.getInstancesFromReplicaGroup(partitionNumber, groupId); int numInstances = instancesInReplicaGroup.size(); if (mirrorAssignmentAcrossReplicaGroups) { // Randomly pick the index and use the same index for all replica groups. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java index e1bf7c2..31466d6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/ReplicaGroupTestUtils.java @@ -98,7 +98,7 @@ public class ReplicaGroupTestUtils { // Check if the servers in a replica group covers all segments for (int partitionId = 0; partitionId < numPartitions; partitionId++) { for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId); + List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId); Set<String> replicaGroupSegments = new HashSet<>(); for (String server : replicaGroup) { for (String segment : serverToSegments.get(server)) { @@ -124,7 +124,7 @@ public class ReplicaGroupTestUtils { for (int serverIndex = 0; serverIndex < replicaGroupConfig.getNumInstancesPerPartition(); serverIndex++) { Set<String> mirrorSegments = new HashSet<>(); for (int replicaId = 0; replicaId < numReplicaGroups; replicaId++) { - List<String> replicaGroup = replicaGroupMapping.getInstancesfromReplicaGroup(partitionId, replicaId); + List<String> replicaGroup = replicaGroupMapping.getInstancesFromReplicaGroup(partitionId, replicaId); String server = replicaGroup.get(serverIndex); Set<String> currentSegments = new HashSet<>(serverToSegments.get(server)); mirrorSegments.addAll(currentSegments); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org