ankitsultana commented on code in PR #10791:
URL: https://github.com/apache/pinot/pull/10791#discussion_r1204677238


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -51,12 +53,23 @@ public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
     return distributedStagePlan;
   }
 
-  public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) {
+  public static List<DistributedStagePlan> deserialize(Worker.QueryRequest request) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    for (Worker.StagePlan stagePlan : request.getStagePlanList()) {
+      distributedStagePlans.add(deserialize(stagePlan));
+    }
+    return distributedStagePlans;
+  }
+
+  public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId,
+      VirtualServerAddress serverAddress) {

Review Comment:
   I left a comment at the call site of this method: `serialize` is being called (num-workers * num-stages) times per query.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -139,22 +140,24 @@ int submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeout
         for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
             : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) {
           QueryServerInstance queryServerInstance = queryServerEntry.getKey();
+          Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder();
+          String host = queryServerInstance.getHostname();
+          int servicePort = queryServerInstance.getQueryServicePort();
+          int mailboxPort = queryServerInstance.getQueryMailboxPort();
           for (int workerId : queryServerEntry.getValue()) {
-            String host = queryServerInstance.getHostname();
-            int servicePort = queryServerInstance.getQueryServicePort();
-            int mailboxPort = queryServerInstance.getQueryMailboxPort();
             VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId);
-            DispatchClient client = getOrCreateDispatchClient(host, servicePort);
             dispatchCalls++;
-            int finalStageId = stageId;
-            _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                        QueryPlanSerDeUtils.serialize(
-                            constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
-                    .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
-                dispatchCallbacks::offer));
+            queryRequestBuilder.addStagePlan(
+                QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, virtualServerAddress));

Review Comment:
   This serializes the entire sub plan once for each (worker, stageId) combination. Large use-cases usually have 256 partitions (i.e. workers) and ~10 or so stages.
   
   That means we'll call this ~2,500 times per query (256 * 10). For low-QPS use-cases this should be fine, but higher-QPS use-cases might start getting bottlenecked on it.
   
   @walterddr: I remember we discussed this a couple of months ago. Are we planning to fix this anytime soon? cc: @xiangfu0
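   
   To make the call-count concern concrete, here is a minimal, self-contained sketch. It does not use any Pinot classes: `serializeStage`, `dispatchPerWorker`, and `dispatchPerStage` are hypothetical stand-ins, and the string payloads only mimic a serialized plan. It assumes the expensive part of the plan is identical for every worker of a stage, so it can be serialized once per stage with only the worker id attached per request. Whether the real plan can be split this way is an assumption, not something this PR establishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class StagePlanSerializationSketch {
  // Counts how many times the (assumed expensive) serialization runs.
  static final AtomicInteger SERIALIZE_CALLS = new AtomicInteger();

  // Hypothetical stand-in for serializing the worker-independent part of a stage plan.
  static String serializeStage(int stageId) {
    SERIALIZE_CALLS.incrementAndGet();
    return "stage-" + stageId;
  }

  // Mirrors the pattern flagged above: serialize again for every (stage, worker) pair.
  static List<String> dispatchPerWorker(int numStages, int numWorkers) {
    List<String> requests = new ArrayList<>();
    for (int stageId = 0; stageId < numStages; stageId++) {
      for (int workerId = 0; workerId < numWorkers; workerId++) {
        requests.add(serializeStage(stageId) + "@worker-" + workerId);
      }
    }
    return requests;
  }

  // Possible alternative: serialize once per stage and reuse the payload for each worker.
  static List<String> dispatchPerStage(int numStages, int numWorkers) {
    List<String> requests = new ArrayList<>();
    for (int stageId = 0; stageId < numStages; stageId++) {
      String stagePayload = serializeStage(stageId);
      for (int workerId = 0; workerId < numWorkers; workerId++) {
        requests.add(stagePayload + "@worker-" + workerId);
      }
    }
    return requests;
  }

  public static void main(String[] args) {
    SERIALIZE_CALLS.set(0);
    dispatchPerWorker(10, 256);
    System.out.println("per-worker serialize calls: " + SERIALIZE_CALLS.get());

    SERIALIZE_CALLS.set(0);
    dispatchPerStage(10, 256);
    System.out.println("per-stage serialize calls: " + SERIALIZE_CALLS.get());
  }
}
```

   With 10 stages and 256 workers, the first variant serializes 2560 times and the second 10 times, while both produce the same list of requests.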



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

