walterddr commented on code in PR #10791:
URL: https://github.com/apache/pinot/pull/10791#discussion_r1206984009


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -83,32 +93,30 @@ public void shutdown() {
   @Override
   public void submit(Worker.QueryRequest request, 
StreamObserver<Worker.QueryResponse> responseObserver) {
     // Deserialize the request
-    DistributedStagePlan distributedStagePlan;
+    List<DistributedStagePlan> distributedStagePlans;
     Map<String, String> requestMetadataMap;
-    long requestId = -1;
+    requestMetadataMap = request.getMetadataMap();
+    long requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
     try {
-      distributedStagePlan = 
QueryPlanSerDeUtils.deserialize(request.getStagePlan());
-      requestMetadataMap = request.getMetadataMap();
-      requestId = 
Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID));
+      distributedStagePlans = 
QueryPlanSerDeUtils.deserializeStagePlan(request);
     } catch (Exception e) {
       LOGGER.error("Caught exception while deserializing the request: {}, 
payload: {}", requestId, request, e);
       responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad 
request").withCause(e).asException());

Review Comment:
   This `onError` call should instead be `onNext(error payload)` followed by
   `onCompleted()`; the broker doesn't handle `onError` at the moment.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -24,44 +24,51 @@
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.query.planner.DispatchablePlanFragment;
+import org.apache.pinot.query.planner.DispatchableSubPlan;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
 import org.apache.pinot.query.routing.MailboxMetadata;
-import org.apache.pinot.query.routing.StageMetadata;
+import org.apache.pinot.query.routing.QueryServerInstance;
 import org.apache.pinot.query.routing.VirtualServerAddress;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.StageMetadata;
 
 
 /**
  * This utility class serialize/deserialize between {@link Worker.StagePlan} 
elements to Planner elements.
  */
 public class QueryPlanSerDeUtils {
+  private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile(
+      "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)");
 
   private QueryPlanSerDeUtils() {
     // do not instantiate.
   }
 
-  public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) {
-    DistributedStagePlan distributedStagePlan = new 
DistributedStagePlan(stagePlan.getStageId());
-    
distributedStagePlan.setServer(protoToAddress(stagePlan.getVirtualAddress()));
-    
distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()));
-    
distributedStagePlan.setStageMetadata(fromProtoStageMetadata(stagePlan.getStageMetadata()));
-    return distributedStagePlan;
+  public static List<DistributedStagePlan> 
deserializeStagePlan(Worker.QueryRequest request) {

Review Comment:
   IntelliJ refactoring error; the method should keep the name `deserialize`:
   ```suggestion
     public static List<DistributedStagePlan> deserialize(Worker.QueryRequest 
request) {
   ```



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -139,22 +140,24 @@ int submit(long requestId, DispatchableSubPlan 
dispatchableSubPlan, long timeout
         for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry
             : 
dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet())
 {
           QueryServerInstance queryServerInstance = queryServerEntry.getKey();
+          Worker.QueryRequest.Builder queryRequestBuilder = 
Worker.QueryRequest.newBuilder();
+          String host = queryServerInstance.getHostname();
+          int servicePort = queryServerInstance.getQueryServicePort();
+          int mailboxPort = queryServerInstance.getQueryMailboxPort();
           for (int workerId : queryServerEntry.getValue()) {
-            String host = queryServerInstance.getHostname();
-            int servicePort = queryServerInstance.getQueryServicePort();
-            int mailboxPort = queryServerInstance.getQueryMailboxPort();
             VirtualServerAddress virtualServerAddress = new 
VirtualServerAddress(host, mailboxPort, workerId);
-            DispatchClient client = getOrCreateDispatchClient(host, 
servicePort);
             dispatchCalls++;
-            int finalStageId = stageId;
-            _executorService.submit(() -> 
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-                        QueryPlanSerDeUtils.serialize(
-                            constructDistributedStagePlan(dispatchableSubPlan, 
finalStageId, virtualServerAddress)))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, 
String.valueOf(requestId))
-                    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, 
String.valueOf(timeoutMs))
-                    .putAllMetadata(queryOptions).build(), finalStageId, 
queryServerInstance, deadline,
-                dispatchCallbacks::offer));
+            queryRequestBuilder.addStagePlan(
+                QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, 
virtualServerAddress));

Review Comment:
   Actually, we decided to include this in the PR because both changes require
   a proto modification, so it is best to avoid multiple changes to the wire
   protocol. Updated the PR by force-pushing.



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java:
##########
@@ -79,6 +86,25 @@ public static String addressToProto(VirtualServerAddress 
serverAddress) {
     return String.format("%s@%s:%s", serverAddress.workerId(), 
serverAddress.hostname(), serverAddress.port());
   }
 
+  private static List<DistributedStagePlan> 
deserializeStagePlan(Worker.StagePlan stagePlan) {
+    List<DistributedStagePlan> distributedStagePlans = new ArrayList<>();
+    String serverAddress = stagePlan.getStageMetadata().getServerAddress();
+    String[] hostPort = StringUtils.split(serverAddress, ':');
+    String hostname = hostPort[0];
+    int port = Integer.parseInt(hostPort[1]);

Review Comment:
   - The host/port should live at the StagePlan level, not the metadata level.
   - Host/port parsing should be extracted into a utility method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to