This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 50ab6f2065 Rename ReplicaGroups to Pools (#16104) 50ab6f2065 is described below commit 50ab6f20657b6681ba26bac873e103aab3128a3c Author: Xuanyi Li <xuany...@uber.com> AuthorDate: Mon Jun 16 11:52:38 2025 -0700 Rename ReplicaGroups to Pools (#16104) --- .../apache/pinot/broker/querylog/QueryLogger.java | 4 +- .../BaseSingleStageBrokerRequestHandler.java | 8 +- ...ctor.java => PriorityPoolInstanceSelector.java} | 162 ++++++++++----------- .../ServerSelectionContext.java | 18 +-- .../instanceselector/BalancedInstanceSelector.java | 16 +- .../instanceselector/BaseInstanceSelector.java | 24 +-- .../ReplicaGroupInstanceSelector.java | 22 +-- .../instanceselector/SegmentInstanceCandidate.java | 14 +- .../StrictReplicaGroupInstanceSelector.java | 4 +- .../pinot/broker/querylog/QueryLoggerTest.java | 2 +- ....java => PriorityPoolInstanceSelectorTest.java} | 26 ++-- .../apache/pinot/common/metrics/BrokerMeter.java | 18 +-- .../apache/pinot/common/metrics/BrokerMetrics.java | 14 +- .../pinot/common/response/BrokerResponse.java | 10 +- .../response/broker/BrokerResponseNative.java | 12 +- .../response/broker/BrokerResponseNativeV2.java | 13 +- .../common/utils/config/QueryOptionsUtils.java | 22 +-- .../pinot/common/metrics/BrokerMetricsTest.java | 34 +++-- .../pinot/core/transport/ServerInstance.java | 8 +- .../tests/BaseClusterIntegrationTestSet.java | 8 +- .../apache/pinot/spi/utils/CommonConstants.java | 7 +- 21 files changed, 232 insertions(+), 214 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java index 20a94d78e8..6cb4d55555 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java @@ -329,10 +329,10 @@ public class QueryLogger { 
.append(params._response.getRealtimeResponseSerMemAllocatedBytes()); } }, - REPLICA_GROUPS("replicaGroups") { + POOLS("pools") { @Override void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) { - builder.append(params._response.getReplicaGroups()); + builder.append(params._response.getPools()); } }; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 87cd5b14bf..2e6fbcca29 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -761,7 +761,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ remainingTimeMs, serverStats, requestContext); } brokerResponse.setTablesQueried(Set.of(rawTableName)); - brokerResponse.setReplicaGroups(Stream.concat( + brokerResponse.setPools(Stream.concat( offlineExecutionServers != null ? offlineExecutionServers.stream() : Stream.empty(), realtimeExecutionServers != null ? 
realtimeExecutionServers.stream() : Stream.empty()) .map(ServerInstance::getPool) @@ -809,9 +809,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ _brokerMetrics.addTimedValue(BrokerTimer.QUERY_TOTAL_TIME_MS, totalTimeMs, TimeUnit.MILLISECONDS); } - for (int group : brokerResponse.getReplicaGroups()) { - _brokerMetrics.addMeteredValue(BrokerMeter.REPLICA_QUERIES, 1, - BrokerMetrics.getTagForPreferredGroup(sqlNodeAndOptions.getOptions()), String.valueOf(group)); + for (int pool : brokerResponse.getPools()) { + _brokerMetrics.addMeteredValue(BrokerMeter.POOL_QUERIES, 1, + BrokerMetrics.getTagForPreferredPool(sqlNodeAndOptions.getOptions()), String.valueOf(pool)); } // Log query and stats diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelector.java similarity index 53% rename from pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelector.java rename to pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelector.java index 72a1cdd464..0ee7a37250 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelector.java @@ -33,80 +33,80 @@ import org.apache.pinot.broker.routing.instanceselector.SegmentInstanceCandidate /** - * A server selector that implements priority-based server selection based on replica groups. + * A server selector that implements priority-based server selection based on pools. 
* This selector works in conjunction with an {@link AdaptiveServerSelector} to provide * a two-level selection strategy: * <ol> - * <li>First level: Select servers based on replica group priorities</li> - * <li>Second level: Use adaptive selection within the chosen replica group</li> + * <li>First level: Select servers based on pool priorities</li> + * <li>Second level: Use adaptive selection within the chosen pool</li> * </ol> * * <p>The selector maintains the following invariants:</p> * <ul> - * <li>Servers from preferred replica groups are always selected before non-preferred groups</li> - * <li>Within each replica group, servers are selected using adaptive selection criteria</li> - * <li>When no preferred groups are available, falls back to non-preferred groups</li> + * <li>Servers from preferred pools are always selected before non-preferred pools</li> + * <li>Within each pool, servers are selected using adaptive selection criteria</li> + * <li>When no preferred pools are available, falls back to non-preferred pools</li> * </ul> */ -public class PriorityGroupInstanceSelector { +public class PriorityPoolInstanceSelector { - /** The underlying adaptive server selector used for selection within replica groups */ + /** The underlying adaptive server selector used for selection within pools */ private final AdaptiveServerSelector _adaptiveServerSelector; - /** Sentinel value used to group all non-preferred servers together */ - private static final int SENTINEL_GROUP_OF_NON_PREFERRED_SERVERS = Integer.MAX_VALUE; + /** Sentinel value used to pool all non-preferred servers together */ + private static final int SENTINEL_POOL_OF_NON_PREFERRED_SERVERS = Integer.MAX_VALUE; /** - * Creates a new priority group instance selector with the given adaptive server selector. + * Creates a new priority pool instance selector with the given adaptive server selector. 
* - * @param adaptiveServerSelector the adaptive server selector to use for selection within groups + * @param adaptiveServerSelector the adaptive server selector to use for selection within pools * @throws IllegalArgumentException if adaptiveServerSelector is null */ - public PriorityGroupInstanceSelector(AdaptiveServerSelector adaptiveServerSelector) { + public PriorityPoolInstanceSelector(AdaptiveServerSelector adaptiveServerSelector) { assert adaptiveServerSelector != null; _adaptiveServerSelector = adaptiveServerSelector; } /** - * Selects a server instance from the given candidates based on replica group preferences. + * Selects a server instance from the given candidates based on pool preferences. * The selection process follows these steps: * <ol> - * <li>Groups all candidates by their replica group</li> - * <li>Iterates through the ordered preferred groups in priority order</li> - * <li>For the first group that has available servers, uses adaptiveServerSelector to choose one</li> - * <li>If no preferred groups have servers, falls back to selecting from remaining servers</li> + * <li>Groups all candidates by their pool</li> + * <li>Iterates through the ordered preferred pools in priority order</li> + * <li>For the first pool that has available servers, uses adaptiveServerSelector to choose one</li> + * <li>If no preferred pools have servers, falls back to selecting from remaining servers</li> * </ol> * - * <p>Example 1 - Preferred group has servers:</p> + * <p>Example 1 - Preferred pool has servers:</p> * <pre> * Candidates: - * - server1 (replica group 1) - * - server2 (replica group 2) - * - server3 (replica group 1) - * Preferred groups: [2, 1] - * Result: server2 is selected (from group 2, highest priority) + * - server1 (pool 1) + * - server2 (pool 2) + * - server3 (pool 1) + * Preferred pools: [2, 1] + * Result: server2 is selected (from pool 2, highest priority) * </pre> * - * <p>Example 2 - Fallback to second preferred group:</p> + * <p>Example 2 - 
Fallback to second preferred pool:</p> * <pre> * Candidates: - * - server1 (replica group 1) - * - server3 (replica group 1) - * - server4 (replica group 3) - * Preferred groups: [2, 1] - * Result: adaptiveServerSelector chooses between server1 and server3 (from group 1) + * - server1 (pool 1) + * - server3 (pool 1) + * - server4 (pool 3) + * Preferred pools: [2, 1] + * Result: adaptiveServerSelector chooses between server1 and server3 (from pool 1) * </pre> * - * <p>Example 3 - Fallback to non-preferred group:</p> + * <p>Example 3 - Fallback to non-preferred pool:</p> * <pre> * Candidates: - * - server4 (replica group 3) - * - server5 (replica group 3) - * Preferred groups: [2, 1] - * Result: adaptiveServerSelector chooses between server4 and server5 (from group 3) + * - server4 (pool 3) + * - server5 (pool 3) + * Preferred pools: [2, 1] + * Result: adaptiveServerSelector chooses between server4 and server5 (from pool 3) * </pre> * - * @param ctx the server selection context containing ordered preferred groups + * @param ctx the server selection context containing ordered preferred pools * @param candidates the list of server candidates to choose from * @return the selected server instance as a SegmentInstanceCandidate, or null if no candidates are available */ @@ -119,30 +119,30 @@ public class PriorityGroupInstanceSelector { } // intentional copy to avoid modifying the original list; we will add Integer.MAX_VALUE // as a sentinel value to the end of the list to ensure non-preferred servers are processed last - List<Integer> groups = new ArrayList<>(ctx.getOrderedPreferredGroups()); - if (groups.isEmpty()) { + List<Integer> pools = new ArrayList<>(ctx.getOrderedPreferredPools()); + if (pools.isEmpty()) { return choose(candidates); } - Set<Integer> groupSet = new HashSet<>(groups); - Map<Integer, List<SegmentInstanceCandidate>> groupToServerPos = new HashMap<>(); - // Group servers by their replica groups. 
For servers not in preferred groups, + Set<Integer> poolSet = new HashSet<>(pools); + Map<Integer, List<SegmentInstanceCandidate>> poolToServerPos = new HashMap<>(); + // Group servers by their pools. For servers not in preferred pools, // use Integer.MAX_VALUE as a sentinel value to ensure they are processed last. // This allows us to: - // 1. Process preferred groups in their specified order - // 2. Handle all non-preferred servers as a single group with lowest priority + // 1. Process preferred pools in their specified order + // 2. Handle all non-preferred servers as a single pool with lowest priority // 3. Avoid complex conditional logic for handling non-preferred servers for (SegmentInstanceCandidate candidate : candidates) { - int group = candidate.getReplicaGroup(); - group = groupSet.contains(group) ? group : SENTINEL_GROUP_OF_NON_PREFERRED_SERVERS; - groupToServerPos.computeIfAbsent(group, k -> new ArrayList<>()).add(candidate); + int pool = candidate.getPool(); + pool = poolSet.contains(pool) ? 
pool : SENTINEL_POOL_OF_NON_PREFERRED_SERVERS; + poolToServerPos.computeIfAbsent(pool, k -> new ArrayList<>()).add(candidate); } - // Add Integer.MAX_VALUE to the end of preferred groups to ensure non-preferred servers - // are processed after all preferred groups - groups.add(SENTINEL_GROUP_OF_NON_PREFERRED_SERVERS); - for (int group : groups) { - List<SegmentInstanceCandidate> instancesInGroup = groupToServerPos.get(group); - if (instancesInGroup != null) { - return choose(instancesInGroup); + // Add Integer.MAX_VALUE to the end of preferred pools to ensure non-preferred servers + // are processed after all preferred pools + pools.add(SENTINEL_POOL_OF_NON_PREFERRED_SERVERS); + for (int pool : pools) { + List<SegmentInstanceCandidate> instancesInPool = poolToServerPos.get(pool); + if (instancesInPool != null) { + return choose(instancesInPool); } } assert false; @@ -151,30 +151,30 @@ public class PriorityGroupInstanceSelector { /** * Invoke adaptiveServerSelector to get the original ranking the servers (min first). Reorder the servers based on - * the replica group preference. The head of the OrderedPreferredGroups list is the most preferred group. - * The servers in the same group are ranked by the original ranking. + * the pool preference. The head of the OrderedPreferredPools list is the most preferred pool. + * The servers in the same pool are ranked by the original ranking. 
* * <p>Example:</p> * <pre> * Given: * - Server candidates: - * - server1 (group 1, score 80) - * - server2 (group 2, score 70) - * - server3 (group 1, score 90) - * - server4 (group 3, score 60) - * - Ordered preferred groups: [2, 1] + * - server1 (pool 1, score 80) + * - server2 (pool 2, score 70) + * - server3 (pool 1, score 90) + * - server4 (pool 3, score 60) + * - Ordered preferred pools: [2, 1] * * Original ranking by score would be: [server4, server2, server1, server3] - * Final ranking after group preference: [server2, server1, server3, server4] + * Final ranking after pool preference: [server2, server1, server3, server4] * Because: - * 1. Group 2 servers come first (server2) - * 2. Group 1 servers come next, maintaining their relative order (server1, server3) + * 1. pool 2 servers come first (server2) + * 2. pool 1 servers come next, maintaining their relative order (server1, server3) * 3. Remaining servers come last (server4) * </pre> * - * @param ctx the server selection context containing ordered preferred groups + * @param ctx the server selection context containing ordered preferred pools * @param serverCandidates the server candidates to be ranked - * @return the ranked servers, ordered by group preference and then by original ranking within each group + * @return the ranked servers, ordered by pool preference and then by original ranking within each pool */ public List<String> rank(ServerSelectionContext ctx, List<SegmentInstanceCandidate> serverCandidates) { if (serverCandidates == null || serverCandidates.isEmpty()) { @@ -186,33 +186,33 @@ public class PriorityGroupInstanceSelector { serverCandidates.stream() .map(SegmentInstanceCandidate::getInstance) .collect(Collectors.toList())); - List<Integer> groups = new ArrayList<>(ctx.getOrderedPreferredGroups()); - if (groups.isEmpty()) { + List<Integer> pools = new ArrayList<>(ctx.getOrderedPreferredPools()); + if (pools.isEmpty()) { return 
serverRankListWithScores.stream().map(Pair::getLeft).collect(Collectors.toList()); } Map<String, SegmentInstanceCandidate> idToCandidate = serverCandidates.stream() .map(candidate -> new ImmutablePair<>(candidate.getInstance(), candidate)) .collect(Collectors.toMap(Pair::getLeft, Pair::getRight)); - // Create a set of preferred groups for efficient lookup - Set<Integer> preferredGroups = new HashSet<>(groups); - Map<Integer, List<String>> groupToRankedServers = new HashMap<>(); + // Create a set of preferred pools for efficient lookup + Set<Integer> preferredPools = new HashSet<>(pools); + Map<Integer, List<String>> poolToRankedServers = new HashMap<>(); for (Pair<String, Double> entry : serverRankListWithScores) { - int group = idToCandidate.get(entry.getLeft()).getReplicaGroup(); - // If the group is not in the preferred groups list, assign it the sentinel group - group = preferredGroups.contains(group) ? group : SENTINEL_GROUP_OF_NON_PREFERRED_SERVERS; - groupToRankedServers.computeIfAbsent(group, k -> new ArrayList<>()).add(entry.getLeft()); + int pool = idToCandidate.get(entry.getLeft()).getPool(); + // If the pool is not in the preferred pools list, assign it the sentinel pool + pool = preferredPools.contains(pool) ? 
pool : SENTINEL_POOL_OF_NON_PREFERRED_SERVERS; + poolToRankedServers.computeIfAbsent(pool, k -> new ArrayList<>()).add(entry.getLeft()); } - // Add the sentinel group to the end of the groups list to ensure its group members are included in the tail - groups.add(SENTINEL_GROUP_OF_NON_PREFERRED_SERVERS); + // Add the sentinel pool to the end of the pools list to ensure its pool members are included in the tail + pools.add(SENTINEL_POOL_OF_NON_PREFERRED_SERVERS); - // Build the final ranked list by processing groups in order + // Build the final ranked list by processing pools in order List<String> rankedServers = new ArrayList<>(); - for (int group : groups) { - List<String> instancesInGroup = groupToRankedServers.get(group); - if (instancesInGroup != null) { - rankedServers.addAll(instancesInGroup); + for (int pool : pools) { + List<String> instancesInPool = poolToRankedServers.get(pool); + if (instancesInPool != null) { + rankedServers.addAll(instancesInPool); } } return rankedServers; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java index 858c021cfa..1132b2c4b2 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/adaptiveserverselector/ServerSelectionContext.java @@ -25,7 +25,7 @@ import org.apache.pinot.common.utils.config.QueryOptionsUtils; /** - * This class encapsulates query options and ordered preferred replica groups that influence how + * This class encapsulates query options and ordered preferred pools that influence how * servers are selected for query execution. 
*/ public class ServerSelectionContext { @@ -34,34 +34,34 @@ public class ServerSelectionContext { * These options are passed into the context class to avoid endless constructor argument changes * as new server selection preferences are added. Examples of such options include: * <ul> - * <li>Preferred replica groups for routing</li> + * <li>Preferred pools for routing</li> * <li>Boolean fixedReplicaGroup</li> * <li>Other server selection related configurations in the future</li> * </ul> * The options are processed once during construction to extract relevant information - * (like ordered preferred groups) to avoid repeated parsing. + * (like ordered preferred pools) to avoid repeated parsing. */ private final Map<String, String> _queryOptions; // If some query options need further processing, store the parsing result below to avoid duplicate parsing. - private final List<Integer> _orderedPreferredGroups; + private final List<Integer> _orderedPreferredPools; /** * Creates a new server selection context with the given query options. - * The ordered preferred groups are extracted from the query options using - * {@link QueryOptionsUtils#getOrderedPreferredReplicas(Map)}. + * The ordered preferred pools are extracted from the query options using + * {@link QueryOptionsUtils#getOrderedPreferredPools(Map)}. * * @param queryOptions map of query options that may contain server selection preferences */ public ServerSelectionContext(Map<String, String> queryOptions) { _queryOptions = queryOptions == null ? 
Collections.emptyMap() : queryOptions; - _orderedPreferredGroups = QueryOptionsUtils.getOrderedPreferredReplicas(_queryOptions); + _orderedPreferredPools = QueryOptionsUtils.getOrderedPreferredPools(_queryOptions); } public Map<String, String> getQueryOptions() { return _queryOptions; } - public List<Integer> getOrderedPreferredGroups() { - return _orderedPreferredGroups; + public List<Integer> getOrderedPreferredPools() { + return _orderedPreferredPools; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java index 807d3e9fda..26d303426d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java @@ -64,8 +64,8 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); ServerSelectionContext ctx = new ServerSelectionContext(queryOptions); // TODO: refactor to dedup the code and use a single for loop - Map<Integer, Integer> replicaGroupToSegmentCount = new HashMap<>(); - if (_priorityGroupInstanceSelector != null) { + Map<Integer, Integer> poolToSegmentCount = new HashMap<>(); + if (_priorityPoolInstanceSelector != null) { for (String segment : segments) { List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); // NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has @@ -73,11 +73,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { if (candidates == null) { continue; } - SegmentInstanceCandidate candidate = _priorityGroupInstanceSelector.select(ctx, candidates); + SegmentInstanceCandidate candidate = _priorityPoolInstanceSelector.select(ctx, 
candidates); // If candidates is not null, candidates is always non-empty because segments with no enabled online servers // are placed in segmentStates.getUnavailableSegments() assert candidate != null; - replicaGroupToSegmentCount.merge(candidate.getReplicaGroup(), 1, Integer::sum); + poolToSegmentCount.merge(candidate.getPool(), 1, Integer::sum); // This can only be offline when it is a new segment. And such segment is marked as optional segment so that // broker or server can skip it upon any issue to process it. if (candidate.isOnline()) { @@ -103,7 +103,7 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { selectedIdx = requestId++ % candidates.size(); } SegmentInstanceCandidate selectedCandidate = candidates.get(selectedIdx); - replicaGroupToSegmentCount.merge(selectedCandidate.getReplicaGroup(), 1, Integer::sum); + poolToSegmentCount.merge(selectedCandidate.getPool(), 1, Integer::sum); // This can only be offline when it is a new segment. And such segment is marked as optional segment so that // broker or server can skip it upon any issue to process it. 
if (selectedCandidate.isOnline()) { @@ -113,9 +113,9 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { } } } - for (Map.Entry<Integer, Integer> entry : replicaGroupToSegmentCount.entrySet()) { - _brokerMetrics.addMeteredValue(BrokerMeter.REPLICA_SEG_QUERIES, entry.getValue(), - BrokerMetrics.getTagForPreferredGroup(queryOptions), String.valueOf(entry.getKey())); + for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) { + _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES, entry.getValue(), + BrokerMetrics.getTagForPreferredPool(queryOptions), String.valueOf(entry.getKey())); } return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index 50fdc8b0b6..591131ff15 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -38,7 +38,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector; -import org.apache.pinot.broker.routing.adaptiveserverselector.PriorityGroupInstanceSelector; +import org.apache.pinot.broker.routing.adaptiveserverselector.PriorityPoolInstanceSelector; import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -54,7 +54,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static 
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_REPLICA_GROUP_ID; +import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID; /** @@ -94,7 +94,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { final BrokerMetrics _brokerMetrics; final AdaptiveServerSelector _adaptiveServerSelector; // Will be null if and only if adaptiveServerSelector is null - final PriorityGroupInstanceSelector _priorityGroupInstanceSelector; + final PriorityPoolInstanceSelector _priorityPoolInstanceSelector; final Clock _clock; final boolean _useFixedReplica; final long _newSegmentExpirationTimeInSeconds; @@ -127,8 +127,8 @@ abstract class BaseInstanceSelector implements InstanceSelector { _tableNameHashForFixedReplicaRouting = TableNameBuilder.extractRawTableName(tableNameWithType).hashCode() & 0x7FFFFFFF; - _priorityGroupInstanceSelector = - _adaptiveServerSelector == null ? null : new PriorityGroupInstanceSelector(_adaptiveServerSelector); + _priorityPoolInstanceSelector = + _adaptiveServerSelector == null ? 
null : new PriorityPoolInstanceSelector(_adaptiveServerSelector); if (_adaptiveServerSelector != null && _useFixedReplica) { throw new IllegalArgumentException( "AdaptiveServerSelector and consistent routing cannot be enabled at the same time"); @@ -271,7 +271,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) { if (isOnlineForRouting(entry.getValue())) { String instance = entry.getKey(); - candidates.add(new SegmentInstanceCandidate(instance, false, getGroup(instance))); + candidates.add(new SegmentInstanceCandidate(instance, false, getPool(instance))); } } _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates)); @@ -288,7 +288,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { if (isOnlineForRouting(entry.getValue())) { String instance = entry.getKey(); candidates.add( - new SegmentInstanceCandidate(instance, onlineInstances.contains(instance), getGroup(instance))); + new SegmentInstanceCandidate(instance, onlineInstances.contains(instance), getPool(instance))); } } _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates)); @@ -296,7 +296,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { // Old segment List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size()); for (String instance : onlineInstances) { - candidates.add(new SegmentInstanceCandidate(instance, true, getGroup(instance))); + candidates.add(new SegmentInstanceCandidate(instance, true, getPool(instance))); } _oldSegmentCandidatesMap.put(segment, candidates); } @@ -472,16 +472,16 @@ abstract class BaseInstanceSelector implements InstanceSelector { } @VisibleForTesting - int getGroup(String instanceID) { - int group = FALLBACK_REPLICA_GROUP_ID; + int getPool(String instanceID) { + int pool = FALLBACK_POOL_ID; ServerInstance server = 
_enabledServerStore.get(instanceID); if (server == null) { LOGGER.warn("Failed to find server {} in the enabledServerManager when update segmentsMap for table {}", instanceID, _tableNameWithType); } else { - group = server.getPool(); + pool = server.getPool(); } - return group; + return pool; } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java index 943cd10d45..988a5cd124 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java @@ -81,7 +81,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { // Fetch serverRankList before looping through all the segments. This is important to make sure that we pick // the least amount of instances for a query by referring to a single snapshot of the rankings. - List<String> serverRankList = _priorityGroupInstanceSelector.rank(ctx, candidateServers); + List<String> serverRankList = _priorityPoolInstanceSelector.rank(ctx, candidateServers); Map<String, Integer> serverRankMap = new HashMap<>(); for (int idx = 0; idx < serverRankList.size(); idx++) { serverRankMap.put(serverRankList.get(idx), idx); @@ -98,7 +98,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time. 
Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); - Map<Integer, Integer> replicaGroupToSegmentCount = new HashMap<>(); + Map<Integer, Integer> poolToSegmentCount = new HashMap<>(); boolean useFixedReplica = isUseFixedReplica(ctx.getQueryOptions()); Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(ctx.getQueryOptions()); int numReplicaGroups = numReplicaGroupsToQuery != null ? numReplicaGroupsToQuery : 1; @@ -122,7 +122,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx); - replicaGroupToSegmentCount.merge(selectedInstance.getReplicaGroup(), 1, Integer::sum); + poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum); // This can only be offline when it is a new segment. And such segment is marked as optional segment so that // broker or server can skip it upon any issue to process it. if (selectedInstance.isOnline()) { @@ -135,9 +135,9 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } replicaOffset = (replicaOffset + 1) % numReplicaGroups; } - for (Map.Entry<Integer, Integer> entry : replicaGroupToSegmentCount.entrySet()) { - _brokerMetrics.addMeteredValue(BrokerMeter.REPLICA_SEG_QUERIES, entry.getValue(), - BrokerMetrics.getTagForPreferredGroup(ctx.getQueryOptions()), String.valueOf(entry.getKey())); + for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) { + _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES, entry.getValue(), + BrokerMetrics.getTagForPreferredPool(ctx.getQueryOptions()), String.valueOf(entry.getKey())); } return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } @@ -148,7 +148,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); // No need to adjust this map per total segment 
numbers, as optional segments should be empty most of the time. Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); - Map<Integer, Integer> replicaGroupToSegmentCount = new HashMap<>(); + Map<Integer, Integer> poolToSegmentCount = new HashMap<>(); for (String segment : segments) { // NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has // not been updated (we update all components for routing in sequence) @@ -171,7 +171,7 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { .min(Comparator.comparingInt(candidate -> serverRankMap.get(candidate.getInstance()))) .orElse(candidates.get(roundRobinInstanceIdx)); } - replicaGroupToSegmentCount.merge(selectedInstance.getReplicaGroup(), 1, Integer::sum); + poolToSegmentCount.merge(selectedInstance.getPool(), 1, Integer::sum); // This can only be offline when it is a new segment. And such segment is marked as optional segment so that // broker or server can skip it upon any issue to process it. 
if (selectedInstance.isOnline()) { @@ -180,9 +180,9 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance()); } } - for (Map.Entry<Integer, Integer> entry : replicaGroupToSegmentCount.entrySet()) { - _brokerMetrics.addMeteredValue(BrokerMeter.REPLICA_SEG_QUERIES, entry.getValue(), - BrokerMetrics.getTagForPreferredGroup(ctx.getQueryOptions()), String.valueOf(entry.getKey())); + for (Map.Entry<Integer, Integer> entry : poolToSegmentCount.entrySet()) { + _brokerMetrics.addMeteredValue(BrokerMeter.POOL_SEG_QUERIES, entry.getValue(), + BrokerMetrics.getTagForPreferredPool(ctx.getQueryOptions()), String.valueOf(entry.getKey())); } return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java index 8c65dfb560..6dcb88eb54 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/SegmentInstanceCandidate.java @@ -21,7 +21,7 @@ package org.apache.pinot.broker.routing.instanceselector; import com.google.common.annotations.VisibleForTesting; import javax.annotation.concurrent.Immutable; -import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_REPLICA_GROUP_ID; +import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID; /** @@ -31,20 +31,20 @@ import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_REPLICA public class SegmentInstanceCandidate { private final String _instance; private final boolean _online; - private final int _group; + private final int _pool; @VisibleForTesting public SegmentInstanceCandidate(String instance, boolean online) { 
_instance = instance; _online = online; // no group - _group = FALLBACK_REPLICA_GROUP_ID; + _pool = FALLBACK_POOL_ID; } - public SegmentInstanceCandidate(String instance, boolean online, int group) { + public SegmentInstanceCandidate(String instance, boolean online, int pool) { _instance = instance; _online = online; - _group = group; + _pool = pool; } public String getInstance() { @@ -55,7 +55,7 @@ public class SegmentInstanceCandidate { return _online; } - public int getReplicaGroup() { - return _group; + public int getPool() { + return _pool; } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java index 40247a7f03..e547c66ed8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java @@ -154,7 +154,7 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size()); for (String instance : onlineInstances) { if (!unavailableInstances.contains(instance)) { - candidates.add(new SegmentInstanceCandidate(instance, true, getGroup(instance))); + candidates.add(new SegmentInstanceCandidate(instance, true, getPool(instance))); } } _oldSegmentCandidatesMap.put(segment, candidates); @@ -170,7 +170,7 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele for (String instance : convertToSortedMap(idealStateInstanceStateMap).keySet()) { if (!unavailableInstances.contains(instance)) { candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance), - getGroup(instance))); + getPool(instance))); } } _newSegmentStateMap.put(segment, new 
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates)); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java index 3cb82436e4..124aade345 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java @@ -115,7 +115,7 @@ public class QueryLoggerTest { + "queryEngine=singleStage," + "offlineMemAllocatedBytes(total/thread/resSer):0/0/0," + "realtimeMemAllocatedBytes(total/thread/resSer):0/0/0," - + "replicaGroups=[]," + + "pools=[]," + "query=SELECT * FROM foo"); //@formatter:on } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelectorTest.java similarity index 87% rename from pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelectorTest.java rename to pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelectorTest.java index 147d87719e..9250828343 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityGroupInstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/PriorityPoolInstanceSelectorTest.java @@ -31,16 +31,16 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.*; -public class PriorityGroupInstanceSelectorTest { +public class PriorityPoolInstanceSelectorTest { private AdaptiveServerSelector _adaptiveServerSelector; - private PriorityGroupInstanceSelector _selector; + private PriorityPoolInstanceSelector _selector; private ServerSelectionContext 
_context; @BeforeMethod public void setUp() { _adaptiveServerSelector = mock(AdaptiveServerSelector.class); - _selector = new PriorityGroupInstanceSelector(_adaptiveServerSelector); + _selector = new PriorityPoolInstanceSelector(_adaptiveServerSelector); _context = mock(ServerSelectionContext.class); } @@ -64,7 +64,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server2", 2), createCandidate("server3", 3) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Collections.emptyList()); + when(_context.getOrderedPreferredPools()).thenReturn(Collections.emptyList()); when(_adaptiveServerSelector.select(any())).thenReturn("server2"); // Execute @@ -83,7 +83,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server2", 2), createCandidate("server3", 1) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Arrays.asList(2, 1)); + when(_context.getOrderedPreferredPools()).thenReturn(Arrays.asList(2, 1)); when(_adaptiveServerSelector.select(any())).thenReturn("server2"); // Execute @@ -92,7 +92,7 @@ public class PriorityGroupInstanceSelectorTest { // Verify assertNotNull(result); assertEquals(result.getInstance(), "server2"); - assertEquals(result.getReplicaGroup(), 2); + assertEquals(result.getPool(), 2); } @Test @@ -103,7 +103,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server3", 1), createCandidate("server4", 3) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Arrays.asList(2, 1)); + when(_context.getOrderedPreferredPools()).thenReturn(Arrays.asList(2, 1)); when(_adaptiveServerSelector.select(any())).thenReturn("server1"); // Execute @@ -112,7 +112,7 @@ public class PriorityGroupInstanceSelectorTest { // Verify assertNotNull(result); assertEquals(result.getInstance(), "server1"); - assertEquals(result.getReplicaGroup(), 1); + assertEquals(result.getPool(), 1); } @Test @@ -122,7 +122,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server4", -1), 
createCandidate("server5", -1) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Arrays.asList(2, 1)); + when(_context.getOrderedPreferredPools()).thenReturn(Arrays.asList(2, 1)); when(_adaptiveServerSelector.select(any())).thenReturn("server4"); // Execute @@ -131,7 +131,7 @@ public class PriorityGroupInstanceSelectorTest { // Verify assertNotNull(result); assertEquals(result.getInstance(), "server4"); - assertEquals(result.getReplicaGroup(), -1); + assertEquals(result.getPool(), -1); } @Test @@ -149,7 +149,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server3", 1), createCandidate("server4", -1) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Collections.emptyList()); + when(_context.getOrderedPreferredPools()).thenReturn(Collections.emptyList()); when(_adaptiveServerSelector.fetchServerRankingsWithScores(any())).thenReturn(Arrays.asList( Pair.of("server1", 0.1), Pair.of("server2", 0.2), @@ -173,7 +173,7 @@ public class PriorityGroupInstanceSelectorTest { createCandidate("server3", 1), createCandidate("server4", -1) ); - when(_context.getOrderedPreferredGroups()).thenReturn(Arrays.asList(2, 1)); + when(_context.getOrderedPreferredPools()).thenReturn(Arrays.asList(2, 1)); when(_adaptiveServerSelector.fetchServerRankingsWithScores(any())).thenReturn(Arrays.asList( Pair.of("server1", 0.1), Pair.of("server2", 0.2), @@ -192,7 +192,7 @@ public class PriorityGroupInstanceSelectorTest { private SegmentInstanceCandidate createCandidate(String instance, int replicaGroup) { SegmentInstanceCandidate candidate = mock(SegmentInstanceCandidate.class); when(candidate.getInstance()).thenReturn(instance); - when(candidate.getReplicaGroup()).thenReturn(replicaGroup); + when(candidate.getPool()).thenReturn(replicaGroup); return candidate; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index 4a6f3f00e7..5ff80bf02b 
100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -51,23 +51,23 @@ public class BrokerMeter implements AbstractMetrics.Meter { */ public static final BrokerMeter QUERIES_GLOBAL = create("QUERIES_GLOBAL", "queries", true); /** - * Number of queries executed per replica group. + * Number of queries executed per pool. * <p> - * This metric is used to monitor query traffic distribution across replica groups. + * This metric is used to monitor query traffic distribution across pools. * Currently only includes single-stage queries. */ - public static final BrokerMeter REPLICA_QUERIES = create("REPLICA_QUERIES", "routing", false); + public static final BrokerMeter POOL_QUERIES = create("POOL_QUERIES", "routing", false); /** - * Number of segment selected per replica gorup during query execution. + * Number of segments selected per pool during query execution. * <p> - * This metric is not global and is attached to a particular replica group. + * This metric is not global and is attached to a particular pool. * Currently this counter include single stage queries only. * <p> - * Let's say the query option orderedReferredReplicas is set and a few nodes in the preferred replica group are down. - * The other metric {@link #REPLICA_QUERIES} shows the traffic are relatively equal over replica groups. - * This metric is still going to show that most of segments are still selected from the preferred replica group. + * Let's say the query option orderedPreferredPools is set and a few nodes in the preferred pool are down. + * The other metric {@link #POOL_QUERIES} shows that traffic is relatively equal across pools. + * This metric is still going to show that most segments are still selected from the preferred pool.
*/ - public static final BrokerMeter REPLICA_SEG_QUERIES = create("REPLICA_SEG_QUERIES", "routing", false); + public static final BrokerMeter POOL_SEG_QUERIES = create("POOL_SEG_QUERIES", "routing", false); /** * Number of multi-stage queries that have been started. * <p> diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java index 0c13211698..89d41a0264 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java @@ -37,8 +37,8 @@ import static org.apache.pinot.spi.utils.CommonConstants.Broker.DEFAULT_METRICS_ public class BrokerMetrics extends AbstractMetrics<BrokerQueryPhase, BrokerMeter, BrokerGauge, BrokerTimer> { private static final BrokerMetrics NOOP = new BrokerMetrics(new NoopPinotMetricsRegistry()); private static final AtomicReference<BrokerMetrics> BROKER_METRICS_INSTANCE = new AtomicReference<>(NOOP); - private static final String PREFERRED_GROUP_SET_TAG = "preferredGroupOptSet"; - private static final String PREFERRED_GROUP_UNSET_TAG = "preferredGroupOptUnset"; + private static final String PREFERRED_POOL_SET_TAG = "preferredPoolOptSet"; + private static final String PREFERRED_POOL_UNSET_TAG = "preferredPoolOptUnset"; /** * register the brokerMetrics onto this class, so that we don't need to pass it down as a parameter @@ -88,11 +88,13 @@ public class BrokerMetrics extends AbstractMetrics<BrokerQueryPhase, BrokerMeter return BrokerGauge.values(); } - public static String getTagForPreferredGroup(Map<String, String> queryOption) { + public static String getTagForPreferredPool(Map<String, String> queryOption) { if (queryOption == null) { - return PREFERRED_GROUP_UNSET_TAG; + return PREFERRED_POOL_UNSET_TAG; } - return queryOption.containsKey(CommonConstants.Broker.Request.QueryOptionKey.ORDERED_PREFERRED_REPLICAS) - ? 
PREFERRED_GROUP_SET_TAG : PREFERRED_GROUP_UNSET_TAG; + // backward compatibility to check ORDERED_PREFERRED_REPLICAS here + return (queryOption.containsKey(CommonConstants.Broker.Request.QueryOptionKey.ORDERED_PREFERRED_POOLS) + || queryOption.containsKey(CommonConstants.Broker.Request.QueryOptionKey.ORDERED_PREFERRED_REPLICAS)) + ? PREFERRED_POOL_SET_TAG : PREFERRED_POOL_UNSET_TAG; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index ba22507905..6678573468 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -396,14 +396,14 @@ public interface BrokerResponse { Set<String> getTablesQueried(); /** - * Set the replica groups queried in the request - * @param replicaGroups + * Set the pools queried in the request + * @param pools */ - void setReplicaGroups(Set<Integer> replicaGroups); + void setPools(Set<Integer> pools); /** - * Get the replica groups queried in the request + * Get the pools queried in the request * @return */ - Set<Integer> getReplicaGroups(); + Set<Integer> getPools(); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index 205a3eb706..3d9cf7c79a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -56,7 +56,7 @@ import org.apache.pinot.spi.utils.JsonUtils; "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", 
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes", - "replicaGroups" + "pools" }) @JsonIgnoreProperties(ignoreUnknown = true) public class BrokerResponseNative implements BrokerResponse { @@ -114,7 +114,7 @@ public class BrokerResponseNative implements BrokerResponse { private Map<String, String> _traceInfo = new HashMap<>(); private Set<String> _tablesQueried = Set.of(); - private Set<Integer> _replicaGroups = Set.of(); + private Set<Integer> _pools = Set.of(); public BrokerResponseNative() { } @@ -570,13 +570,13 @@ public class BrokerResponseNative implements BrokerResponse { } @Override - public void setReplicaGroups(@NotNull Set<Integer> replicaGroups) { - _replicaGroups = replicaGroups; + public void setPools(@NotNull Set<Integer> pools) { + _pools = pools; } @Override @NotNull - public Set<Integer> getReplicaGroups() { - return _replicaGroups; + public Set<Integer> getPools() { + return _pools; } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java index 514ed8e8a1..d9b8bcddc5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java @@ -50,7 +50,8 @@ import org.apache.pinot.common.response.ProcessingException; "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried", "offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes", "offlineResponseSerMemAllocatedBytes", - "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", "realtimeTotalMemAllocatedBytes" + "realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes", 
"realtimeTotalMemAllocatedBytes", + "pools" }) public class BrokerResponseNativeV2 implements BrokerResponse { private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class); @@ -81,7 +82,7 @@ public class BrokerResponseNativeV2 implements BrokerResponse { private long _brokerReduceTimeMs; private Set<String> _tablesQueried = Set.of(); - private Set<Integer> _replicaGroups = Set.of(); + private Set<Integer> _pools = Set.of(); @JsonInclude(JsonInclude.Include.NON_NULL) @Nullable @@ -394,14 +395,14 @@ public class BrokerResponseNativeV2 implements BrokerResponse { } @Override - public void setReplicaGroups(@NotNull Set<Integer> replicaGroups) { - _replicaGroups = replicaGroups; + public void setPools(@NotNull Set<Integer> pools) { + _pools = pools; } @Override @NotNull - public Set<Integer> getReplicaGroups() { - return _replicaGroups; + public Set<Integer> getPools() { + return _pools; } public void addBrokerStats(StatMap<StatKey> brokerStats) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 56f4652f7c..9627edda7f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -193,19 +193,23 @@ public class QueryOptionsUtils { return checkedParseIntPositive(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY, numReplicaGroupsToQuery); } - public static List<Integer> getOrderedPreferredReplicas(Map<String, String> queryOptions) { - String orderedPreferredReplicas = queryOptions.get(QueryOptionKey.ORDERED_PREFERRED_REPLICAS); - if (orderedPreferredReplicas == null) { + public static List<Integer> getOrderedPreferredPools(Map<String, String> queryOptions) { + String orderedPreferredPools = queryOptions.get(QueryOptionKey.ORDERED_PREFERRED_POOLS); + if 
(StringUtils.isEmpty(orderedPreferredPools)) { + // backward compatibility + orderedPreferredPools = queryOptions.get(QueryOptionKey.ORDERED_PREFERRED_REPLICAS); + } + if (StringUtils.isEmpty(orderedPreferredPools)) { return Collections.emptyList(); } - // cannot use comma as the delimiter of replica group list + // cannot use comma as the delimiter of pool list // because query option use comma as the delimiter of different options - String[] replicas = orderedPreferredReplicas.split("\\|"); - List<Integer> preferredReplicas = new ArrayList<>(replicas.length); - for (String replica : replicas) { - preferredReplicas.add(Integer.parseInt(replica.trim())); + String[] pools = orderedPreferredPools.split("\\|"); + List<Integer> preferredPools = new ArrayList<>(pools.length); + for (String pool : pools) { + preferredPools.add(Integer.parseInt(pool.trim())); } - return preferredReplicas; + return preferredPools; } public static boolean isExplainPlanVerbose(Map<String, String> queryOptions) { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/BrokerMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/BrokerMetricsTest.java index d012fb48dc..05fa98bc04 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/BrokerMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/BrokerMetricsTest.java @@ -30,27 +30,33 @@ import static org.testng.Assert.assertEquals; public class BrokerMetricsTest { @Test - public void testGetTagForPreferredGroup() { + public void testGetTagForPreferredPool() { // Test case 1: queryOption is null - assertEquals(BrokerMetrics.getTagForPreferredGroup(null), "preferredGroupOptUnset", - "Should return preferredGroupOptUnset when queryOption is null"); + assertEquals(BrokerMetrics.getTagForPreferredPool(null), "preferredPoolOptUnset", + "Should return preferredPoolOptUnset when queryOption is null"); // Test case 2: queryOption is empty Map<String, String> 
emptyQueryOption = new HashMap<>(); - assertEquals(BrokerMetrics.getTagForPreferredGroup(emptyQueryOption), "preferredGroupOptUnset", - "Should return preferredGroupOptUnset when queryOption is empty"); + assertEquals(BrokerMetrics.getTagForPreferredPool(emptyQueryOption), "preferredPoolOptUnset", + "Should return preferredPoolOptUnset when queryOption is empty"); - // Test case 3: queryOption does not contain ORDERED_PREFERRED_REPLICAS - Map<String, String> queryOptionWithoutPreferredGroup = new HashMap<>(); - queryOptionWithoutPreferredGroup.put("someOtherOption", "value"); - assertEquals(BrokerMetrics.getTagForPreferredGroup(queryOptionWithoutPreferredGroup), - "preferredGroupOptUnset", - "Should return preferredGroupOptUnset when queryOption does not contain ORDERED_PREFERRED_REPLICAS"); + // Test case 3: queryOption does not contain ORDERED_PREFERRED_POOLS + Map<String, String> queryOptionWithoutPreferredPool = new HashMap<>(); + queryOptionWithoutPreferredPool.put("someOtherOption", "value"); + assertEquals(BrokerMetrics.getTagForPreferredPool(queryOptionWithoutPreferredPool), + "preferredPoolOptUnset", + "Should return preferredPoolOptUnset when queryOption does not contain ORDERED_PREFERRED_POOLS"); - // Test case 4: queryOption contains ORDERED_PREFERRED_REPLICAS + // Test case 4: queryOption contains ORDERED_PREFERRED_POOLS + Map<String, String> queryOptionWithPreferredPool = new HashMap<>(); + queryOptionWithPreferredPool.put("orderedPreferredPools", "0"); + assertEquals(BrokerMetrics.getTagForPreferredPool(queryOptionWithPreferredPool), "preferredPoolOptSet", + "Should return preferredPoolOptSet when queryOption contains ORDERED_PREFERRED_POOLS"); + + // Test case 5: queryOption contains ORDERED_PREFERRED_REPLICAS Map<String, String> queryOptionWithPreferredGroup = new HashMap<>(); queryOptionWithPreferredGroup.put("orderedPreferredReplicas", "0"); - assertEquals(BrokerMetrics.getTagForPreferredGroup(queryOptionWithPreferredGroup), 
"preferredGroupOptSet", - "Should return preferredGroupOptSet when queryOption contains ORDERED_PREFERRED_REPLICAS"); + assertEquals(BrokerMetrics.getTagForPreferredPool(queryOptionWithPreferredGroup), "preferredPoolOptSet", + "Should return preferredPoolOptSet when queryOption contains ORDERED_PREFERRED_POOLS"); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index 1a32a16552..2df2b3a934 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -32,7 +32,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_REPLICA_GROUP_ID; +import static org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID; public final class ServerInstance { @@ -102,7 +102,7 @@ public final class ServerInstance { _queryServicePort = INVALID_PORT; _queryMailboxPort = INVALID_PORT; _adminEndpoint = null; - _pool = FALLBACK_REPLICA_GROUP_ID; + _pool = FALLBACK_POOL_ID; } public String getInstanceId() { @@ -194,12 +194,12 @@ public final class ServerInstance { int extractPool(InstanceConfig instanceConfig) { Map<String, String> pools = instanceConfig.getRecord().getMapField(InstanceUtils.POOL_KEY); if (pools == null || pools.isEmpty()) { - return FALLBACK_REPLICA_GROUP_ID; + return FALLBACK_POOL_ID; } Set<String> groups = new HashSet<>(pools.values()); if (groups.size() != 1) { LOGGER.warn("Instance: {} belongs to multiple groups: {}", _instanceId, groups); - return FALLBACK_REPLICA_GROUP_ID; + return FALLBACK_POOL_ID; } // The type of the field pools of org.apache.pinot.spi.config.instance.Instance uses Map<String, Integer>. 
// Thus it is safe to directly use Integer.parseInt without checking the parsing exception diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 8715b66db3..8cfb7a6e88 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -295,12 +295,12 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati + "'DL' ORDER BY ArrTime DESC"; testQuery(query, h2Query); - // Test orderedPreferredReplicas option which will fallbacks to non preferred replica groups - // when non of preferred replicas is available - query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_' option(orderedPreferredReplicas=0|1)"; + // Test orderedPreferredPools option, which falls back to non-preferred pools + // when none of the preferred pools is available + query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_' option(orderedPreferredPools=0|1)"; h2Query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'"; testQuery(query, h2Query); - query = "SET orderedPreferredReplicas='0 | 1'; SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'"; + query = "SET orderedPreferredPools='0 | 1'; SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'"; h2Query = "SELECT count(*) FROM mytable WHERE OriginState LIKE 'A_'"; testQuery(query, h2Query); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 308cbd8d14..af549ea6f9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@
-487,7 +487,10 @@ public class CommonConstants { public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true; // When the server instance's pool field is null or the pool contains multi distinguished group value, the broker - // would set the group to -1 in the routing table for that server. + // would set the pool to -1 in the routing table for that server. + public static final int FALLBACK_POOL_ID = -1; + // keep the variable to pass the compatibility test + @Deprecated public static final int FALLBACK_REPLICA_GROUP_ID = -1; public static class Request { @@ -543,7 +546,9 @@ public class CommonConstants { public static final String NUM_REPLICA_GROUPS_TO_QUERY = "numReplicaGroupsToQuery"; + @Deprecated public static final String ORDERED_PREFERRED_REPLICAS = "orderedPreferredReplicas"; + public static final String ORDERED_PREFERRED_POOLS = "orderedPreferredPools"; public static final String USE_FIXED_REPLICA = "useFixedReplica"; public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose"; public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org