walterddr commented on code in PR #10791: URL: https://github.com/apache/pinot/pull/10791#discussion_r1206984009
########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java: ########## @@ -83,32 +93,30 @@ public void shutdown() { @Override public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) { // Deserialize the request - DistributedStagePlan distributedStagePlan; + List<DistributedStagePlan> distributedStagePlans; Map<String, String> requestMetadataMap; - long requestId = -1; + requestMetadataMap = request.getMetadataMap(); + long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); try { - distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan()); - requestMetadataMap = request.getMetadataMap(); - requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); + distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request); } catch (Exception e) { LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e); responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException()); Review Comment: The `onError(...)` call should be replaced with `onNext(error payload)` followed by `onCompleted()`, because the broker doesn't handle `onError` at the moment. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java: ########## @@ -24,44 +24,51 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.planner.DispatchablePlanFragment; +import org.apache.pinot.query.planner.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils; import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.StageMetadata; +import 
org.apache.pinot.query.routing.QueryServerInstance; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.StageMetadata; /** * This utility class serialize/deserialize between {@link Worker.StagePlan} elements to Planner elements. */ public class QueryPlanSerDeUtils { + private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile( + "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)"); private QueryPlanSerDeUtils() { // do not instantiate. } - public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) { - DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId()); - distributedStagePlan.setServer(protoToAddress(stagePlan.getVirtualAddress())); - distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot())); - distributedStagePlan.setStageMetadata(fromProtoStageMetadata(stagePlan.getStageMetadata())); - return distributedStagePlan; + public static List<DistributedStagePlan> deserializeStagePlan(Worker.QueryRequest request) { Review Comment: intellij refactor error ```suggestion public static List<DistributedStagePlan> deserialize(Worker.QueryRequest request) { ``` ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java: ########## @@ -139,22 +140,24 @@ int submit(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeout for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) { QueryServerInstance queryServerInstance = queryServerEntry.getKey(); + Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder(); + String host = queryServerInstance.getHostname(); + int servicePort = 
queryServerInstance.getQueryServicePort(); + int mailboxPort = queryServerInstance.getQueryMailboxPort(); for (int workerId : queryServerEntry.getValue()) { - String host = queryServerInstance.getHostname(); - int servicePort = queryServerInstance.getQueryServicePort(); - int mailboxPort = queryServerInstance.getQueryMailboxPort(); VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId); - DispatchClient client = getOrCreateDispatchClient(host, servicePort); dispatchCalls++; - int finalStageId = stageId; - _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan( - QueryPlanSerDeUtils.serialize( - constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress))) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) - .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline, - dispatchCallbacks::offer)); + queryRequestBuilder.addStagePlan( + QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, virtualServerAddress)); Review Comment: Actually, we decided to include this in the PR because both changes require proto modifications, so it is best to avoid multiple changes to the wire protocol. 
Updated the PR by force-pushing. ########## pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java: ########## @@ -79,6 +86,25 @@ public static String addressToProto(VirtualServerAddress serverAddress) { return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port()); } + private static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) { + List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(); + String serverAddress = stagePlan.getStageMetadata().getServerAddress(); + String[] hostPort = StringUtils.split(serverAddress, ':'); + String hostname = hostPort[0]; + int port = Integer.parseInt(hostPort[1]); Review Comment: - The host-port should be carried at the StagePlan level, not at the metadata level. - The host-port parsing should be extracted into a utility method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org