Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1401383312


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -114,28 +116,55 @@ public TimeBoundaryInfo getTimeBoundary(
   public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String 
tableName) {
     Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>();
+    getRoutingTable(tableName, (tableNameWithType, routingTable) -> 
result.put(tableNameWithType,
+        routingTable.getServerInstanceToSegmentsMap(false)));
+    if (!result.isEmpty()) {
+      return result;
+    } else {
+      throw new WebApplicationException("Cannot find routing for table: " + 
tableName, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/v2/debug/routingTable/{tableName}")

Review Comment:
   Not a big fan of `v2`
   ```suggestion
     @Path("/debug/routingTableWithOptionalSegments/{tableName}")
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -152,7 +181,40 @@ public Map<ServerInstance, List<String>> 
getRoutingTableForQuery(
       @ApiParam(value = "SQL query (table name should have type suffix)") 
@QueryParam("query") String query,
       @Context HttpHeaders httpHeaders) {
     BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, 
getRequestId());
+    if (routingTable != null) {
+      return routingTable.getServerInstanceToSegmentsMap(false);
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + 
query, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/v2/debug/routingTable/sql")

Review Comment:
   Same here
   ```suggestion
     @Path("/debug/routingTableWithOptionalSegments/sql")
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,27 +18,39 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have 
been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped 
by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, 
we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or 
include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional 
segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> 
serverInstanceToSegmentsMap,
+      List<String> unavailableSegments, int numPrunedSegments) {
     _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
     _unavailableSegments = unavailableSegments;
     _numPrunedSegments = numPrunedSegments;
   }
 
-  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+  public Map<ServerInstance, Pair<List<String>, List<String>>> 
getServerInstanceToSegmentsMap() {
     return _serverInstanceToSegmentsMap;
   }
 
+  public Map<ServerInstance, List<String>> 
getServerInstanceToSegmentsMap(boolean optionalSegments) {

Review Comment:
   Let's not handle this within `RoutingTable`. We can handle it in 
`PinotBrokerDebug`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -54,9 +55,10 @@ public BalancedInstanceSelector(String tableNameWithType, 
ZkHelixPropertyStore<Z
   }
 
   @Override
-  Map<String, String> select(List<String> segments, int requestId, 
SegmentStates segmentStates,
-      Map<String, String> queryOptions) {
+  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, 
int requestId,
+      SegmentStates segmentStates, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    Map<String, String> optionalSegmentToInstanceMap = new 
HashMap<>(HashUtil.getHashMapCapacity(segments.size()));

Review Comment:
   We don't need to initialize a big map here because most segments shouldn't 
be optional. Default capacity should be fine. Same for other selectors



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java:
##########
@@ -195,16 +197,18 @@ void markQueryDone(long requestId) {
     _asyncQueryResponseMap.remove(requestId);
   }
 
-  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest 
brokerRequest, List<String> segments) {
+  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest 
brokerRequest,
+      Pair<List<String>, List<String>> segments) {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setQuery(brokerRequest);
     Map<String, String> queryOptions = 
brokerRequest.getPinotQuery().getQueryOptions();
     if (queryOptions != null) {
       
instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
     }
-    instanceRequest.setSearchSegments(segments);
+    instanceRequest.setSearchSegments(segments.getLeft());
     instanceRequest.setBrokerId(_brokerId);
+    instanceRequest.setOptionalSegments(segments.getRight());

Review Comment:
   What is the contract here when there is no optional segments? Should it be 
an empty list or `null`? Let's fix the contract and make sure it is properly 
applied



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,27 +18,39 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have 
been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped 
by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, 
we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or 
include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional 
segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> 
serverInstanceToSegmentsMap,

Review Comment:
   By reading the code, seems the optional segments are either non empty list 
or `null`. Let's document this contract.
   Does the current code handle the case of empty list properly?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest 
brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = 
routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = 
selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, 
selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, 
selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, 
serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), 
selectionResult.getNumPrunedSegments());

Review Comment:
   Can we merge these 3 methods into one (i.e. process mandatory and optional 
together)?
   IMO that has 2 benefits:
   - Logic more clear
   - Reduce overhead, only need to maintain one map instead of 3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to