This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 3bd74b36e4 Allow optional segments that can be skipped by servers without failing the query (#11978) 3bd74b36e4 is described below commit 3bd74b36e4fad0ba35579854339a07d86f5bebd5 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Tue Nov 28 17:00:27 2023 -0800 Allow optional segments that can be skipped by servers without failing the query (#11978) * collect optional segments and pass them to servers to handle, instead of simply being skipped at the broker side, which could lead to wrong query results, particularly for upsert tables --- .../broker/api/resources/PinotBrokerDebug.java | 90 ++++++++-- .../requesthandler/BaseBrokerRequestHandler.java | 26 +-- .../requesthandler/GrpcBrokerRequestHandler.java | 19 ++- .../MultiStageBrokerRequestHandler.java | 9 +- .../SingleConnectionBrokerRequestHandler.java | 8 +- .../pinot/broker/routing/BrokerRoutingManager.java | 35 +++- .../instanceselector/BalancedInstanceSelector.java | 17 +- .../instanceselector/BaseInstanceSelector.java | 22 ++- .../routing/instanceselector/InstanceSelector.java | 21 ++- .../MultiStageReplicaGroupSelector.java | 20 ++- .../ReplicaGroupInstanceSelector.java | 40 +++-- .../StrictReplicaGroupInstanceSelector.java | 4 + .../broker/broker/HelixBrokerStarterTest.java | 9 +- .../BaseBrokerRequestHandlerTest.java | 9 +- .../instanceselector/InstanceSelectorTest.java | 53 +++--- .../pinot/common/request/InstanceRequest.java | 186 +++++++++++++++++++-- pinot-common/src/thrift/request.thrift | 1 + .../common/reader/PinotServerDataFetcher.scala | 11 +- .../core/data/manager/BaseTableDataManager.java | 15 ++ .../query/executor/ServerQueryExecutorV1Impl.java | 8 +- .../pinot/core/query/logger/ServerQueryLogger.java | 2 +- .../core/query/request/ServerQueryRequest.java | 8 + .../apache/pinot/core/routing/RoutingTable.java | 13 +- .../apache/pinot/core/transport/QueryRouter.java | 24 ++-
.../pinot/core/transport/QueryRoutingTest.java | 5 +- .../apache/pinot/query/routing/WorkerManager.java | 12 +- .../query/testutils/MockRoutingManagerFactory.java | 12 +- .../testutils/MockInstanceDataManagerFactory.java | 4 +- .../local/data/manager/TableDataManager.java | 5 + 29 files changed, 522 insertions(+), 166 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index 30e995298c..f9799d7e75 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -27,10 +27,12 @@ import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -42,6 +44,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.request.BrokerRequest; @@ -114,13 +117,45 @@ public class PinotBrokerDebug { public Map<String, Map<ServerInstance, List<String>>> getRoutingTable( @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) { Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>(); + getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, + 
removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()))); + if (!result.isEmpty()) { + return result; + } else { + throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND); + } + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/debug/routingTableWithOptionalSegments/{tableName}") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_ROUTING_TABLE) + @ApiOperation(value = "Get the routing table for a table, including optional segments") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Routing table"), + @ApiResponse(code = 404, message = "Routing not found"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTableWithOptionalSegments( + @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) { + Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> result = new TreeMap<>(); + getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, + routingTable.getServerInstanceToSegmentsMap())); + if (!result.isEmpty()) { + return result; + } else { + throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND); + } + } + + private void getRoutingTable(String tableName, BiConsumer<String, RoutingTable> consumer) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName); if (tableType != TableType.REALTIME) { String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName); RoutingTable routingTable = _routingManager.getRoutingTable( CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + offlineTableName), getRequestId()); if (routingTable != null) { - result.put(offlineTableName, routingTable.getServerInstanceToSegmentsMap()); + consumer.accept(offlineTableName, routingTable); } } if 
(tableType != TableType.OFFLINE) { @@ -128,14 +163,16 @@ public class PinotBrokerDebug { RoutingTable routingTable = _routingManager.getRoutingTable( CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + realtimeTableName), getRequestId()); if (routingTable != null) { - result.put(realtimeTableName, routingTable.getServerInstanceToSegmentsMap()); + consumer.accept(realtimeTableName, routingTable); } } - if (!result.isEmpty()) { - return result; - } else { - throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND); - } + } + + private static Map<ServerInstance, List<String>> removeOptionalSegments( + Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap) { + Map<ServerInstance, List<String>> ret = new HashMap<>(); + serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft())); + return ret; } @GET @@ -152,7 +189,39 @@ public class PinotBrokerDebug { @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); + checkAccessControl(brokerRequest, httpHeaders); + RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); + if (routingTable != null) { + return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap()); + } else { + throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND); + } + } + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/debug/routingTableWithOptionalSegments/sql") + @ManualAuthorization + @ApiOperation(value = "Get the routing table for a query, including optional segments") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Routing table"), + @ApiResponse(code = 404, message = "Routing not found"), + @ApiResponse(code = 500, message = "Internal server error") + }) + public 
Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments( + @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, + @Context HttpHeaders httpHeaders) { + BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); + checkAccessControl(brokerRequest, httpHeaders); + RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); + if (routingTable != null) { + return routingTable.getServerInstanceToSegmentsMap(); + } else { + throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND); + } + } + private void checkAccessControl(BrokerRequest brokerRequest, HttpHeaders httpHeaders) { // TODO: Handle nested queries if (brokerRequest.isSetQuerySource() && brokerRequest.getQuerySource().isSetTableName()) { if (!_accessControlFactory.create() @@ -163,13 +232,6 @@ public class PinotBrokerDebug { } else { throw new WebApplicationException("Table name is not set in the query", Response.Status.BAD_REQUEST); } - - RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId()); - if (routingTable != null) { - return routingTable.getServerInstanceToSegmentsMap(); - } else { - throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND); - } } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index bcc5049ff0..c93248fbe4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.commons.collections.CollectionUtils; 
import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.http.client.methods.HttpDelete; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.util.EntityUtils; @@ -592,8 +593,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); - Map<ServerInstance, List<String>> offlineRoutingTable = null; - Map<ServerInstance, List<String>> realtimeRoutingTable = null; + Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null; + Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null; List<String> unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; if (offlineBrokerRequest != null) { @@ -601,7 +602,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId); if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); + Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap = + routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { offlineRoutingTable = serverInstanceToSegmentsMap; } else { @@ -617,7 +619,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId); if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); + Map<ServerInstance, 
Pair<List<String>, List<String>>> serverInstanceToSegmentsMap = + routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { realtimeRoutingTable = serverInstanceToSegmentsMap; } else { @@ -1815,9 +1818,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestContext requestContext) + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs, + ServerStats serverStats, RequestContext requestContext) throws Exception; protected static boolean isPartialResult(BrokerResponse brokerResponse) { @@ -1858,8 +1862,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue()); statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments()); statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments()); - statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect( - Collectors.toList())); + statistics.setProcessingExceptions( + response.getProcessingExceptions().stream().map(Object::toString).collect(Collectors.toList())); } private String getGlobalQueryId(long requestId) { @@ -1888,8 +1892,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { final String 
_query; final Set<ServerInstance> _servers = new HashSet<>(); - QueryServers(String query, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable) { + QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) { _query = query; if (offlineRoutingTable != null) { _servers.addAll(offlineRoutingTable.keySet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index b2e36d16ef..c2cd3d6027 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -88,9 +89,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestContext requestContext) + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> 
offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs, + ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection assert offlineBrokerRequest != null || realtimeBrokerRequest != null; @@ -106,8 +108,8 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { requestContext.isSampledRequest()); } final long startReduceTimeNanos = System.nanoTime(); - BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, - responseMap, timeoutMs, _brokerMetrics); + BrokerResponseNative brokerResponse = + _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics); requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos); return brokerResponse; } @@ -116,11 +118,12 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler { * Query pinot server for data table. */ private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, - Map<ServerInstance, List<String>> routingTable, + Map<ServerInstance, Pair<List<String>, List<String>>> routingTable, Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) { - for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) { + for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); - List<String> segments = routingEntry.getValue(); + // TODO: support optional segments for GrpcQueryServer. + List<String> segments = routingEntry.getValue().getLeft(); String serverHost = serverInstance.getHostname(); int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index d329a99f69..9e74419830 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -31,6 +31,7 @@ import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.calcite.jdbc.CalciteSchemaBuilder; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.api.AccessControl; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.broker.broker.AccessControlFactory; @@ -314,9 +315,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestContext requestContext) { + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs, + ServerStats serverStats, RequestContext requestContext) + throws Exception { throw new UnsupportedOperationException(); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index d44532a500..68ae70f9eb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.failuredetector.FailureDetector; import org.apache.pinot.broker.failuredetector.FailureDetectorFactory; @@ -101,9 +102,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestContext requestContext) + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs, + ServerStats serverStats, RequestContext requestContext) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; if (requestContext.isSampledRequest()) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 3cd14e676f..cc3a5354ef 100644 --- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixConstants.ChangeType; @@ -610,19 +611,38 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle return null; } InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId); - Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap(); - Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new HashMap<>(); - for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) { + return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType, selectionResult), + selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments()); + } + + private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap(String tableNameWithType, + InstanceSelector.SelectionResult selectionResult) { + Map<ServerInstance, Pair<List<String>, List<String>>> merged = new HashMap<>(); + for (Map.Entry<String, String> entry : selectionResult.getSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - serverInstanceToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(entry.getKey()); + Pair<List<String>, List<String>> pair = + merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>())); + pair.getLeft().add(entry.getKey()); } else { // Should not happen in normal case unless encountered unexpected exception when updating 
routing entries _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L); } } - return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments(), - selectionResult.getNumPrunedSegments()); + for (Map.Entry<String, String> entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) { + ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); + if (serverInstance != null) { + Pair<List<String>, List<String>> pair = merged.get(serverInstance); + // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments + // to process, to be backward compatible. + // TODO: allow servers only with optional segments + if (pair != null) { + pair.getRight().add(entry.getKey()); + } + } + // TODO: Report missing server metrics when we allow servers only with optional segments. + } + return merged; } @Override @@ -795,7 +815,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle selectionResult.setNumPrunedSegments(numPrunedSegments); return selectionResult; } else { - return new InstanceSelector.SelectionResult(Collections.emptyMap(), Collections.emptyList(), numPrunedSegments); + return new InstanceSelector.SelectionResult(Pair.of(Collections.emptyMap(), Collections.emptyMap()), + Collections.emptyList(), numPrunedSegments); } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java index 17c9d5e5b8..77b5389fd4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import 
javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector; @@ -54,9 +55,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { } @Override - Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates, - Map<String, String> queryOptions) { + Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId, + SegmentStates segmentStates, Map<String, String> queryOptions) { Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time. + Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); if (_adaptiveServerSelector != null) { for (String segment : segments) { List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); @@ -70,8 +73,12 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { candidateInstances.add(candidate.getInstance()); } String selectedInstance = _adaptiveServerSelector.select(candidateInstances); + // This can only be offline when it is a new segment. And such segment is marked as optional segment so that + // broker or server can skip it upon any issue to process it. 
if (candidates.get(candidateInstances.indexOf(selectedInstance)).isOnline()) { segmentToSelectedInstanceMap.put(segment, selectedInstance); + } else { + optionalSegmentToInstanceMap.put(segment, selectedInstance); } } } else { @@ -84,11 +91,15 @@ public class BalancedInstanceSelector extends BaseInstanceSelector { } int selectedIdx = requestId++ % candidates.size(); SegmentInstanceCandidate selectedCandidate = candidates.get(selectedIdx); + // This can only be offline when it is a new segment. And such segment is marked as optional segment so that + // broker or server can skip it upon any issue to process it. if (selectedCandidate.isOnline()) { segmentToSelectedInstanceMap.put(segment, selectedCandidate.getInstance()); + } else { + optionalSegmentToInstanceMap.put(segment, selectedCandidate.getInstance()); } } } - return segmentToSelectedInstanceMap; + return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index 972142663e..b2961eef94 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -30,6 +30,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.TreeSet; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.AccessOption; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; @@ -270,6 +271,10 @@ abstract class BaseInstanceSelector implements InstanceSelector { } } } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(), + _oldSegmentCandidatesMap.keySet()); + } } /** 
@@ -408,10 +413,11 @@ abstract class BaseInstanceSelector implements InstanceSelector { // Copy the volatile reference so that segmentToInstanceMap and unavailableSegments can have a consistent view of // the state. SegmentStates segmentStates = _segmentStates; - Map<String, String> segmentToInstanceMap = select(segments, requestIdInt, segmentStates, queryOptions); + Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap = + select(segments, requestIdInt, segmentStates, queryOptions); Set<String> unavailableSegments = segmentStates.getUnavailableSegments(); if (unavailableSegments.isEmpty()) { - return new SelectionResult(segmentToInstanceMap, Collections.emptyList()); + return new SelectionResult(segmentToInstanceMap, Collections.emptyList(), 0); } else { List<String> unavailableSegmentsForRequest = new ArrayList<>(); for (String segment : segments) { @@ -419,7 +425,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { unavailableSegmentsForRequest.add(segment); } } - return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest); + return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest, 0); } } @@ -429,9 +435,11 @@ abstract class BaseInstanceSelector implements InstanceSelector { } /** - * Selects the server instances for the given segments based on the request id and segment states. Returns a map - * from segment to selected server instance hosting the segment. + * Selects the server instances for the given segments based on the request id and segment states. Returns two maps + * from segment to selected server instance hosting the segment. The 2nd map is for optional segments. The optional + * segments are used to get the new segments that is not online yet. Instead of simply skipping them by broker at + * routing time, we can send them to servers and let servers decide how to handle them. 
*/ - abstract Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates, - Map<String, String> queryOptions); + abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/> select(List<String> segments, + int requestId, SegmentStates segmentStates, Map<String, String> queryOptions); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java index 9ffe830229..d003723c5b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector; @@ -75,16 +76,12 @@ public interface InstanceSelector { Set<String> getServingInstances(); class SelectionResult { - private final Map<String, String> _segmentToInstanceMap; + private final Pair<Map<String, String>, Map<String, String>/*optional segments*/> _segmentToInstanceMap; private final List<String> _unavailableSegments; private int _numPrunedSegments; - public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments) { - this(segmentToInstanceMap, unavailableSegments, 0); - } - - public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments, - int numPrunedSegments) { + public SelectionResult(Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap, + List<String> unavailableSegments, int numPrunedSegments) { _segmentToInstanceMap = segmentToInstanceMap; _unavailableSegments 
= unavailableSegments; _numPrunedSegments = numPrunedSegments; @@ -94,7 +91,15 @@ public interface InstanceSelector { * Returns the map from segment to selected server instance hosting the segment. */ public Map<String, String> getSegmentToInstanceMap() { - return _segmentToInstanceMap; + return _segmentToInstanceMap.getLeft(); + } + + /** + * Returns the map from optional segment to selected server instance hosting the optional segment. + * Optional segments can be skipped by broker or server upon any issue w/o failing the query. + */ + public Map<String, String> getOptionalSegmentToInstanceMap() { + return _segmentToInstanceMap.getRight(); } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java index 9be701ebe5..15fb525a8c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -81,8 +82,8 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector { } @Override - Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates, - Map<String, String> queryOptions) { + Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId, + SegmentStates segmentStates, Map<String, String> queryOptions) { // Create a copy of InstancePartitions to avoid race-condition with event-listeners above. 
InstancePartitions instancePartitions = _instancePartitions; int replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups(); @@ -102,14 +103,15 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector { * Returns a map from the segmentName to the corresponding server in the given replica-group. If the is not enabled, * we throw an exception. */ - private Map<String, String> tryAssigning(List<String> segments, SegmentStates segmentStates, - InstancePartitions instancePartitions, int replicaId) { + private Pair<Map<String, String>, Map<String, String>> tryAssigning(List<String> segments, + SegmentStates segmentStates, InstancePartitions instancePartitions, int replicaId) { Set<String> instanceLookUpSet = new HashSet<>(); for (int partition = 0; partition < instancePartitions.getNumPartitions(); partition++) { List<String> instances = instancePartitions.getInstances(partition, replicaId); instanceLookUpSet.addAll(instances); } - Map<String, String> result = new HashMap<>(); + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(); + Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); for (String segment : segments) { List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment); // If candidates are null, we will throw an exception and log a warning. @@ -119,8 +121,12 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector { String instance = candidate.getInstance(); if (instanceLookUpSet.contains(instance)) { found = true; + // This can only be offline when it is a new segment. And such segment is marked as optional segment so that + // broker or server can skip it upon any issue to process it. 
if (candidate.isOnline()) { - result.put(segment, instance); + segmentToSelectedInstanceMap.put(segment, instance); + } else { + optionalSegmentToInstanceMap.put(segment, instance); } break; } @@ -129,7 +135,7 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector { throw new RuntimeException(String.format("Unable to find an enabled instance for segment: %s", segment)); } } - return result; + return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } @VisibleForTesting diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java index 9aedaa8e66..3683ca46bb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java @@ -69,8 +69,8 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } @Override - Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates, - Map<String, String> queryOptions) { + Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId, + SegmentStates segmentStates, Map<String, String> queryOptions) { if (_adaptiveServerSelector != null) { // Adaptive Server Selection is enabled. 
List<String> serverRankList = new ArrayList<>(); @@ -90,9 +90,11 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } } - private Map<String, String> selectServersUsingRoundRobin(List<String> segments, int requestId, - SegmentStates segmentStates, Map<String, String> queryOptions) { - Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + private Pair<Map<String, String>, Map<String, String>> selectServersUsingRoundRobin(List<String> segments, + int requestId, SegmentStates segmentStates, Map<String, String> queryOptions) { + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time. + Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions); int numReplicaGroups = numReplicaGroupsToQuery == null ? 1 : numReplicaGroupsToQuery; int replicaOffset = 0; @@ -107,22 +109,26 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { int numCandidates = candidates.size(); int instanceIdx = (requestId + replicaOffset) % numCandidates; SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx); - // Only put online instance. - // This can only be offline when it is a new segment. + // This can only be offline when it is a new segment. And such segment is marked as optional segment so that + // broker or server can skip it upon any issue to process it. 
if (selectedInstance.isOnline()) { - selectedServers.put(segment, selectedInstance.getInstance()); + segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance()); + } else { + optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance()); } if (numReplicaGroups > numCandidates) { numReplicaGroups = numCandidates; } replicaOffset = (replicaOffset + 1) % numReplicaGroups; } - return selectedServers; + return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } - private Map<String, String> selectServersUsingAdaptiveServerSelector(List<String> segments, int requestId, - SegmentStates segmentStates, List<String> serverRankList) { - Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + private Pair<Map<String, String>, Map<String, String>> selectServersUsingAdaptiveServerSelector(List<String> segments, + int requestId, SegmentStates segmentStates, List<String> serverRankList) { + Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size())); + // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time. + Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(); for (String segment : segments) { // NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has // not been updated (we update all components for routing in sequence) @@ -151,13 +157,15 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector { } } } - // Only put online instance. - // This can only be offline when it is a new segment. + // This can only be offline when it is a new segment. And such segment is marked as optional segment so that + // broker or server can skip it upon any issue to process it. 
if (selectedInstance.isOnline()) { - selectedServers.put(segment, selectedInstance.getInstance()); + segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance()); + } else { + optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance()); } } - return selectedServers; + return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap); } private List<String> fetchCandidateServersForQuery(List<String> segments, SegmentStates segmentStates) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java index 8206d49452..8c352bdbe6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java @@ -172,5 +172,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele } _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates)); } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(), + _oldSegmentCandidatesMap.keySet()); + } } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 438836a4cf..039d7a4205 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -163,7 +163,8 @@ public class HelixBrokerStarterTest extends ControllerTest { RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0); assertNotNull(routingTable); 
assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS); - assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(), NUM_OFFLINE_SEGMENTS); + assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(), + NUM_OFFLINE_SEGMENTS); assertTrue(routingTable.getUnavailableSegments().isEmpty()); // Add a new segment into the OFFLINE table @@ -171,9 +172,9 @@ public class HelixBrokerStarterTest extends ControllerTest { SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl"); TestUtils.waitForCondition(aVoid -> - routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap() - .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment " - + "into the routing table"); + routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next() + .getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, + "Failed to add the new segment " + "into the routing table"); // Add a new table with different broker tenant String newRawTableName = "newTable"; diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java index e38db6e020..29d2c5b428 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; import 
org.apache.pinot.broker.queryquota.QueryQuotaManager; @@ -199,7 +200,7 @@ public class BaseBrokerRequestHandlerTest { RoutingTable rt = mock(RoutingTable.class); when(rt.getServerInstanceToSegmentsMap()).thenReturn( Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), - Collections.singletonList("segment01"))); + Pair.of(Collections.singletonList("segment01"), Collections.emptyList()))); when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt); QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); @@ -223,10 +224,10 @@ public class BaseBrokerRequestHandlerTest { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, - RequestContext requestContext) + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs, + ServerStats serverStats, RequestContext requestContext) throws Exception { testRequestId[0] = requestId; latch.await(); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index fd2568e183..c748be4885 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -151,7 +151,7 @@ public class InstanceSelectorTest { } private 
static boolean isReplicaGroupType(String selectorType) { - return selectorType.equals(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals( + return selectorType.equals(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals( STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE); } @@ -165,7 +165,7 @@ public class InstanceSelectorTest { @DataProvider(name = "selectorType") public Object[] getSelectorType() { return new Object[]{ - RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, + REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, BALANCED_INSTANCE_SELECTOR }; } @@ -203,7 +203,7 @@ public class InstanceSelectorTest { brokerMetrics) instanceof BalancedInstanceSelector); // Replica-group instance selector should be returned - when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE); + when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE); assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore, brokerMetrics) instanceof ReplicaGroupInstanceSelector); @@ -1414,15 +1414,15 @@ public class InstanceSelectorTest { // First selection, we select instance0 for oldSeg and instance1 for newSeg in balance selector // For replica group, we select instance0 for oldSeg and newSeg. Because newSeg is not online in instance0, so // we exclude it from selection result. 
- Map<String, String> expectedSelectionResult; + InstanceSelector.SelectionResult selectionResult = + selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); if (isReplicaGroupType(selectorType)) { - expectedSelectionResult = ImmutableMap.of(oldSeg, instance0); + assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0)); + assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0)); } else { - expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1); + assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0, newSeg, instance1)); + assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty()); } - InstanceSelector.SelectionResult selectionResult = - selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } { @@ -1430,21 +1430,22 @@ public class InstanceSelectorTest { // Second selection, we select instance1 for oldSeg and instance0 for newSeg in balance selector // Because newSeg is not online in instance0, so we exclude it from selection result. // For replica group, we select instance1 for oldSeg and newSeg. 
- Map<String, String> expectedSelectionResult; + InstanceSelector.SelectionResult selectionResult = + selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); switch (selectorType) { case BALANCED_INSTANCE_SELECTOR: - expectedSelectionResult = ImmutableMap.of(oldSeg, instance1); + assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance1)); + assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0)); break; case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: // fall through - case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: - expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1); + case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: + assertEquals(selectionResult.getSegmentToInstanceMap(), + ImmutableMap.of(oldSeg, instance1, newSeg, instance1)); + assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty()); break; default: throw new RuntimeException("unsupported selector type:" + selectorType); } - InstanceSelector.SelectionResult selectionResult = - selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } // Advance the clock to make newSeg to old segment. 
@@ -1453,21 +1454,22 @@ public class InstanceSelectorTest { selector.init(enabledInstances, idealState, externalView, onlineSegments); { int requestId = 0; - Map<String, String> expectedSelectionResult; + InstanceSelector.SelectionResult selectionResult = + selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); switch (selectorType) { case BALANCED_INSTANCE_SELECTOR: // fall through - case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: - expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1); + case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: + assertEquals(selectionResult.getSegmentToInstanceMap(), + ImmutableMap.of(oldSeg, instance0, newSeg, instance1)); break; case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: - expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1); + assertEquals(selectionResult.getSegmentToInstanceMap(), + ImmutableMap.of(oldSeg, instance1, newSeg, instance1)); break; default: throw new RuntimeException("unsupported selector type:" + selectorType); } - InstanceSelector.SelectionResult selectionResult = - selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); - assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult); + assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty()); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } { @@ -1476,6 +1478,7 @@ public class InstanceSelectorTest { InstanceSelector.SelectionResult selectionResult = selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult); + assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty()); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); } } @@ -1523,19 +1526,21 @@ public class InstanceSelectorTest { InstanceSelector.SelectionResult selectionResult = selector.select(_brokerRequest, 
Lists.newArrayList(onlineSegments), requestId); assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult); + assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0)); assertTrue(selectionResult.getUnavailableSegments().isEmpty()); // Advance the clock to make newSeg to old segment and we see newSeg is reported as unavailable segment. _mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS + 10)); selector.init(enabledInstances, idealState, externalView, onlineSegments); selectionResult = selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId); - if (selectorType == STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) { + if (STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equals(selectorType)) { expectedResult = ImmutableMap.of(); assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg, oldSeg)); } else { assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg)); } assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult); + assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty()); } @Test(dataProvider = "selectorType") diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java index 24870f77be..d266fef76b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java @@ -25,7 +25,7 @@ package org.apache.pinot.common.request; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-09-27") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-11-16") public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, 
InstanceRequest._Fields>, java.io.Serializable, Cloneable, Comparable<InstanceRequest> { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InstanceRequest"); @@ -34,6 +34,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, private static final org.apache.thrift.protocol.TField SEARCH_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("searchSegments", org.apache.thrift.protocol.TType.LIST, (short)3); private static final org.apache.thrift.protocol.TField ENABLE_TRACE_FIELD_DESC = new org.apache.thrift.protocol.TField("enableTrace", org.apache.thrift.protocol.TType.BOOL, (short)4); private static final org.apache.thrift.protocol.TField BROKER_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("brokerId", org.apache.thrift.protocol.TType.STRING, (short)5); + private static final org.apache.thrift.protocol.TField OPTIONAL_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("optionalSegments", org.apache.thrift.protocol.TType.LIST, (short)6); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InstanceRequestStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InstanceRequestTupleSchemeFactory(); @@ -43,6 +44,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> searchSegments; // optional private boolean enableTrace; // optional private @org.apache.thrift.annotation.Nullable java.lang.String brokerId; // optional + private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -50,7 +52,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, QUERY((short)2, "query"), SEARCH_SEGMENTS((short)3, "searchSegments"), ENABLE_TRACE((short)4, "enableTrace"), - BROKER_ID((short)5, "brokerId"); + BROKER_ID((short)5, "brokerId"), + OPTIONAL_SEGMENTS((short)6, "optionalSegments"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -76,6 +79,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, return ENABLE_TRACE; case 5: // BROKER_ID return BROKER_ID; + case 6: // OPTIONAL_SEGMENTS + return OPTIONAL_SEGMENTS; default: return null; } @@ -120,7 +125,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, private static final int __REQUESTID_ISSET_ID = 0; private static final int __ENABLETRACE_ISSET_ID = 1; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID}; + private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID,_Fields.OPTIONAL_SEGMENTS}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -135,6 +140,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); tmpMap.put(_Fields.BROKER_ID, new org.apache.thrift.meta_data.FieldMetaData("brokerId", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.OPTIONAL_SEGMENTS, new 
org.apache.thrift.meta_data.FieldMetaData("optionalSegments", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InstanceRequest.class, metaDataMap); } @@ -169,6 +177,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, if (other.isSetBrokerId()) { this.brokerId = other.brokerId; } + if (other.isSetOptionalSegments()) { + java.util.List<java.lang.String> __this__optionalSegments = new java.util.ArrayList<java.lang.String>(other.optionalSegments); + this.optionalSegments = __this__optionalSegments; + } } public InstanceRequest deepCopy() { @@ -184,6 +196,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, setEnableTraceIsSet(false); this.enableTrace = false; this.brokerId = null; + this.optionalSegments = null; } public long getRequestId() { @@ -318,6 +331,46 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, } } + public int getOptionalSegmentsSize() { + return (this.optionalSegments == null) ? 0 : this.optionalSegments.size(); + } + + @org.apache.thrift.annotation.Nullable + public java.util.Iterator<java.lang.String> getOptionalSegmentsIterator() { + return (this.optionalSegments == null) ? 
null : this.optionalSegments.iterator(); + } + + public void addToOptionalSegments(java.lang.String elem) { + if (this.optionalSegments == null) { + this.optionalSegments = new java.util.ArrayList<java.lang.String>(); + } + this.optionalSegments.add(elem); + } + + @org.apache.thrift.annotation.Nullable + public java.util.List<java.lang.String> getOptionalSegments() { + return this.optionalSegments; + } + + public void setOptionalSegments(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments) { + this.optionalSegments = optionalSegments; + } + + public void unsetOptionalSegments() { + this.optionalSegments = null; + } + + /** Returns true if field optionalSegments is set (has been assigned a value) and false otherwise */ + public boolean isSetOptionalSegments() { + return this.optionalSegments != null; + } + + public void setOptionalSegmentsIsSet(boolean value) { + if (!value) { + this.optionalSegments = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case REQUEST_ID: @@ -360,6 +413,14 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, } break; + case OPTIONAL_SEGMENTS: + if (value == null) { + unsetOptionalSegments(); + } else { + setOptionalSegments((java.util.List<java.lang.String>)value); + } + break; + } } @@ -381,6 +442,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, case BROKER_ID: return getBrokerId(); + case OPTIONAL_SEGMENTS: + return getOptionalSegments(); + } throw new java.lang.IllegalStateException(); } @@ -402,6 +466,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, return isSetEnableTrace(); case BROKER_ID: return isSetBrokerId(); + case OPTIONAL_SEGMENTS: + return isSetOptionalSegments(); } throw new java.lang.IllegalStateException(); } @@ -464,6 +530,15 @@ public class InstanceRequest implements 
org.apache.thrift.TBase<InstanceRequest, return false; } + boolean this_present_optionalSegments = true && this.isSetOptionalSegments(); + boolean that_present_optionalSegments = true && that.isSetOptionalSegments(); + if (this_present_optionalSegments || that_present_optionalSegments) { + if (!(this_present_optionalSegments && that_present_optionalSegments)) + return false; + if (!this.optionalSegments.equals(that.optionalSegments)) + return false; + } + return true; } @@ -489,6 +564,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, if (isSetBrokerId()) hashCode = hashCode * 8191 + brokerId.hashCode(); + hashCode = hashCode * 8191 + ((isSetOptionalSegments()) ? 131071 : 524287); + if (isSetOptionalSegments()) + hashCode = hashCode * 8191 + optionalSegments.hashCode(); + return hashCode; } @@ -550,6 +629,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetOptionalSegments(), other.isSetOptionalSegments()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOptionalSegments()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.optionalSegments, other.optionalSegments); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -608,6 +697,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, } first = false; } + if (isSetOptionalSegments()) { + if (!first) sb.append(", "); + sb.append("optionalSegments:"); + if (this.optionalSegments == null) { + sb.append("null"); + } else { + sb.append(this.optionalSegments); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -715,6 +814,24 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 6: // OPTIONAL_SEGMENTS + if (schemeField.type == 
org.apache.thrift.protocol.TType.LIST) { + { + org.apache.thrift.protocol.TList _list3 = iprot.readListBegin(); + struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list3.size); + @org.apache.thrift.annotation.Nullable java.lang.String _elem4; + for (int _i5 = 0; _i5 < _list3.size; ++_i5) + { + _elem4 = iprot.readString(); + struct.optionalSegments.add(_elem4); + } + iprot.readListEnd(); + } + struct.setOptionalSegmentsIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -741,9 +858,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, oprot.writeFieldBegin(SEARCH_SEGMENTS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.searchSegments.size())); - for (java.lang.String _iter3 : struct.searchSegments) + for (java.lang.String _iter6 : struct.searchSegments) { - oprot.writeString(_iter3); + oprot.writeString(_iter6); } oprot.writeListEnd(); } @@ -762,6 +879,20 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, oprot.writeFieldEnd(); } } + if (struct.optionalSegments != null) { + if (struct.isSetOptionalSegments()) { + oprot.writeFieldBegin(OPTIONAL_SEGMENTS_FIELD_DESC); + { + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.optionalSegments.size())); + for (java.lang.String _iter7 : struct.optionalSegments) + { + oprot.writeString(_iter7); + } + oprot.writeListEnd(); + } + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -791,13 +922,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, if (struct.isSetBrokerId()) { optionals.set(2); } - oprot.writeBitSet(optionals, 3); + if (struct.isSetOptionalSegments()) { + optionals.set(3); + } + 
oprot.writeBitSet(optionals, 4); if (struct.isSetSearchSegments()) { { oprot.writeI32(struct.searchSegments.size()); - for (java.lang.String _iter4 : struct.searchSegments) + for (java.lang.String _iter8 : struct.searchSegments) { - oprot.writeString(_iter4); + oprot.writeString(_iter8); } } } @@ -807,6 +941,15 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, if (struct.isSetBrokerId()) { oprot.writeString(struct.brokerId); } + if (struct.isSetOptionalSegments()) { + { + oprot.writeI32(struct.optionalSegments.size()); + for (java.lang.String _iter9 : struct.optionalSegments) + { + oprot.writeString(_iter9); + } + } + } } @Override @@ -817,16 +960,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, struct.query = new BrokerRequest(); struct.query.read(iprot); struct.setQueryIsSet(true); - java.util.BitSet incoming = iprot.readBitSet(3); + java.util.BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { { - org.apache.thrift.protocol.TList _list5 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING); - struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list5.size); - @org.apache.thrift.annotation.Nullable java.lang.String _elem6; - for (int _i7 = 0; _i7 < _list5.size; ++_i7) + org.apache.thrift.protocol.TList _list10 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING); + struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list10.size); + @org.apache.thrift.annotation.Nullable java.lang.String _elem11; + for (int _i12 = 0; _i12 < _list10.size; ++_i12) { - _elem6 = iprot.readString(); - struct.searchSegments.add(_elem6); + _elem11 = iprot.readString(); + struct.searchSegments.add(_elem11); } } struct.setSearchSegmentsIsSet(true); @@ -839,6 +982,19 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, struct.brokerId = iprot.readString(); struct.setBrokerIdIsSet(true); } + if (incoming.get(3)) { + { + 
org.apache.thrift.protocol.TList _list13 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING); + struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list13.size); + @org.apache.thrift.annotation.Nullable java.lang.String _elem14; + for (int _i15 = 0; _i15 < _list13.size; ++_i15) + { + _elem14 = iprot.readString(); + struct.optionalSegments.add(_elem14); + } + } + struct.setOptionalSegmentsIsSet(true); + } } } diff --git a/pinot-common/src/thrift/request.thrift b/pinot-common/src/thrift/request.thrift index 384bb66107..225836da54 100644 --- a/pinot-common/src/thrift/request.thrift +++ b/pinot-common/src/thrift/request.thrift @@ -51,4 +51,5 @@ struct InstanceRequest { 3: optional list<string> searchSegments; 4: optional bool enableTrace; 5: optional string brokerId; + 6: optional list<string> optionalSegments; } diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index e6d88755c5..48255233a9 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -18,6 +18,7 @@ */ package org.apache.pinot.connector.spark.common.reader +import org.apache.commons.lang3.tuple.Pair import org.apache.helix.model.InstanceConfig import org.apache.pinot.common.datatable.DataTable import org.apache.pinot.common.metrics.BrokerMetrics @@ -31,7 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration import org.apache.pinot.spi.metrics.PinotMetricUtils import org.apache.pinot.sql.parsers.CalciteSqlCompiler -import java.util.{List => JList, Map => JMap} +import java.util.{Collections, List => JList, Map => JMap} import 
scala.collection.JavaConverters._ /** @@ -92,7 +93,7 @@ private[reader] class PinotServerDataFetcher( dataTables.filter(_.getNumberOfRows > 0) } - private def createRoutingTableForRequest(): JMap[ServerInstance, JList[String]] = { + private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = { val nullZkId: String = null val instanceConfig = new InstanceConfig(nullZkId) instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost) @@ -100,15 +101,15 @@ private[reader] class PinotServerDataFetcher( // TODO: support netty-sec val serverInstance = new ServerInstance(instanceConfig) Map( - serverInstance -> pinotSplit.serverAndSegments.segments.asJava + serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) ).asJava } private def submitRequestToPinotServer( offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, JList[String]], + offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]], realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, JList[String]]): AsyncQueryResponse = { + realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 1f0e9844fe..2335a0fc33 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -311,6 +311,12 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments) { + return 
acquireSegments(segmentNames, null, missingSegments); + } + + @Override + public List<SegmentDataManager> acquireSegments(List<String> segmentNames, + @Nullable List<String> optionalSegmentNames, List<String> missingSegments) { List<SegmentDataManager> segmentDataManagers = new ArrayList<>(); for (String segmentName : segmentNames) { SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); @@ -320,6 +326,15 @@ public abstract class BaseTableDataManager implements TableDataManager { missingSegments.add(segmentName); } } + if (optionalSegmentNames != null) { + for (String segmentName : optionalSegmentNames) { + SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName); + // Optional segments are not counted to missing segments that are reported back in query exception. + if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) { + segmentDataManagers.add(segmentDataManager); + } + } + } return segmentDataManagers; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index ced6c62fb2..b8c5383bbb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -200,10 +200,16 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } List<String> segmentsToQuery = queryRequest.getSegmentsToQuery(); + List<String> optionalSegments = queryRequest.getOptionalSegments(); List<String> notAcquiredSegments = new ArrayList<>(); List<SegmentDataManager> segmentDataManagers = - tableDataManager.acquireSegments(segmentsToQuery, notAcquiredSegments); + tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, notAcquiredSegments); int numSegmentsAcquired = segmentDataManagers.size(); + if 
(LOGGER.isDebugEnabled()) { + LOGGER.debug("Processing requestId: {} with segmentsToQuery: {}, optionalSegments: {} and acquiredSegments: {}", + requestId, segmentsToQuery, optionalSegments, + segmentDataManagers.stream().map(SegmentDataManager::getSegmentName).collect(Collectors.toList())); + } List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired); for (SegmentDataManager segmentDataManager : segmentDataManagers) { indexSegments.add(segmentDataManager.getSegment()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java index 2d72fc04ef..a9908a87b1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java @@ -82,7 +82,7 @@ public class ServerQueryLogger { getLongValue(responseMetadata, MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), -1); addToTableMeter(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter); - int numSegmentsQueried = request.getSegmentsToQuery().size(); + long numSegmentsQueried = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_QUERIED.getName(), -1); addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried); long numSegmentsProcessed = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), -1); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index e444eae880..d4ce7857a5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -51,6 +51,7 @@ public class ServerQueryRequest { private final boolean _enableTrace; 
private final boolean _enableStreaming; private final List<String> _segmentsToQuery; + private final List<String> _optionalSegments; private final QueryContext _queryContext; // Request id might not be unique across brokers or for request hitting a hybrid table. To solve that we may construct @@ -71,6 +72,7 @@ public class ServerQueryRequest { _enableTrace = instanceRequest.isEnableTrace(); _enableStreaming = enableStreaming; _segmentsToQuery = instanceRequest.getSearchSegments(); + _optionalSegments = instanceRequest.getOptionalSegments(); _queryContext = getQueryContext(instanceRequest.getQuery().getPinotQuery()); _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId, TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName())); @@ -88,6 +90,8 @@ public class ServerQueryRequest { _enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING)); _segmentsToQuery = serverRequest.getSegmentsList(); + // TODO: support optional segments for GrpcQueryServer + _optionalSegments = null; BrokerRequest brokerRequest; String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL); @@ -139,6 +143,10 @@ public class ServerQueryRequest { return _segmentsToQuery; } + public List<String> getOptionalSegments() { + return _optionalSegments; + } + public QueryContext getQueryContext() { return _queryContext; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java index 3491dcd821..ccc6aedb81 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java @@ -20,22 +20,27 @@ package org.apache.pinot.core.routing; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.transport.ServerInstance; public class RoutingTable { - private 
final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap; + // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g. + // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that + // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers + // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results. + private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap; private final List<String> _unavailableSegments; private final int _numPrunedSegments; - public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments, - int numPrunedSegments) { + public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap, + List<String> unavailableSegments, int numPrunedSegments) { _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap; _unavailableSegments = unavailableSegments; _numPrunedSegments = numPrunedSegments; } - public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() { + public Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap() { return _serverInstanceToSegmentsMap; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index e0ff080e11..086211ad5f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import 
org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datatable.DataTable; @@ -85,9 +87,10 @@ public class QueryRouter { } public AsyncQueryResponse submitQuery(long requestId, String rawTableName, - @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, - @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, - long timeoutMs) { + @Nullable BrokerRequest offlineBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs) { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; // can prefer but not require TLS until all servers guaranteed to be on TLS @@ -97,7 +100,7 @@ public class QueryRouter { Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - for (Map.Entry<ServerInstance, List<String>> entry : offlineRoutingTable.entrySet()) { + for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : offlineRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); @@ -106,7 +109,7 @@ public class QueryRouter { } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) { + for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : realtimeRoutingTable.entrySet()) { 
ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); @@ -195,7 +198,8 @@ public class QueryRouter { _asyncQueryResponseMap.remove(requestId); } - private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) { + private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, + Pair<List<String>, List<String>> segments) { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setQuery(brokerRequest); @@ -203,8 +207,14 @@ public class QueryRouter { if (queryOptions != null) { instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE))); } - instanceRequest.setSearchSegments(segments); + instanceRequest.setSearchSegments(segments.getLeft()); instanceRequest.setBrokerId(_brokerId); + if (CollectionUtils.isNotEmpty(segments.getRight())) { + // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward + // compatible, as there are places like in multi-stage query engine where this field is not set today when + // creating the InstanceRequest. 
+ instanceRequest.setOptionalSegments(segments.getRight()); + } return instanceRequest; } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 2a30eefa2f..07aa0e2c08 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -60,8 +61,8 @@ public class QueryRoutingTest { SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY); private static final BrokerRequest BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); - private static final Map<ServerInstance, List<String>> ROUTING_TABLE = - Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList()); + private static final Map<ServerInstance, Pair<List<String>, List<String>>> ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), Collections.emptyList())); private QueryRouter _queryRouter; private ServerRoutingStatsManager _serverRoutingStatsManager; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 5029321162..75164add69 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -30,6 +30,7 @@ import java.util.Random; import java.util.Set; import 
javax.annotation.Nullable; import org.apache.calcite.rel.hint.PinotHintOptions; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; import org.apache.pinot.core.routing.TablePartitionInfo; @@ -145,11 +146,12 @@ public class WorkerManager { String tableType = routingEntry.getKey(); RoutingTable routingTable = routingEntry.getValue(); // for each server instance, attach all table types and their associated segment list. - for (Map.Entry<ServerInstance, List<String>> serverEntry : routingTable.getServerInstanceToSegmentsMap() - .entrySet()) { - serverInstanceToSegmentsMap.putIfAbsent(serverEntry.getKey(), new HashMap<>()); - Map<String, List<String>> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.get(serverEntry.getKey()); - Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue()) == null, + Map<ServerInstance, Pair<List<String>, List<String>>> segmentsMap = routingTable.getServerInstanceToSegmentsMap(); + for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> serverEntry : segmentsMap.entrySet()) { + Map<String, List<String>> tableTypeToSegmentListMap = + serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); + // TODO: support optional segments for multi-stage engine. 
+ Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getLeft()) == null, "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType); } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java index 41bf57700c..4539627a23 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.config.provider.TableCache; @@ -56,7 +57,7 @@ public class MockRoutingManagerFactory { private final Map<String, Schema> _schemaMap; private final Set<String> _hybridTables; private final Map<String, ServerInstance> _serverInstances; - private final Map<String, Map<ServerInstance, List<String>>> _tableServerSegmentsMap; + private final Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> _tableServerSegmentsMap; public MockRoutingManagerFactory(int... 
ports) { _tableNameMap = new HashMap<>(); @@ -87,16 +88,15 @@ public class MockRoutingManagerFactory { public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) { ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort)); _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()) - .computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(segmentName); + .computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), null)).getLeft().add(segmentName); } public RoutingManager buildRoutingManager(@Nullable Map<String, TablePartitionInfo> partitionInfoMap) { Map<String, RoutingTable> routingTableMap = new HashMap<>(); - for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : _tableServerSegmentsMap.entrySet()) { - String tableNameWithType = tableEntry.getKey(); - RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), Collections.emptyList(), 0); + _tableServerSegmentsMap.forEach((tableNameWithType, serverSegmentsMap) -> { + RoutingTable fakeRoutingTable = new RoutingTable(serverSegmentsMap, Collections.emptyList(), 0); routingTableMap.put(tableNameWithType, fakeRoutingTable); - } + }); return new FakeRoutingManager(routingTableMap, _hybridTables, partitionInfoMap, _serverInstances); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java index c3a34cbd20..6bd3d617b3 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java @@ -46,6 +46,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import static org.mockito.ArgumentMatchers.anyList; 
+import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -150,7 +151,8 @@ public class MockInstanceDataManagerFactory { Map<String, SegmentDataManager> segmentDataManagerMap = segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, ImmutableSegmentDataManager::new)); TableDataManager tableDataManager = mock(TableDataManager.class); - when(tableDataManager.acquireSegments(anyList(), anyList())).thenAnswer(invocation -> { + // TODO: support optional segments for multi-stage engine, but for now, it's always null. + when(tableDataManager.acquireSegments(anyList(), eq(null), anyList())).thenAnswer(invocation -> { List<String> segments = invocation.getArgument(0); return segments.stream().map(segmentDataManagerMap::get).collect(Collectors.toList()); }); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 2de9c586c0..0b820f50c2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -170,6 +170,11 @@ public interface TableDataManager { */ List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments); + default List<SegmentDataManager> acquireSegments(List<String> segmentNames, + @Nullable List<String> optionalSegmentNames, List<String> missingSegments) { + return acquireSegments(segmentNames, missingSegments); + } + /** * Acquires the segments with the given segment name. * <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}. 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org