This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d47ea27b4ee Make query routing strategy pluggable (#17364)
d47ea27b4ee is described below
commit d47ea27b4ee724ea4b52aa07db2ef22cfd5071fd
Author: Yash Mayya <[email protected]>
AuthorDate: Mon Dec 15 17:29:35 2025 -0800
Make query routing strategy pluggable (#17364)
---
.../pinot/broker/routing/BrokerRoutingManager.java | 5 +-
.../instanceselector/BalancedInstanceSelector.java | 13 +-
.../instanceselector/BaseInstanceSelector.java | 42 +++---
.../routing/instanceselector/InstanceSelector.java | 11 +-
.../instanceselector/InstanceSelectorFactory.java | 86 ++++++++---
.../MultiStageReplicaGroupSelector.java | 18 ++-
.../ReplicaGroupInstanceSelector.java | 130 +++++++++++++++--
.../StrictReplicaGroupInstanceSelector.java | 129 +----------------
.../instanceselector/InstanceSelectorTest.java | 157 +++++++++++----------
.../MultiStageReplicaGroupSelectorTest.java | 34 +++--
.../pinot/spi/config/table/RoutingConfig.java | 3 +-
11 files changed, 342 insertions(+), 286 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 3bc06585e44..fa1d66c1a63 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -709,9 +709,8 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(_serverRoutingStatsManager,
_pinotConfig);
InstanceSelector instanceSelector =
InstanceSelectorFactory.getInstanceSelector(tableConfig,
_propertyStore, _brokerMetrics,
- adaptiveServerSelector, _pinotConfig);
- instanceSelector.init(_routableServers, _enabledServerInstanceMap,
idealState, externalView,
- preSelectedOnlineSegments);
+ adaptiveServerSelector, _pinotConfig, _routableServers,
_enabledServerInstanceMap, idealState,
+ externalView, preSelectedOnlineSegments);
// Add time boundary manager if both offline and real-time part exist
for a hybrid table
TimeBoundaryManager timeBoundaryManager = null;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 4afd3a26dcd..63e80f9e3e7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -18,15 +18,10 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
-import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -49,14 +44,8 @@ import org.apache.pinot.common.utils.HashUtil;
///
public class BalancedInstanceSelector extends BaseInstanceSelector {
- public BalancedInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- InstanceSelectorConfig config) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, config);
- }
-
@Override
- Pair<Map<String, String>, Map<String, String>> select(List<String> segments,
int requestId,
+ public Pair<Map<String, String>, Map<String, String>> select(List<String>
segments, int requestId,
SegmentStates segmentStates, Map<String, String> queryOptions) {
Map<String, String> segmentToSelectedInstanceMap = new
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
// No need to adjust this map per total segment numbers, as optional
segments should be empty most of the time.
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 5acd67f25b5..45f95f7e41b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -47,6 +47,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -81,22 +82,23 @@ import static
org.apache.pinot.spi.utils.CommonConstants.Broker.FALLBACK_POOL_ID
/// creation time more than 5 minutes ago).
/// TODO: refresh new/old segment state where there is no update from helix
for long time.
///
-abstract class BaseInstanceSelector implements InstanceSelector {
+public abstract class BaseInstanceSelector implements InstanceSelector {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseInstanceSelector.class);
// To prevent int overflow, reset the request id once it reaches this value
private static final long MAX_REQUEST_ID = 1_000_000_000;
- final String _tableNameWithType;
- final ZkHelixPropertyStore<ZNRecord> _propertyStore;
- final BrokerMetrics _brokerMetrics;
- final AdaptiveServerSelector _adaptiveServerSelector;
+ protected TableConfig _tableConfig;
+ protected String _tableNameWithType;
+ protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ protected BrokerMetrics _brokerMetrics;
+ protected AdaptiveServerSelector _adaptiveServerSelector;
// Will be null if and only if adaptiveServerSelector is null
- final PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
- final Clock _clock;
- final InstanceSelectorConfig _config;
- final long _newSegmentExpirationTimeInSeconds;
- final boolean _emitSinglePoolSegmentsMetric;
- final int _tableNameHashForFixedReplicaRouting;
+ protected PriorityPoolInstanceSelector _priorityPoolInstanceSelector;
+ protected Clock _clock;
+ protected InstanceSelectorConfig _config;
+ protected long _newSegmentExpirationTimeInSeconds;
+ protected boolean _emitSinglePoolSegmentsMetric;
+ protected int _tableNameHashForFixedReplicaRouting;
// These 3 variables are the cached states to help accelerate the change
processing
Set<String> _enabledInstances;
@@ -109,10 +111,13 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
private volatile SegmentStates _segmentStates;
private Map<String, ServerInstance> _enabledServerStore;
- BaseInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
+ @Override
+ public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord>
propertyStore,
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- InstanceSelectorConfig config) {
- _tableNameWithType = tableNameWithType;
+ InstanceSelectorConfig config, Set<String> enabledInstances, Map<String,
ServerInstance> enabledServerMap,
+ IdealState idealState, ExternalView externalView, Set<String>
onlineSegments) {
+ _tableConfig = tableConfig;
+ _tableNameWithType = tableConfig.getTableName();
_propertyStore = propertyStore;
_brokerMetrics = brokerMetrics;
_adaptiveServerSelector = adaptiveServerSelector;
@@ -124,7 +129,7 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
// instance
// Math.abs(Integer.MIN_VALUE) = Integer.MIN_VALUE, so we use & 0x7FFFFFFF
to get a positive value
_tableNameHashForFixedReplicaRouting =
- TableNameBuilder.extractRawTableName(tableNameWithType).hashCode() &
0x7FFFFFFF;
+ TableNameBuilder.extractRawTableName(_tableNameWithType).hashCode() &
0x7FFFFFFF;
_priorityPoolInstanceSelector =
_adaptiveServerSelector == null ? null : new
PriorityPoolInstanceSelector(_adaptiveServerSelector);
@@ -132,11 +137,6 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
throw new IllegalArgumentException(
"AdaptiveServerSelector and consistent routing cannot be enabled at
the same time");
}
- }
-
- @Override
- public void init(Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap, IdealState idealState,
- ExternalView externalView, Set<String> onlineSegments) {
_enabledInstances = enabledInstances;
_enabledServerStore = enabledServerMap;
Map<String, Long> newSegmentCreationTimeMap =
@@ -494,6 +494,6 @@ abstract class BaseInstanceSelector implements
InstanceSelector {
* segments are used to get the new segments that are not online yet.
Instead of simply skipping them by broker at
* routing time, we can send them to servers and let servers decide how to
handle them.
*/
- abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/>
select(List<String> segments,
+ protected abstract Pair<Map<String, String>, Map<String, String>/*optional
segments*/> select(List<String> segments,
int requestId, SegmentStates segmentStates, Map<String, String>
queryOptions);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index c1eb1cf77e5..25a54cd7fd7 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -18,16 +18,23 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
+import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
/**
@@ -49,7 +56,9 @@ public interface InstanceSelector {
* (segments with ONLINE/CONSUMING instances in the ideal state and
pre-selected by the {@link SegmentPreSelector}).
* Should be called only once before calling other methods.
*/
- void init(Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap,
+ void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord>
propertyStore,
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ InstanceSelectorConfig config, Set<String> enabledInstances, Map<String,
ServerInstance> enabledServerMap,
IdealState idealState, ExternalView externalView, Set<String>
onlineSegments);
/**
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index f49de38ac57..49536b93a8f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -20,15 +20,21 @@ package org.apache.pinot.broker.routing.instanceselector;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
+import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,20 +51,27 @@ public class InstanceSelectorFactory {
@VisibleForTesting
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
- ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics, PinotConfiguration brokerConfig) {
- return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC(), brokerConfig);
+ ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics, PinotConfiguration brokerConfig,
+ Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap, IdealState idealState,
+ ExternalView externalView, Set<String> onlineSegments) {
+ return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC(), brokerConfig,
+ enabledInstances, enabledServerMap, idealState, externalView,
onlineSegments);
}
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics,
- @Nullable AdaptiveServerSelector adaptiveServerSelector,
PinotConfiguration brokerConfig) {
+ @Nullable AdaptiveServerSelector adaptiveServerSelector,
PinotConfiguration brokerConfig,
+ Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap, IdealState idealState,
+ ExternalView externalView, Set<String> onlineSegments) {
return getInstanceSelector(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector, Clock.systemUTC(),
- brokerConfig);
+ brokerConfig, enabledInstances, enabledServerMap, idealState,
externalView, onlineSegments);
}
public static InstanceSelector getInstanceSelector(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics
brokerMetrics,
- @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
PinotConfiguration brokerConfig) {
+ @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock,
PinotConfiguration brokerConfig,
+ Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap, IdealState idealState,
+ ExternalView externalView, Set<String> onlineSegments) {
String tableNameWithType = tableConfig.getTableName();
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
boolean useFixedReplica =
brokerConfig.getProperty(CommonConstants.Broker.CONFIG_OF_USE_FIXED_REPLICA,
@@ -74,29 +87,58 @@ public class InstanceSelectorFactory {
CommonConstants.Broker.DEFAULT_ENABLE_SINGLE_POOL_SEGMENTS_METRIC);
InstanceSelectorConfig config = new
InstanceSelectorConfig(useFixedReplica, newSegmentExpirationTimeInSeconds,
emitSinglePoolSegmentsMetric);
+
+ InstanceSelector instanceSelector = null;
if (routingConfig != null) {
- if
(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
- || (tableConfig.getTableType() == TableType.OFFLINE &&
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
+ if ((tableConfig.getTableType() == TableType.OFFLINE &&
LEGACY_REPLICA_GROUP_OFFLINE_ROUTING.equalsIgnoreCase(
routingConfig.getRoutingTableBuilderName())) ||
(tableConfig.getTableType() == TableType.REALTIME
&&
LEGACY_REPLICA_GROUP_REALTIME_ROUTING.equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))
{
LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
- return new ReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics, adaptiveServerSelector,
- clock, config);
- }
- if
(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
- routingConfig.getInstanceSelectorType())) {
- LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
- return new StrictReplicaGroupInstanceSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock, config);
+ instanceSelector = new ReplicaGroupInstanceSelector();
}
- if
(RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE.equalsIgnoreCase(
- routingConfig.getInstanceSelectorType())) {
- LOGGER.info("Using {} for table: {}",
routingConfig.getInstanceSelectorType(), tableNameWithType);
- return new MultiStageReplicaGroupSelector(tableNameWithType,
propertyStore, brokerMetrics,
- adaptiveServerSelector, clock, config);
+
+ if (routingConfig.getInstanceSelectorType() != null) {
+ switch (routingConfig.getInstanceSelectorType()) {
+ case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: {
+ LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}",
tableNameWithType);
+ instanceSelector = new ReplicaGroupInstanceSelector();
+ break;
+ }
+ case RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: {
+ LOGGER.info("Using StrictReplicaGroupInstanceSelector for table:
{}", tableNameWithType);
+ instanceSelector = new StrictReplicaGroupInstanceSelector();
+ break;
+ }
+ case RoutingConfig.MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE: {
+ LOGGER.info("Using {} for table: {}",
routingConfig.getInstanceSelectorType(), tableNameWithType);
+ instanceSelector = new MultiStageReplicaGroupSelector();
+ break;
+ }
+ case RoutingConfig.BALANCED_INSTANCE_SELECTOR_TYPE: {
+ LOGGER.info("Using BalancedInstanceSelector for table: {}",
tableNameWithType);
+ instanceSelector = new BalancedInstanceSelector();
+ break;
+ }
+ default: {
+ // Try to load custom instance selector
+ try {
+ instanceSelector =
PluginManager.get().createInstance(routingConfig.getInstanceSelectorType());
+ break;
+ } catch (Exception e) {
+ LOGGER.error("Failed to create instance selector for table: {}",
tableNameWithType, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
}
- return new BalancedInstanceSelector(tableNameWithType, propertyStore,
brokerMetrics, adaptiveServerSelector, clock,
- config);
+
+ if (instanceSelector == null) {
+ instanceSelector = new BalancedInstanceSelector();
+ }
+
+ instanceSelector.init(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector, clock,
+ config, enabledInstances, enabledServerMap, idealState, externalView,
onlineSegments);
+ return instanceSelector;
}
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 83cad54e44a..7c8a74f8afc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -41,6 +41,7 @@ import
org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -62,16 +63,13 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
private volatile InstancePartitions _instancePartitions;
- public MultiStageReplicaGroupSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- InstanceSelectorConfig config) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, config);
- }
-
@Override
- public void init(Set<String> enabledInstances, Map<String, ServerInstance>
enabledServerMap, IdealState idealState,
- ExternalView externalView, Set<String> onlineSegments) {
- super.init(enabledInstances, enabledServerMap, idealState, externalView,
onlineSegments);
+ public void init(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord>
propertyStore,
+ BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
+ InstanceSelectorConfig config, Set<String> enabledInstances, Map<String,
ServerInstance> enabledServerMap,
+ IdealState idealState, ExternalView externalView, Set<String>
onlineSegments) {
+ super.init(tableConfig, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, config, enabledInstances,
+ enabledServerMap, idealState, externalView, onlineSegments);
_instancePartitions = getInstancePartitions();
}
@@ -88,7 +86,7 @@ public class MultiStageReplicaGroupSelector extends
BaseInstanceSelector {
}
@Override
- Pair<Map<String, String>, Map<String, String>> select(List<String> segments,
int requestId,
+ public Pair<Map<String, String>, Map<String, String>> select(List<String>
segments, int requestId,
SegmentStates segmentStates, Map<String, String> queryOptions) {
// Create a copy of InstancePartitions to avoid race-condition with
event-listeners above.
InstancePartitions instancePartitions = _instancePartitions;
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 88793eac7cb..ab14a4d9bbc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -18,23 +18,26 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
-import java.time.Clock;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
import
org.apache.pinot.broker.routing.adaptiveserverselector.ServerSelectionContext;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -65,14 +68,10 @@ import
org.apache.pinot.common.utils.config.QueryOptionsUtils;
*/
public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
- public ReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- InstanceSelectorConfig config) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, config);
- }
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ReplicaGroupInstanceSelector.class);
@Override
- Pair<Map<String, String>, Map<String, String>> select(List<String> segments,
int requestId,
+ public Pair<Map<String, String>, Map<String, String>> select(List<String>
segments, int requestId,
SegmentStates segmentStates, Map<String, String> queryOptions) {
ServerSelectionContext ctx = new ServerSelectionContext(queryOptions,
_config);
if (_adaptiveServerSelector != null) {
@@ -166,4 +165,115 @@ public class ReplicaGroupInstanceSelector extends
BaseInstanceSelector {
}
return new ArrayList<>(candidateServers.values());
}
+
+ @Override
+ void updateSegmentMaps(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
+ Map<String, Long> newSegmentCreationTimeMap) {
+ if (_tableConfig.isUpsertEnabled() || _tableConfig.isDedupEnabled()) {
+ updateSegmentMapsForUpsertTable(idealState, externalView,
onlineSegments, newSegmentCreationTimeMap);
+ } else {
+ super.updateSegmentMaps(idealState, externalView, onlineSegments,
newSegmentCreationTimeMap);
+ }
+ }
+
+ /**
+ *
+ * <pre>
+ * Instances unavailable for any old segment should not exist in
_oldSegmentCandidatesMap or _newSegmentStateMap for
+ * segments with the same instances in ideal state.
+ *
+ * The maps are calculated in the following steps to meet the strict
replica-group guarantee:
+ * 1. Compute the online instances for both old and new segments
+ * 2. Compare online instances for old segments with instances in ideal
state and gather the unavailable instances
+ * for each set of instances
+ * 3. Exclude the unavailable instances from the online instances map for
both old and new segment map
+ * </pre>
+ */
+ void updateSegmentMapsForUpsertTable(IdealState idealState, ExternalView
externalView, Set<String> onlineSegments,
+ Map<String, Long> newSegmentCreationTimeMap) {
+ _oldSegmentCandidatesMap.clear();
+ int newSegmentMapCapacity =
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
+ _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
+
+ Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
+ Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
+
+ // Get the online instances for the segments
+ Map<String, Set<String>> oldSegmentToOnlineInstancesMap =
+ new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
+ Map<String, Set<String>> newSegmentToOnlineInstancesMap = new
HashMap<>(newSegmentMapCapacity);
+ for (String segment : onlineSegments) {
+ Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ assert idealStateInstanceStateMap != null;
+ Map<String, String> externalViewInstanceStateMap =
externalViewAssignment.get(segment);
+ Set<String> onlineInstances;
+ if (externalViewInstanceStateMap == null) {
+ onlineInstances = Collections.emptySet();
+ } else {
+ onlineInstances = getOnlineInstances(idealStateInstanceStateMap,
externalViewInstanceStateMap);
+ }
+ if (newSegmentCreationTimeMap.containsKey(segment)) {
+ newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
+ } else {
+ oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
+ }
+ }
+
+ // Calculate the unavailable instances based on the old segments' online
instances for each combination of instances
+ // in the ideal state
+ Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
+ for (Map.Entry<String, Set<String>> entry :
oldSegmentToOnlineInstancesMap.entrySet()) {
+ String segment = entry.getKey();
+ Set<String> onlineInstances = entry.getValue();
+ Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
+ Set<String> unavailableInstances =
+ unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k ->
new HashSet<>());
+ for (String instance : instancesInIdealState) {
+ if (!onlineInstances.contains(instance)) {
+ if (unavailableInstances.add(instance)) {
+ LOGGER.warn(
+ "Found unavailable instance: {} in instance group: {} for
segment: {}, table: {} (IS: {}, EV: {})",
+ instance, instancesInIdealState, segment, _tableNameWithType,
idealStateInstanceStateMap,
+ externalViewAssignment.get(segment));
+ }
+ }
+ }
+ }
+
+ // Iterate over the maps and exclude the unavailable instances
+ for (Map.Entry<String, Set<String>> entry :
oldSegmentToOnlineInstancesMap.entrySet()) {
+ String segment = entry.getKey();
+ // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
+ Set<String> onlineInstances = entry.getValue();
+ Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ Set<String> unavailableInstances =
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
+ List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
+ for (String instance : onlineInstances) {
+ if (!unavailableInstances.contains(instance)) {
+ candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance)));
+ }
+ }
+ _oldSegmentCandidatesMap.put(segment, candidates);
+ }
+
+ for (Map.Entry<String, Set<String>> entry :
newSegmentToOnlineInstancesMap.entrySet()) {
+ String segment = entry.getKey();
+ Set<String> onlineInstances = entry.getValue();
+ Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
+ Set<String> unavailableInstances =
+
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(),
Collections.emptySet());
+ List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
+ for (String instance :
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
+ if (!unavailableInstances.contains(instance)) {
+ candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance), getPool(instance)));
+ }
+ }
+ _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap:
{}", _newSegmentStateMap.keySet(),
+ _oldSegmentCandidatesMap.keySet());
+ }
+ }
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 0cb56c5be95..04e3b25b359 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -18,25 +18,10 @@
*/
package org.apache.pinot.broker.routing.instanceselector;
-import java.time.Clock;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import
org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
-import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.utils.HashUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Instance selector for strict replica-group routing strategy.
@@ -45,9 +30,10 @@ import org.slf4j.LoggerFactory;
* The strict replica-group routing strategy always routes the query to the
instances within the same replica-group.
* (Note that the replica-group information is derived from the ideal state of
the table, where the instances are sorted
* alphabetically in the instance state map, so the replica-groups in the
instance selector might not match the
- * replica-groups in the instance partitions.) The instances in a
replica-group should have all the online segments
- * (segments with ONLINE/CONSUMING instances in the ideal state and selected
by the pre-selector) available
- * (ONLINE/CONSUMING in the external view) in order to serve queries. If any
segment is unavailable in the
+ * replica-groups in the instance partitions). The goal of this algorithm is
to ensure that segments from the same
+ * partition are never served from multiple different instances. The instances
in a replica-group should have all the
+ * online segments (segments with ONLINE/CONSUMING instances in the ideal
state and selected by the pre-selector)
+ * available (ONLINE/CONSUMING in the external view) in order to serve
queries. If any segment is unavailable in the
* replica-group, we mark the whole replica-group down and not serve queries
with this replica-group.
*
* The selection algorithm is the same as {@link
ReplicaGroupInstanceSelector}, and will always evenly distribute the
@@ -68,115 +54,10 @@ import org.slf4j.LoggerFactory;
* </pre>
*/
public class StrictReplicaGroupInstanceSelector extends
ReplicaGroupInstanceSelector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(StrictReplicaGroupInstanceSelector.class);
-
- public StrictReplicaGroupInstanceSelector(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore,
- BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector
adaptiveServerSelector, Clock clock,
- InstanceSelectorConfig config) {
- super(tableNameWithType, propertyStore, brokerMetrics,
adaptiveServerSelector, clock, config);
- }
- /**
- * {@inheritDoc}
- *
- * <pre>
- * Instances unavailable for any old segment should not exist in
_oldSegmentCandidatesMap or _newSegmentStateMap for
- * segments with the same instances in ideal state.
- *
- * The maps are calculated in the following steps to meet the strict
replica-group guarantee:
- * 1. Compute the online instances for both old and new segments
- * 2. Compare online instances for old segments with instances in ideal
state and gather the unavailable instances
- * for each set of instances
- * 3. Exclude the unavailable instances from the online instances map for
both old and new segment map
- * </pre>
- */
@Override
void updateSegmentMaps(IdealState idealState, ExternalView externalView,
Set<String> onlineSegments,
Map<String, Long> newSegmentCreationTimeMap) {
- _oldSegmentCandidatesMap.clear();
- int newSegmentMapCapacity =
HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size());
- _newSegmentStateMap = new HashMap<>(newSegmentMapCapacity);
-
- Map<String, Map<String, String>> idealStateAssignment =
idealState.getRecord().getMapFields();
- Map<String, Map<String, String>> externalViewAssignment =
externalView.getRecord().getMapFields();
-
- // Get the online instances for the segments
- Map<String, Set<String>> oldSegmentToOnlineInstancesMap =
- new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size()));
- Map<String, Set<String>> newSegmentToOnlineInstancesMap = new
HashMap<>(newSegmentMapCapacity);
- for (String segment : onlineSegments) {
- Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- assert idealStateInstanceStateMap != null;
- Map<String, String> externalViewInstanceStateMap =
externalViewAssignment.get(segment);
- Set<String> onlineInstances;
- if (externalViewInstanceStateMap == null) {
- onlineInstances = Collections.emptySet();
- } else {
- onlineInstances = getOnlineInstances(idealStateInstanceStateMap,
externalViewInstanceStateMap);
- }
- if (newSegmentCreationTimeMap.containsKey(segment)) {
- newSegmentToOnlineInstancesMap.put(segment, onlineInstances);
- } else {
- oldSegmentToOnlineInstancesMap.put(segment, onlineInstances);
- }
- }
-
- // Calculate the unavailable instances based on the old segments' online
instances for each combination of instances
- // in the ideal state
- Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
- for (Map.Entry<String, Set<String>> entry :
oldSegmentToOnlineInstancesMap.entrySet()) {
- String segment = entry.getKey();
- Set<String> onlineInstances = entry.getValue();
- Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet();
- Set<String> unavailableInstances =
- unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k ->
new HashSet<>());
- for (String instance : instancesInIdealState) {
- if (!onlineInstances.contains(instance)) {
- if (unavailableInstances.add(instance)) {
- LOGGER.warn(
- "Found unavailable instance: {} in instance group: {} for
segment: {}, table: {} (IS: {}, EV: {})",
- instance, instancesInIdealState, segment, _tableNameWithType,
idealStateInstanceStateMap,
- externalViewAssignment.get(segment));
- }
- }
- }
- }
-
- // Iterate over the maps and exclude the unavailable instances
- for (Map.Entry<String, Set<String>> entry :
oldSegmentToOnlineInstancesMap.entrySet()) {
- String segment = entry.getKey();
- // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted)
- Set<String> onlineInstances = entry.getValue();
- Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- Set<String> unavailableInstances =
unavailableInstancesMap.get(idealStateInstanceStateMap.keySet());
- List<SegmentInstanceCandidate> candidates = new
ArrayList<>(onlineInstances.size());
- for (String instance : onlineInstances) {
- if (!unavailableInstances.contains(instance)) {
- candidates.add(new SegmentInstanceCandidate(instance, true,
getPool(instance)));
- }
- }
- _oldSegmentCandidatesMap.put(segment, candidates);
- }
-
- for (Map.Entry<String, Set<String>> entry :
newSegmentToOnlineInstancesMap.entrySet()) {
- String segment = entry.getKey();
- Set<String> onlineInstances = entry.getValue();
- Map<String, String> idealStateInstanceStateMap =
idealStateAssignment.get(segment);
- Set<String> unavailableInstances =
-
unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(),
Collections.emptySet());
- List<SegmentInstanceCandidate> candidates = new
ArrayList<>(idealStateInstanceStateMap.size());
- for (String instance :
convertToSortedMap(idealStateInstanceStateMap).keySet()) {
- if (!unavailableInstances.contains(instance)) {
- candidates.add(new SegmentInstanceCandidate(instance,
onlineInstances.contains(instance),
- getPool(instance)));
- }
- }
- _newSegmentStateMap.put(segment, new
NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap:
{}", _newSegmentStateMap.keySet(),
- _oldSegmentCandidatesMap.keySet());
- }
+ super.updateSegmentMapsForUpsertTable(idealState, externalView,
onlineSegments, newSegmentCreationTimeMap);
}
}
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 2ec94332fc3..668332e0f68 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -158,11 +158,13 @@ public class InstanceSelectorTest {
STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
}
- private InstanceSelector createTestInstanceSelector(String selectorType) {
+ private InstanceSelector createTestInstanceSelector(String selectorType,
Set<String> enabledInstances,
+ IdealState idealState, ExternalView externalView, Set<String>
onlineSegments) {
RoutingConfig config = new RoutingConfig(null, null, selectorType, false);
when(_tableConfig.getRoutingConfig()).thenReturn(config);
return InstanceSelectorFactory.getInstanceSelector(_tableConfig,
_propertyStore, _brokerMetrics, null,
- _mutableClock, new PinotConfiguration());
+ _mutableClock, new PinotConfiguration(), enabledInstances,
EMPTY_SERVER_MAP, idealState, externalView,
+ onlineSegments);
}
@DataProvider(name = "selectorType")
@@ -196,37 +198,49 @@ public class InstanceSelectorTest {
when(tableConfig.getTableName()).thenReturn("testTable_OFFLINE");
// Routing config is missing
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof BalancedInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof BalancedInstanceSelector);
// Instance selector type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof BalancedInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof BalancedInstanceSelector);
// Replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof ReplicaGroupInstanceSelector);
// Strict replica-group instance selector should be returned
when(routingConfig.getInstanceSelectorType()).thenReturn(STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof
StrictReplicaGroupInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof StrictReplicaGroupInstanceSelector);
// Should be backward-compatible with legacy config
when(routingConfig.getInstanceSelectorType()).thenReturn(null);
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_OFFLINE_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof ReplicaGroupInstanceSelector);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
InstanceSelectorFactory.LEGACY_REPLICA_GROUP_REALTIME_ROUTING);
- assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics,
- new PinotConfiguration()) instanceof ReplicaGroupInstanceSelector);
+ assertTrue(
+ InstanceSelectorFactory.getInstanceSelector(tableConfig,
propertyStore, brokerMetrics, new PinotConfiguration(),
+ Set.of(), Map.of(), new IdealState("testTable_OFFLINE"), new
ExternalView("testTable_OFFLINE"),
+ Set.of()) instanceof ReplicaGroupInstanceSelector);
}
@Test
@@ -234,15 +248,9 @@ public class InstanceSelectorTest {
String offlineTableName = "testTable_OFFLINE";
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
- BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
- ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
- StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
- new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ BalancedInstanceSelector balancedInstanceSelector = new
BalancedInstanceSelector();
+ ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new
ReplicaGroupInstanceSelector();
+ StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector();
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -306,10 +314,12 @@ public class InstanceSelectorTest {
onlineSegments.add(segment3);
List<String> segments = Arrays.asList(segment0, segment1, segment2,
segment3);
- balancedInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
- replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
- strictReplicaGroupInstanceSelector.init(enabledInstances,
EMPTY_SERVER_MAP, idealState, externalView,
- onlineSegments);
+ balancedInstanceSelector.init(_tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ replicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ strictReplicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
int requestId = 0;
@@ -762,9 +772,7 @@ public class InstanceSelectorTest {
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
- ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new
ReplicaGroupInstanceSelector();
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -799,7 +807,8 @@ public class InstanceSelectorTest {
onlineSegments.add(segment);
}
- replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ replicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
// ReplicaGroupInstanceSelector
// segment0 -> instance0
// segment2 -> instance0
@@ -845,9 +854,7 @@ public class InstanceSelectorTest {
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
- ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new
ReplicaGroupInstanceSelector();
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -883,7 +890,8 @@ public class InstanceSelectorTest {
onlineSegments.add(segment);
}
- replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ replicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
// ReplicaGroupInstanceSelector
// segment0 -> instance0
// segment3 -> instance0
@@ -928,9 +936,7 @@ public class InstanceSelectorTest {
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
when(pinotQuery.getQueryOptions()).thenReturn(queryOptions);
- ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ ReplicaGroupInstanceSelector replicaGroupInstanceSelector = new
ReplicaGroupInstanceSelector();
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -966,7 +972,8 @@ public class InstanceSelectorTest {
onlineSegments.add(segment);
}
- replicaGroupInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ replicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
// since numReplicaGroupsToQuery is not set, first query should go to
first replica group,
// 2nd query should go to next replica group
@@ -989,13 +996,9 @@ public class InstanceSelectorTest {
String offlineTableName = "testTable_OFFLINE";
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
- BalancedInstanceSelector balancedInstanceSelector =
- new BalancedInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ BalancedInstanceSelector balancedInstanceSelector = new
BalancedInstanceSelector();
// ReplicaGroupInstanceSelector has the same behavior as
BalancedInstanceSelector for the unavailable segments
- StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
- new StrictReplicaGroupInstanceSelector(offlineTableName,
propertyStore, brokerMetrics, null,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
new StrictReplicaGroupInstanceSelector();
Set<String> enabledInstances = new HashSet<>();
IdealState idealState = new IdealState(offlineTableName);
@@ -1036,9 +1039,10 @@ public class InstanceSelectorTest {
// (disabled) errorInstance: ERROR
// }
// }
- balancedInstanceSelector.init(enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
- strictReplicaGroupInstanceSelector.init(enabledInstances,
EMPTY_SERVER_MAP, idealState, externalView,
- onlineSegments);
+ balancedInstanceSelector.init(_tableConfig, propertyStore, brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
+ strictReplicaGroupInstanceSelector.init(_tableConfig, propertyStore,
brokerMetrics, null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
BrokerRequest brokerRequest = mock(BrokerRequest.class);
PinotQuery pinotQuery = mock(PinotQuery.class);
when(brokerRequest.getPinotQuery()).thenReturn(pinotQuery);
@@ -1310,9 +1314,8 @@ public class InstanceSelectorTest {
List.of(Pair.of(instance1, ONLINE)));
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
-
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
{
int requestId = 0;
@@ -1356,7 +1359,8 @@ public class InstanceSelectorTest {
// Advance the clock to make newSeg to old segment.
_mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS
+ 10));
// Upon re-initialization, newly old segments can only be served from
online instances: instance1
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ selector.init(_tableConfig, _propertyStore, _brokerMetrics, null,
_mutableClock,
+ INSTANCE_SELECTOR_CONFIG, enabledInstances, EMPTY_SERVER_MAP,
idealState, externalView, onlineSegments);
{
int requestId = 0;
InstanceSelector.SelectionResult selectionResult =
@@ -1423,8 +1427,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// We don't mark segment as unavailable.
int requestId = 0;
Map<String, String> expectedResult = Map.of(oldSeg, instance0);
@@ -1436,7 +1440,7 @@ public class InstanceSelectorTest {
// Advance the clock to make newSeg to old segment and we see newSeg is
reported as unavailable segment.
_mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS
+ 10));
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ selector = createTestInstanceSelector(selectorType, enabledInstances,
idealState, externalView, onlineSegments);
selectionResult = selector.select(_brokerRequest,
Lists.newArrayList(onlineSegments), requestId);
if (STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equals(selectorType)) {
expectedResult = Map.of();
@@ -1483,8 +1487,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// We don't mark segment as unavailable.
int requestId = 0;
@@ -1564,8 +1568,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// We don't mark segment as unavailable.
int requestId = 0;
@@ -1633,8 +1637,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// Add a new segment to ideal state with missing external view.
String newSeg = "segment2";
@@ -1719,8 +1723,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// First selection, we select instance1 for newSeg.
int requestId = 0;
@@ -1791,8 +1795,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// No selection because the external view is not in ideal state.
int requestId = 0;
@@ -1837,8 +1841,8 @@ public class InstanceSelectorTest {
ExternalView externalView = createExternalView(externalViewMap);
- InstanceSelector selector = createTestInstanceSelector(selectorType);
- selector.init(enabledInstances, EMPTY_SERVER_MAP, idealState,
externalView, onlineSegments);
+ InstanceSelector selector =
+ createTestInstanceSelector(selectorType, enabledInstances, idealState,
externalView, onlineSegments);
// We don't mark segment as unavailable.
int requestId = 0;
@@ -1853,13 +1857,10 @@ public class InstanceSelectorTest {
@Test
public void testReplicaGroupAdaptiveServerSelector() {
// Arrange
- String offlineTableName = "testTable_OFFLINE";
ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
HybridSelector hybridSelector = mock(HybridSelector.class);
- ReplicaGroupInstanceSelector instanceSelector =
- new ReplicaGroupInstanceSelector(offlineTableName, propertyStore,
brokerMetrics, hybridSelector,
- Clock.systemUTC(), INSTANCE_SELECTOR_CONFIG);
+ ReplicaGroupInstanceSelector instanceSelector = new
ReplicaGroupInstanceSelector();
// Define instances and segments
String instance0 = "instance0";
@@ -1884,6 +1885,20 @@ public class InstanceSelectorTest {
instanceCandidatesMap.put(segment2,
Arrays.asList(new SegmentInstanceCandidate(instance4, true), new
SegmentInstanceCandidate(instance3, true)));
+ IdealState idealState = createIdealState(
+ Map.of(segment0, List.of(Pair.of(instance0, ONLINE),
Pair.of(instance1, ONLINE)), segment1,
+ List.of(Pair.of(instance2, ONLINE), Pair.of(instance3, ONLINE)),
segment2,
+ List.of(Pair.of(instance3, ONLINE), Pair.of(instance4, ONLINE))));
+
+ ExternalView externalView = createExternalView(
+ Map.of(segment0, List.of(Pair.of(instance0, ONLINE),
Pair.of(instance1, ONLINE)), segment1,
+ List.of(Pair.of(instance2, ONLINE), Pair.of(instance3, ONLINE)),
segment2,
+ List.of(Pair.of(instance3, ONLINE), Pair.of(instance4, ONLINE))));
+
+ instanceSelector.init(_tableConfig, propertyStore, brokerMetrics,
hybridSelector, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, Set.of(instance0, instance1, instance2,
instance3, instance4), EMPTY_SERVER_MAP,
+ idealState, externalView, new HashSet<>(segments));
+
// Define the segment states
SegmentStates segmentStates = new SegmentStates(instanceCandidatesMap, new
HashSet<>(segments), null);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
index cad8f3fa524..3ee4159eb6b 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelectorTest.java
@@ -39,6 +39,7 @@ import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
@@ -59,7 +60,10 @@ public class MultiStageReplicaGroupSelectorTest {
private final static List<String> SEGMENTS =
Arrays.asList("segment0", "segment1", "segment2", "segment3",
"segment4", "segment5", "segment6", "segment7",
"segment8", "segment9", "segment10", "segment11");
- private static final Map<String, ServerInstance> EMPTY_SERVER_MAP =
Collections.EMPTY_MAP;
+
+ private static final Map<String, ServerInstance> EMPTY_SERVER_MAP =
Collections.emptyMap();
+
+ private static final InstanceSelectorConfig INSTANCE_SELECTOR_CONFIG = new
InstanceSelectorConfig(false, 300, false);
private AutoCloseable _mocks;
@Mock
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -69,6 +73,8 @@ public class MultiStageReplicaGroupSelectorTest {
private BrokerRequest _brokerRequest;
@Mock
private PinotQuery _pinotQuery;
+ @Mock
+ private TableConfig _tableConfig;
private static List<String> getSegments() {
return SEGMENTS;
@@ -83,6 +89,7 @@ public class MultiStageReplicaGroupSelectorTest {
_mocks = MockitoAnnotations.openMocks(this);
when(_brokerRequest.getPinotQuery()).thenReturn(_pinotQuery);
when(_pinotQuery.getQueryOptions()).thenReturn(null);
+ when(_tableConfig.getTableName()).thenReturn(TABLE_NAME);
}
@AfterMethod
@@ -104,7 +111,8 @@ public class MultiStageReplicaGroupSelectorTest {
Set<String> onlineSegments = new HashSet<>();
setupBasicTestEnvironment(enabledInstances, idealState, externalView,
onlineSegments);
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
// Using requestId=0 should select replica-group 0. Even segments get
assigned to instance-0 and odd segments get
@@ -167,12 +175,14 @@ public class MultiStageReplicaGroupSelectorTest {
Set<String> onlineSegments = new HashSet<>();
setupBasicTestEnvironment(enabledInstances, idealState, externalView,
onlineSegments);
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
// If instance-0 is down, replica-group 1 should be picked even with
requestId=0
enabledInstances.remove("instance-0");
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
Map<String, String> expectedSelectorResult =
createExpectedAssignment(replicaGroup1, getSegments());
InstanceSelector.SelectionResult selectionResult =
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -180,7 +190,8 @@ public class MultiStageReplicaGroupSelectorTest {
// If instance-2 also goes down, no replica-group is eligible
enabledInstances.remove("instance-2");
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
try {
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -216,7 +227,8 @@ public class MultiStageReplicaGroupSelectorTest {
externalView.getRecord().getMapFields().get("segment1").put("instance-1",
"ERROR");
externalView.getRecord().getMapFields().get("segment2").put("instance-2",
"ERROR");
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
// Even though instance-0 and instance-3 belong to different replica
groups, they handle exclusive sets of segments
@@ -235,7 +247,8 @@ public class MultiStageReplicaGroupSelectorTest {
// If instance-3 has an error segment as well, there is no replica group
available to serve complete set of
// segments.
externalView.getRecord().getMapFields().get("segment3").put("instance-3",
"ERROR");
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
try {
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -265,7 +278,8 @@ public class MultiStageReplicaGroupSelectorTest {
Set<String> onlineSegments = new HashSet<>();
setupBasicTestEnvironment(enabledInstances, idealState, externalView,
onlineSegments);
- multiStageSelector.init(new HashSet<>(enabledInstances), EMPTY_SERVER_MAP,
idealState, externalView,
+ multiStageSelector.init(_tableConfig, _propertyStore, _brokerMetrics,
null, Clock.systemUTC(),
+ INSTANCE_SELECTOR_CONFIG, new HashSet<>(enabledInstances),
EMPTY_SERVER_MAP, idealState, externalView,
onlineSegments);
InstanceSelector.SelectionResult selectionResult =
multiStageSelector.select(_brokerRequest, getSegments(), 0);
@@ -279,9 +293,7 @@ public class MultiStageReplicaGroupSelectorTest {
}
private MultiStageReplicaGroupSelector
createMultiStageSelector(InstancePartitions instancePartitions) {
- MultiStageReplicaGroupSelector multiStageSelector =
- new MultiStageReplicaGroupSelector(TABLE_NAME, _propertyStore,
_brokerMetrics, null, Clock.systemUTC(),
- new InstanceSelectorConfig(false, 300, false));
+ MultiStageReplicaGroupSelector multiStageSelector = new
MultiStageReplicaGroupSelector();
multiStageSelector = spy(multiStageSelector);
doReturn(instancePartitions).when(multiStageSelector).getInstancePartitions();
return multiStageSelector;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 65b1b00a48e..b3cde3ea82b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -29,10 +29,11 @@ public class RoutingConfig extends BaseJsonConfig {
public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
public static final String TIME_SEGMENT_PRUNER_TYPE = "time";
public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
- public static final String DEFAULT_INSTANCE_SELECTOR_TYPE = "balanced";
+ public static final String BALANCED_INSTANCE_SELECTOR_TYPE = "balanced";
public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE =
"replicaGroup";
public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE =
"strictReplicaGroup";
public static final String MULTI_STAGE_REPLICA_GROUP_SELECTOR_TYPE =
"multiStageReplicaGroup";
+ public static final String DEFAULT_INSTANCE_SELECTOR_TYPE =
BALANCED_INSTANCE_SELECTOR_TYPE;
// Replaced by _segmentPrunerTypes and _instanceSelectorType
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]