egalpin commented on code in PR #13742: URL: https://github.com/apache/pinot/pull/13742#discussion_r1720064027
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java:
##########
@@ -50,21 +52,31 @@ public class AsyncQueryResponse implements QueryResponse {
   private volatile ServerRoutingInstance _failedServer;
   private volatile Exception _exception;
-  public AsyncQueryResponse(QueryRouter queryRouter, long requestId, Set<ServerRoutingInstance> serversQueried,
-      long startTimeMs, long timeoutMs, ServerRoutingStatsManager serverRoutingStatsManager) {
+  public AsyncQueryResponse(QueryRouter queryRouter, long requestId,
+//      Set<ServerQueryRoutingContext> serverQueryRoutingContexts,
+      Map<ServerRoutingInstance, List<InstanceRequest>> requestMap, long startTimeMs, long timeoutMs,
+      ServerRoutingStatsManager serverRoutingStatsManager) {
     _queryRouter = queryRouter;
     _requestId = requestId;
-    int numServersQueried = serversQueried.size();
-    _responseMap = new ConcurrentHashMap<>(HashUtil.getHashMapCapacity(numServersQueried));
+    _responses = new ConcurrentHashMap<>();
     _serverRoutingStatsManager = serverRoutingStatsManager;
-    for (ServerRoutingInstance serverRoutingInstance : serversQueried) {
-      // Record stats related to query submission just before sending the request. Otherwise, if the response is
-      // received immediately, there's a possibility of updating query response stats before updating query
-      // submission stats.
-      _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRoutingInstance.getInstanceId());
-      _responseMap.put(serverRoutingInstance, new ServerResponse(startTimeMs));
+    int numQueriesIssued = 0;
+    for (Map.Entry<ServerRoutingInstance, List<InstanceRequest>> serverRequests : requestMap.entrySet()) {
+      for (InstanceRequest request : serverRequests.getValue()) {
+        // Record stats related to query submission just before sending the request. Otherwise, if the response is
+        // received immediately, there's a possibility of updating query response stats before updating query
+        // submission stats.
+        _serverRoutingStatsManager.recordStatsForQuerySubmission(requestId, serverRequests.getKey().getInstanceId());
+
+        _responses.computeIfAbsent(serverRequests.getKey(), k -> new ConcurrentHashMap<>())
+            // we use query hash so that the same hash ID can be passed back from servers more easily than trying to
+            // instantiate a valid InstanceRequest obj and send its hash
+            .put(request.getQuery().getPinotQuery().hashCode(), new ServerResponse(startTimeMs));

Review Comment:
   @jackjlli what are your thoughts on using the table name instead of the query hash as the key for the nested map here? The advantage would be that a table name is more "human understandable" than an integer value. I believe there is no legitimate case where a single server needs to be sent multiple queries for the same _physical_ table name (i.e. with type), so the table name should uniquely identify a query on a given server. I also think it would make backward compatibility easier to support.

   Thoughts? Are there any disadvantages you can foresee in using the table name as the key? What about multi-stage engine considerations?
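
   To make the question concrete, here is a rough, standalone sketch of what keying the nested map by table name could look like. Everything in it is hypothetical and not code from this PR: `TableKeyedResponses`, `ResponseSlot` (a stand-in for `ServerResponse`), and the plain `String` server ID (a stand-in for `ServerRoutingInstance`) are illustrative names only. It assumes at most one query per physical table per server, and it fails loudly if that assumption is violated rather than silently overwriting an entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; these are not Pinot classes.
class TableKeyedResponses {

  // Stand-in for ServerResponse: only records when the query was submitted.
  static final class ResponseSlot {
    final long startTimeMs;

    ResponseSlot(long startTimeMs) {
      this.startTimeMs = startTimeMs;
    }
  }

  // server instance ID -> (table name with type -> response slot)
  private final Map<String, Map<String, ResponseSlot>> responses = new ConcurrentHashMap<>();

  // Registers one slot per (server, physical table) pair.
  // Throws if the same pair is registered twice, which would break the
  // "table name uniquely identifies a query on a server" assumption.
  void register(String serverInstanceId, String tableNameWithType, long startTimeMs) {
    ResponseSlot previous = responses
        .computeIfAbsent(serverInstanceId, k -> new ConcurrentHashMap<>())
        .putIfAbsent(tableNameWithType, new ResponseSlot(startTimeMs));
    if (previous != null) {
      throw new IllegalStateException(
          "Duplicate query for table " + tableNameWithType + " on server " + serverInstanceId);
    }
  }

  // Returns the slot for the given server and table, or null if none was registered.
  ResponseSlot lookup(String serverInstanceId, String tableNameWithType) {
    Map<String, ResponseSlot> perServer = responses.get(serverInstanceId);
    return perServer == null ? null : perServer.get(tableNameWithType);
  }

  // Total number of (server, table) queries registered so far.
  int numQueriesIssued() {
    return responses.values().stream().mapToInt(Map::size).sum();
  }

  public static void main(String[] args) {
    TableKeyedResponses r = new TableKeyedResponses();
    // One server receiving both halves of a hybrid table.
    r.register("Server_host1_8098", "myTable_OFFLINE", 100L);
    r.register("Server_host1_8098", "myTable_REALTIME", 100L);
    System.out.println(r.numQueriesIssued());
    System.out.println(r.lookup("Server_host1_8098", "myTable_OFFLINE") != null);
  }
}
```

   With this shape a server could echo back the table name it was queried for, and the broker would find the matching slot with a plain string lookup. Whether that holds for the multi-stage engine is exactly the open question above.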