egalpin commented on code in PR #13742:
URL: https://github.com/apache/pinot/pull/13742#discussion_r1742753156


##########
pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java:
##########
@@ -50,21 +52,31 @@ public class AsyncQueryResponse implements QueryResponse {
   private volatile ServerRoutingInstance _failedServer;
   private volatile Exception _exception;
 
-  public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRoutingInstance> serversQueried,
-      long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) {
+  public AsyncQueryResponse(QueryRouter queryRouter, long requestId,
+//      Set<ServerQueryRoutingContext> serverQueryRoutingContexts,
+      Map<ServerRoutingInstance, List<InstanceRequest>> requestMap, long startTimeMs, long timeoutMs,
+      ServerRoutingStatsManager serverRoutingStatsManager) {
     _queryRouter = queryRouter;
     _requestId = requestId;
-    int numServersQueried = serversQueried.size();
-    _responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
+    _responses = new ConcurrentHashMap<>();
     _serverRoutingStatsManager = serverRoutingStatsManager;
-    for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
-      // Record stats related to query submission just before sending the request. Otherwise, if the response is
-      // received immediately, there's a possibility of updating query response stats before updating query
-      // submission stats.
-      _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
-      _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
+    int numQueriesIssued = 0;
+    for (Map.Entry<ServerRoutingInstance, List<InstanceRequest>> serverRequests : requestMap.entrySet()) {
+      for (InstanceRequest request : serverRequests.getValue()) {
+        // Record stats related to query submission just before sending the request. Otherwise, if the response is
+        // received immediately, there's a possibility of updating query response stats before updating query
+        // submission stats.
+        _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRequests.getKey().getInstanceId());
+
+        _responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>())
+            // we use query hash so that the same hash ID can be passed back from servers more easily than trying to
+            // instantiate a valid InstanceRequest obj and send its hash
+            .put(request.getQuery().getPinotQuery().hashCode(), new ServerResponse(startTimeMs));

Review Comment:
   Right, that's my understanding as well. I _think_ the physical table name
(i.e. with the type suffix) should be unique per server per request ID. My
understanding is that, for a given query issued to a broker, each server
fetches 0 or 1 data tables for a given physical table. I'm not aware of any
legitimate case where the same server would receive more than one query for
the same physical table under the same request ID.
   
   Is that accurate? If so, I believe keying the per-server response map by
physical table name would be strongly preferable to keying it by query hash.
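
   A minimal sketch of that keying, assuming the uniqueness property above
holds. The names `ResponseKeySketch`, `register`, and `lookup` are
hypothetical, and plain strings and longs stand in for `ServerRoutingInstance`
and `ServerResponse`; this is an illustration, not the code in this PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the bookkeeping in AsyncQueryResponse; not Pinot code.
class ResponseKeySketch {
  // server instance ID -> (physical table name, e.g. "myTable_OFFLINE" -> start time in ms)
  private final Map<String, Map<String, Long>> _responses = new ConcurrentHashMap<>();

  // Registers a pending response. Returns false if the (server, table) slot is
  // already taken, i.e. the assumed one-query-per-physical-table rule was violated.
  boolean register(String serverId, String tableNameWithType, long startTimeMs) {
    return _responses.computeIfAbsent(serverId, k -> new ConcurrentHashMap<>())
        .putIfAbsent(tableNameWithType, startTimeMs) == null;
  }

  // Looks up the pending entry using only what a server can echo back:
  // its own instance ID and the physical table name. Returns null if absent.
  Long lookup(String serverId, String tableNameWithType) {
    Map<String, Long> perServer = _responses.get(serverId);
    return perServer == null ? null : perServer.get(tableNameWithType);
  }
}
```

   Using `putIfAbsent` rather than `put` means a second query for the same
physical table on the same server is reported instead of silently overwriting
the first entry, which would make any violation of the assumption visible.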



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

