Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1397913935

##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -111,9 +112,9 @@ public TimeBoundaryInfo getTimeBoundary(
       @ApiResponse(code = 404, message = "Routing not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
-  public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
+  public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTable(

Review Comment:
Let's not change these APIs. I believe they are used by Trino to get the routing tables, and this change will break them. We may add a TODO and introduce new APIs for the optional segments

##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,24 +18,46 @@
  */
 package org.apache.pinot.core.routing;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>/*non-optional segments*/, List<String>/*optional segments*/>>
+      _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
   public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
       int numPrunedSegments) {
-    _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
+    this(serverInstanceToSegmentsMap, Collections.emptyMap(), unavailableSegments, numPrunedSegments);
+  }
+
+  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap,

Review Comment:
Suggest keeping this special handling at the caller side. `RoutingTable` should be a thin wrapper class without any complex logic

##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +610,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());

Review Comment:
Merge it into one map here, and directly set it into the `RoutingTable`

##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -113,7 +115,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
     String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
     long scatterGatherStartTimeNs = System.nanoTime();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,
+        _queryRouter.submitQueryWithOptionalSegments(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,

Review Comment:
Suggest directly modifying the `submitQuery()` without introducing a new method. QueryRouting is specifically built for this request handler

##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -430,8 +436,10 @@ public Set<String> getServingInstances() {
   /**
    * Selects the server instances for the given segments based on the request id and segment states. Returns a map
-   * from segment to selected server instance hosting the segment.
+   * from segment to selected server instance hosting the segment. The optional segments are used to get the new
+   * segments that is not online yet. Instead of simply skipping them by broker at routing time, we can let servers
+   * decide how to handle them.
    */
   abstract Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,

Review Comment:
Return a `Pair` instead of passing in a `Map`
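
One way to follow the first review comment (leave the existing `getRoutingTable` response shape alone and add a separate API for optional segments) is to keep serving only the non-optional segments through the old server-to-segments shape and to expose optional segments through a new view. This is a hypothetical sketch, not Pinot code: plain `String` server names stand in for `ServerInstance`, and `CompatSketch`, `SegmentLists`, `legacyView`, and `optionalSegmentsView` are illustrative names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompatSketch {
  // Hypothetical per-server routing entry: non-optional and optional segments.
  static final class SegmentLists {
    final List<String> segments;
    final List<String> optionalSegments;

    SegmentLists(List<String> segments, List<String> optionalSegments) {
      this.segments = segments;
      this.optionalSegments = optionalSegments;
    }
  }

  // Keeps the existing response shape (server -> segments) by exposing only non-optional
  // segments, so clients parsing the current routing-table response are unaffected.
  static Map<String, List<String>> legacyView(Map<String, SegmentLists> routing) {
    Map<String, List<String>> result = new HashMap<>();
    routing.forEach((server, lists) -> result.put(server, lists.segments));
    return result;
  }

  // Hypothetical view for a new API that reports optional segments separately;
  // servers without optional segments are omitted.
  static Map<String, List<String>> optionalSegmentsView(Map<String, SegmentLists> routing) {
    Map<String, List<String>> result = new HashMap<>();
    routing.forEach((server, lists) -> {
      if (!lists.optionalSegments.isEmpty()) {
        result.put(server, lists.optionalSegments);
      }
    });
    return result;
  }
}
```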
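
The suggestion for `BrokerRoutingManager` (merge the two maps into one at the caller and hand the result directly to `RoutingTable`, keeping `RoutingTable` a thin wrapper) might look roughly like the following. It is a minimal sketch under stated assumptions: `SegmentLists` stands in for commons-lang3 `Pair<List<String>, List<String>>`, server names are plain strings instead of `ServerInstance`, and `RoutingSketch` and `merge` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RoutingSketch {
  // Hypothetical stand-in for Pair<List<String>, List<String>>: non-optional and optional segments.
  static final class SegmentLists {
    final List<String> segments = new ArrayList<>();
    final List<String> optionalSegments = new ArrayList<>();
  }

  // Thin wrapper: it stores what the caller built and does no merging or defaulting of its own.
  static final class RoutingTable {
    private final Map<String, SegmentLists> _serverInstanceToSegmentsMap;
    private final List<String> _unavailableSegments;
    private final int _numPrunedSegments;

    RoutingTable(Map<String, SegmentLists> serverInstanceToSegmentsMap, List<String> unavailableSegments,
        int numPrunedSegments) {
      _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
      _unavailableSegments = unavailableSegments;
      _numPrunedSegments = numPrunedSegments;
    }

    Map<String, SegmentLists> getServerInstanceToSegmentsMap() {
      return _serverInstanceToSegmentsMap;
    }

    List<String> getUnavailableSegments() {
      return _unavailableSegments;
    }

    int getNumPrunedSegments() {
      return _numPrunedSegments;
    }
  }

  // Caller-side merge (the role BrokerRoutingManager would play): group both segment-to-server
  // maps into a single per-server map instead of building two maps and combining them later.
  static Map<String, SegmentLists> merge(Map<String, String> segmentToInstanceMap,
      Map<String, String> optionalSegmentToInstanceMap) {
    Map<String, SegmentLists> merged = new HashMap<>();
    for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
      merged.computeIfAbsent(entry.getValue(), server -> new SegmentLists()).segments.add(entry.getKey());
    }
    for (Map.Entry<String, String> entry : optionalSegmentToInstanceMap.entrySet()) {
      merged.computeIfAbsent(entry.getValue(), server -> new SegmentLists()).optionalSegments.add(entry.getKey());
    }
    return merged;
  }
}
```

Building the merged map once at the call site means `RoutingTable` needs only one constructor and no conversion logic.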
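
For `BaseInstanceSelector.select()`, returning a `Pair` of maps instead of filling a caller-supplied map could be sketched as below. The selection rule is a simplifying assumption, not Pinot's actual selector logic: replicas are picked round-robin by request id, and segments not yet ONLINE in the ExternalView go into the optional map. `SelectSketch`, `SelectionMaps`, and `Candidate` are hypothetical, and `SelectionMaps` stands in for `Pair<Map<String, String>, Map<String, String>>`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SelectSketch {
  // Hypothetical stand-in for Pair<Map<String, String>, Map<String, String>>.
  static final class SelectionMaps {
    final Map<String, String> segmentToInstanceMap;
    final Map<String, String> optionalSegmentToInstanceMap;

    SelectionMaps(Map<String, String> segmentToInstanceMap, Map<String, String> optionalSegmentToInstanceMap) {
      this.segmentToInstanceMap = segmentToInstanceMap;
      this.optionalSegmentToInstanceMap = optionalSegmentToInstanceMap;
    }
  }

  // Hypothetical simplification of SegmentStates: candidate servers for one segment, plus
  // whether the segment is already ONLINE in the ExternalView.
  static final class Candidate {
    final List<String> instances;
    final boolean online;

    Candidate(List<String> instances, boolean online) {
      this.instances = instances;
      this.online = online;
    }
  }

  // Both maps are created and returned here, so callers no longer pass in a map to be filled.
  static SelectionMaps select(List<String> segments, int requestId, Map<String, Candidate> segmentStates) {
    Map<String, String> segmentToInstanceMap = new HashMap<>();
    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
    for (String segment : segments) {
      Candidate candidate = segmentStates.get(segment);
      if (candidate == null || candidate.instances.isEmpty()) {
        continue; // No server to route to; a real selector would report the segment as unavailable.
      }
      // Round-robin among replicas using the request id.
      String instance = candidate.instances.get(Math.floorMod(requestId, candidate.instances.size()));
      if (candidate.online) {
        segmentToInstanceMap.put(segment, instance);
      } else {
        optionalSegmentToInstanceMap.put(segment, instance);
      }
    }
    return new SelectionMaps(segmentToInstanceMap, optionalSegmentToInstanceMap);
  }
}
```

Returning both maps from one call keeps `select()` free of out-parameters and leaves the merging decision to the caller.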