Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1397913935


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -111,9 +112,9 @@ public TimeBoundaryInfo getTimeBoundary(
       @ApiResponse(code = 404, message = "Routing not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
-  public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
+  public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> 
getRoutingTable(

Review Comment:
   Let's not change these APIs. I believe they are used by Trino to get the 
routing tables, and this change will break them. We may add a TODO and 
introduce new APIs for the optional segments



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,24 +18,46 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have 
been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped 
by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, 
we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or 
include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>/*non-optional 
segments*/, List<String>/*optional segments*/>>
+      _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
   public RoutingTable(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap, List<String> unavailableSegments,
       int numPrunedSegments) {
-    _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
+    this(serverInstanceToSegmentsMap, Collections.emptyMap(), 
unavailableSegments, numPrunedSegments);
+  }
+
+  public RoutingTable(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap,

Review Comment:
   Suggest keeping this special handling at the caller side. `RoutingTable` 
should be a thin wrapper class without any complex logic



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +610,16 @@ public RoutingTable getRoutingTable(BrokerRequest 
brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = 
routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = 
selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, 
selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, 
selectionResult.getOptionalSegmentToInstanceMap());

Review Comment:
   Merge it into one map here, and directly set it into the `RoutingTable`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -113,7 +115,7 @@ protected BrokerResponseNative processBrokerRequest(long 
requestId, BrokerReques
     String rawTableName = 
TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
     long scatterGatherStartTimeNs = System.nanoTime();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, rawTableName, 
offlineBrokerRequest, offlineRoutingTable,
+        _queryRouter.submitQueryWithOptionalSegments(requestId, rawTableName, 
offlineBrokerRequest, offlineRoutingTable,

Review Comment:
   Suggest directly modifying the `submitQuery()` without introducing a new 
method. QueryRouting is specifically built for this request handler



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -430,8 +436,10 @@ public Set<String> getServingInstances() {
 
   /**
    * Selects the server instances for the given segments based on the request 
id and segment states. Returns a map
-   * from segment to selected server instance hosting the segment.
+   * from segment to selected server instance hosting the segment. The 
optional segments are used to get the new
+   * segments that is not online yet. Instead of simply skipping them by 
broker at routing time, we can let servers
+   * decide how to handle them.
    */
   abstract Map<String, String> select(List<String> segments, int requestId, 
SegmentStates segmentStates,

Review Comment:
   Return a `Pair` instead of passing in a `Map`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to