This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 191255b Improve partition aware routing when a server is down. (#4119) 191255b is described below commit 191255b3f8049af484c682bc445ea1f1d0ada54d Author: Seunghyun Lee <sn...@linkedin.com> AuthorDate: Wed Apr 17 13:54:43 2019 -0700 Improve partition aware routing when a server is down. (#4119) When a server is down, the current partition-aware routing always picks the next server. This PR improves the current approach by making the routing table builder distribute the load evenly among the available servers. --- .../BasePartitionAwareRoutingTableBuilder.java | 47 +++++++------ ...rtitionAwareOfflineRoutingTableBuilderTest.java | 79 ++++++++++++++++++++++ 2 files changed, 107 insertions(+), 19 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java index e8e8434..7b8b182 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/builder/BasePartitionAwareRoutingTableBuilder.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.broker.routing.builder; +import it.unimi.dsi.fastutil.ints.IntArrays; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -47,10 +48,10 @@ import org.slf4j.LoggerFactory; * for routing. The look up table is in the format of < segment_name -> (replica_id -> server_instance) >. * * When the query comes in, the routing algorithm is as follows: - * 1. Randomly pick a replica id (or replica group id) + * 1. Shuffle the replica group ids * 2. For each segment of the given table, * a. Check if the segment can be pruned. If pruned, go to the next segment. - * b. 
If not pruned, assign the segment to a server with the replica id that is picked above. + * b. If not pruned, assign the segment to a server with the replica id based on the shuffled replica group ids. * */ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTableBuilder { @@ -101,8 +102,15 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa Map<String, List<String>> routingTable = new HashMap<>(); SegmentPrunerContext prunerContext = new SegmentPrunerContext(request.getBrokerRequest()); - // 1. Randomly pick a replica id - int replicaId = _random.nextInt(_numReplicas); + // Shuffle the replica group ids in order to satisfy: + // a. Pick a replica group in an evenly distributed fashion + // b. When a server is not available, the request should be distributed evenly among other available servers. + int[] shuffledReplicaGroupIds = new int[_numReplicas]; + for (int i = 0; i < _numReplicas; i++) { + shuffledReplicaGroupIds[i] = i; + } + IntArrays.shuffle(shuffledReplicaGroupIds, _random); + for (String segmentName : segmentsToQuery) { SegmentZKMetadata segmentZKMetadata = _segmentToZkMetadataMapping.get(segmentName); @@ -110,25 +118,26 @@ public abstract class BasePartitionAwareRoutingTableBuilder implements RoutingTa boolean segmentPruned = (segmentZKMetadata != null) && _pruner.prune(segmentZKMetadata, prunerContext); if (!segmentPruned) { - // 2b. Segment cannot be pruned. Assign the segment to a server with the replica id picked above. + // 2b. Segment cannot be pruned. Assign the segment to a server based on the shuffled replica group ids Map<Integer, String> replicaIdToServerMap = segmentToReplicaToServerMap.get(segmentName); - String serverName = replicaIdToServerMap.get(replicaId); - - // When the server is not available with this replica id, we need to pick another available server. 
- if (serverName == null) { - if (!replicaIdToServerMap.isEmpty()) { - serverName = replicaIdToServerMap.values().iterator().next(); - } else { - // No server is found for this segment - continue; + + String serverName = null; + for (int i = 0; i < _numReplicas; i++) { + serverName = replicaIdToServerMap.get(shuffledReplicaGroupIds[i]); + // If a server is found, update routing table for the current segment + if (serverName != null) { + break; } } - List<String> segmentsForServer = routingTable.get(serverName); - if (segmentsForServer == null) { - segmentsForServer = new ArrayList<>(); - routingTable.put(serverName, segmentsForServer); + + if (serverName != null) { + routingTable.computeIfAbsent(serverName, k -> new ArrayList<>()).add(segmentName); + } else { + // No server is found for this segment if the code reach here + + // TODO: we need to discuss and decide on how we will be handling this case since we are not returning the + // complete result here. } - segmentsForServer.add(segmentName); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java index 5fe6322..e71dd8a 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/builder/PartitionAwareOfflineRoutingTableBuilderTest.java @@ -229,6 +229,66 @@ public class PartitionAwareOfflineRoutingTableBuilderTest { Assert.assertEquals(servers.size(), 2); } + @Test + public void testRoutingAfterOneServerDown() throws Exception { + NUM_REPLICA = 3; + NUM_PARTITION = 1; + NUM_SERVERS = 3; + NUM_SEGMENTS = 20; + + // Create the fake property store + FakePropertyStore fakePropertyStore = new FakePropertyStore(); + + // Create the table config, partition mapping, + TableConfig tableConfig = 
buildOfflineTableConfig(); + + // Create the replica group id to server mapping + Map<Integer, List<String>> replicaToServerMapping = buildReplicaGroupMapping(); + + // Update segment zk metadata. + for (int i = 0; i < NUM_SEGMENTS; i++) { + String segmentName = "segment" + i; + int partition = i % NUM_PARTITION; + SegmentZKMetadata metadata = buildOfflineSegmentZKMetadata(segmentName, partition); + fakePropertyStore + .setContents(ZKMetadataProvider.constructPropertyStorePathForSegment(OFFLINE_TABLE_NAME, segmentName), + metadata.toZNRecord()); + } + + // Update replica group mapping zk metadata + updateReplicaGroupPartitionAssignment(OFFLINE_TABLE_NAME, fakePropertyStore); + + // Create instance Configs + List<InstanceConfig> instanceConfigs = new ArrayList<>(); + for (int serverId = 0; serverId < NUM_SERVERS; serverId++) { + String serverName = "Server_localhost_" + serverId; + instanceConfigs.add(new InstanceConfig(serverName)); + } + + // Pick a server that is going to be down + Random random = new Random(); + String downServer = instanceConfigs.get(random.nextInt(instanceConfigs.size())).getInstanceName(); + + // Create the fake external view with a down server + ExternalView externalView = buildExternalViewWithDownServer(OFFLINE_TABLE_NAME, replicaToServerMapping, downServer); + + // Create the partition aware offline routing table builder + RoutingTableBuilder routingTableBuilder = + buildPartitionAwareOfflineRoutingTableBuilder(fakePropertyStore, tableConfig, externalView, instanceConfigs); + + Set<String> servers = new HashSet<>(); + for (int i = 0; i < 100; i++) { + String countStarQuery = "select count(*) from " + OFFLINE_TABLE_NAME; + Map<String, List<String>> routingTable = + routingTableBuilder.getRoutingTable(buildRoutingTableLookupRequest(countStarQuery), null); + Assert.assertEquals(routingTable.keySet().size(), 1); + servers.add(routingTable.keySet().iterator().next()); + } + + // Check if the other two available servers are getting picked + 
Assert.assertEquals(servers.size(), 2); + } + private void updateReplicaGroupPartitionAssignment(String tableNameWithType, FakePropertyStore propertyStore) { // Create partition assignment mapping table. ReplicaGroupPartitionAssignment replicaGroupPartitionAssignment = @@ -271,6 +331,25 @@ public class PartitionAwareOfflineRoutingTableBuilderTest { return externalView; } + private ExternalView buildExternalViewWithDownServer(String tableName, Map<Integer, List<String>> replicaGroupServers, + String downServer) throws Exception { + // Create External View + ExternalView externalView = new ExternalView(tableName); + for (int i = 0; i < NUM_SEGMENTS; i++) { + String segmentName = "segment" + i; + int serverIndex = i % (NUM_SERVERS / NUM_REPLICA); + for (List<String> serversInReplicaGroup : replicaGroupServers.values()) { + String serverName = serversInReplicaGroup.get(serverIndex); + if (serverName.equals(downServer)) { + externalView.setState(segmentName, serversInReplicaGroup.get(serverIndex), "OFFLINE"); + } else { + externalView.setState(segmentName, serversInReplicaGroup.get(serverIndex), "ONLINE"); + } + } + } + return externalView; + } + private Map<Integer, List<String>> buildReplicaGroupMapping() { Map<Integer, List<String>> replicaGroupServers = new HashMap<>(); int numServersPerReplica = NUM_SERVERS / NUM_REPLICA; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org