This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 5ee6e137c0 [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791) 5ee6e137c0 is described below commit 5ee6e137c02e3e0a149a7c44dd6a9f5deb29cb5d Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Fri May 26 09:42:14 2023 -0700 [multistage] Refactor StageMetadata from pinot-query-planner to pinot-query-runner module (#10791) - Refactor StageMetadata from `pinot-query-planner` to `pinot-query-runtime` module - Make `QueryRequest` carry a list of `StagePlan`. - Make `QueryDispatcher` send only one request per `QueryServerInstance` per `StageId` - Make `QueryPlanSerDeUtils` serialize a single `PlanFragment` with multiple worker metadata once instead of once per worker. --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- pinot-common/src/main/proto/worker.proto | 13 +++-- .../query/planner/DispatchablePlanFragment.java | 5 -- .../apache/pinot/query/runtime/QueryRunner.java | 2 +- .../query/runtime/plan/DistributedStagePlan.java | 1 - .../runtime/plan/OpChainExecutionContext.java | 1 - .../query/runtime/plan/PlanRequestContext.java | 1 - .../pinot/query/runtime/plan}/StageMetadata.java | 10 +++- .../runtime/plan/serde/QueryPlanSerDeUtils.java | 66 ++++++++++++++++------ .../plan/server/ServerPlanRequestContext.java | 2 +- .../apache/pinot/query/service/QueryServer.java | 50 +++++++++------- .../query/service/dispatch/QueryDispatcher.java | 50 +++++++--------- .../pinot/query/runtime/QueryRunnerTestBase.java | 12 +++- .../operator/MailboxReceiveOperatorTest.java | 2 +- .../runtime/operator/MailboxSendOperatorTest.java | 2 +- .../pinot/query/runtime/operator/OpChainTest.java | 2 +- .../operator/SortedMailboxReceiveOperatorTest.java | 2 +- .../pinot/query/service/QueryServerTest.java | 14 ++--- 17 files changed, 137 insertions(+), 98 deletions(-) diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto index 
c1942f685b..dfb1cd53eb 100644 --- a/pinot-common/src/main/proto/worker.proto +++ b/pinot-common/src/main/proto/worker.proto @@ -56,10 +56,10 @@ message CancelResponse { // intentionally left empty } -// QueryRequest is the dispatched content for a specific query stage on a specific worker. +// QueryRequest is the dispatched content for all query stages to a physical worker. message QueryRequest { - map<string, string> metadata = 1; - StagePlan stagePlan = 2; + repeated StagePlan stagePlan = 1; + map<string, string> metadata = 2; } // QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status. @@ -70,14 +70,15 @@ message QueryResponse { message StagePlan { int32 stageId = 1; - string virtualAddress = 2; - StageNode stageRoot = 3; - StageMetadata stageMetadata = 4; + StageNode stageRoot = 2; + StageMetadata stageMetadata = 3; } message StageMetadata { repeated WorkerMetadata workerMetadata = 1; map<string, string> customProperty = 2; + string serverAddress = 3; + repeated int32 workerIds = 4; } message WorkerMetadata { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java index c06fa383a6..e0bc3f6a45 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/DispatchablePlanFragment.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.WorkerMetadata; @@ -108,10 +107,6 @@ public class DispatchablePlanFragment { _workerMetadataList.addAll(workerMetadataList); } - public StageMetadata toStageMetadata() { - return new 
StageMetadata(_workerMetadataList, _customProperties); - } - public void setServerInstanceToWorkerIdMap(Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap) { _serverInstanceToWorkerIdMap.clear(); _serverInstanceToWorkerIdMap.putAll(serverInstanceToWorkerIdMap); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 92bf32638d..94640ffae7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; @@ -56,6 +55,7 @@ import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; import org.apache.pinot.query.runtime.plan.PlanRequestContext; import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java index 7f4e3015f7..f8a5f5118b 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java @@ -19,7 +19,6 @@ package org.apache.pinot.query.runtime.plan; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index e45db904df..e16795891b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -20,7 +20,6 @@ package org.apache.pinot.query.runtime.plan; import java.util.function.Consumer; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.operator.OpChainStats; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java index d3d890d9d5..d0233c1578 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java @@ -21,7 +21,6 @@ package org.apache.pinot.query.runtime.plan; import java.util.ArrayList; import java.util.List; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.StageMetadata; 
import org.apache.pinot.query.routing.VirtualServerAddress; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java similarity index 90% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java index 16d0da897b..2ad9df403d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/StageMetadata.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/StageMetadata.java @@ -16,12 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.routing; +package org.apache.pinot.query.runtime.plan; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.query.routing.WorkerMetadata; /** @@ -31,7 +32,7 @@ public class StageMetadata { private final List<WorkerMetadata> _workerMetadataList; private final Map<String, String> _customProperties; - public StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) { + StageMetadata(List<WorkerMetadata> workerMetadataList, Map<String, String> customProperties) { _workerMetadataList = workerMetadataList; _customProperties = customProperties; } @@ -71,6 +72,11 @@ public class StageMetadata { return this; } + public Builder addCustomProperties(Map<String, String> customPropertyMap) { + _customProperties.putAll(customPropertyMap); + return this; + } + public StageMetadata build() { return new StageMetadata(_workerMetadataList, _customProperties); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java index 62b2af1ab4..cd4c1d6fd7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java @@ -24,44 +24,51 @@ import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.query.planner.DispatchablePlanFragment; +import org.apache.pinot.query.planner.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.AbstractPlanNode; import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils; import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.StageMetadata; +import org.apache.pinot.query.routing.QueryServerInstance; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.StageMetadata; /** * This utility class serialize/deserialize between {@link Worker.StagePlan} elements to Planner elements. */ public class QueryPlanSerDeUtils { + private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile( + "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)"); private QueryPlanSerDeUtils() { // do not instantiate. 
} - public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) { - DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId()); - distributedStagePlan.setServer(protoToAddress(stagePlan.getVirtualAddress())); - distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot())); - distributedStagePlan.setStageMetadata(fromProtoStageMetadata(stagePlan.getStageMetadata())); - return distributedStagePlan; + public static List<DistributedStagePlan> deserializeStagePlan(Worker.QueryRequest request) { + List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(); + for (Worker.StagePlan stagePlan : request.getStagePlanList()) { + distributedStagePlans.addAll(deserializeStagePlan(stagePlan)); + } + return distributedStagePlans; } - public static Worker.StagePlan serialize(DistributedStagePlan distributedStagePlan) { + public static Worker.StagePlan serialize(DispatchableSubPlan dispatchableSubPlan, int stageId, + QueryServerInstance queryServerInstance, List<Integer> workerIds) { return Worker.StagePlan.newBuilder() - .setStageId(distributedStagePlan.getStageId()) - .setVirtualAddress(addressToProto(distributedStagePlan.getServer())) - .setStageRoot(StageNodeSerDeUtils.serializeStageNode((AbstractPlanNode) distributedStagePlan.getStageRoot())) - .setStageMetadata(toProtoStageMetadata(distributedStagePlan.getStageMetadata())).build(); + .setStageId(stageId) + .setStageRoot(StageNodeSerDeUtils.serializeStageNode( + (AbstractPlanNode) dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment() + .getFragmentRoot())) + .setStageMetadata( + toProtoStageMetadata(dispatchableSubPlan.getQueryStageList().get(stageId), queryServerInstance, workerIds)) + .build(); } - private static final Pattern VIRTUAL_SERVER_PATTERN = Pattern.compile( - "(?<virtualid>[0-9]+)@(?<host>[^:]+):(?<port>[0-9]+)"); - public static VirtualServerAddress protoToAddress(String virtualAddressStr) { Matcher 
matcher = VIRTUAL_SERVER_PATTERN.matcher(virtualAddressStr); if (!matcher.matches()) { @@ -79,6 +86,25 @@ public class QueryPlanSerDeUtils { return String.format("%s@%s:%s", serverAddress.workerId(), serverAddress.hostname(), serverAddress.port()); } + private static List<DistributedStagePlan> deserializeStagePlan(Worker.StagePlan stagePlan) { + List<DistributedStagePlan> distributedStagePlans = new ArrayList<>(); + String serverAddress = stagePlan.getStageMetadata().getServerAddress(); + String[] hostPort = StringUtils.split(serverAddress, ':'); + String hostname = hostPort[0]; + int port = Integer.parseInt(hostPort[1]); + AbstractPlanNode stageRoot = StageNodeSerDeUtils.deserializeStageNode(stagePlan.getStageRoot()); + StageMetadata stageMetadata = fromProtoStageMetadata(stagePlan.getStageMetadata()); + for (int workerId : stagePlan.getStageMetadata().getWorkerIdsList()) { + DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId()); + VirtualServerAddress virtualServerAddress = new VirtualServerAddress(hostname, port, workerId); + distributedStagePlan.setServer(virtualServerAddress); + distributedStagePlan.setStageRoot(stageRoot); + distributedStagePlan.setStageMetadata(stageMetadata); + distributedStagePlans.add(distributedStagePlan); + } + return distributedStagePlans; + } + private static StageMetadata fromProtoStageMetadata(Worker.StageMetadata protoStageMetadata) { StageMetadata.Builder builder = new StageMetadata.Builder(); List<WorkerMetadata> workerMetadataList = new ArrayList<>(); @@ -119,12 +145,16 @@ public class QueryPlanSerDeUtils { return mailboxMetadata; } - private static Worker.StageMetadata toProtoStageMetadata(StageMetadata stageMetadata) { + private static Worker.StageMetadata toProtoStageMetadata(DispatchablePlanFragment planFragment, + QueryServerInstance queryServerInstance, List<Integer> workerIds) { Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder(); - for (WorkerMetadata 
workerMetadata : stageMetadata.getWorkerMetadataList()) { + for (WorkerMetadata workerMetadata : planFragment.getWorkerMetadataList()) { builder.addWorkerMetadata(toProtoWorkerMetadata(workerMetadata)); } - builder.putAllCustomProperty(stageMetadata.getCustomProperties()); + builder.putAllCustomProperty(planFragment.getCustomProperties()); + builder.setServerAddress(String.format("%s:%d", queryServerInstance.getHostname(), + queryServerInstance.getQueryMailboxPort())); + builder.addAllWorkerIds(workerIds); return builder.build(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index 1c0f7168ff..87bf3302b5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -22,9 +22,9 @@ import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.plan.PlanRequestContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.spi.config.table.TableType; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java index d88b87220e..126273d310 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java @@ -22,10 +22,13 @@ import io.grpc.Server; import 
io.grpc.ServerBuilder; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import java.util.List; import java.util.Map; -import org.apache.pinot.common.exception.QueryException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; +import org.apache.pinot.common.utils.NamedThreadFactory; import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -45,12 +48,19 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { private static final int MAX_INBOUND_MESSAGE_SIZE = 64 * 1024 * 1024; private final int _port; - private Server _server = null; private final QueryRunner _queryRunner; + // query submission service is only used for plan submission for now. + // TODO: with complex query submission logic we should allow asynchronous query submission return instead of + // directly return from submission response observer. 
+ private final ExecutorService _querySubmissionExecutorService; + + private Server _server = null; public QueryServer(int port, QueryRunner queryRunner) { _port = port; _queryRunner = queryRunner; + _querySubmissionExecutorService = Executors.newCachedThreadPool( + new NamedThreadFactory("query_submission_executor_on_" + _port + "_port")); } public void start() { @@ -83,32 +93,30 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { @Override public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) { // Deserialize the request - DistributedStagePlan distributedStagePlan; + List<DistributedStagePlan> distributedStagePlans; Map<String, String> requestMetadataMap; - long requestId = -1; + requestMetadataMap = request.getMetadataMap(); + long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); try { - distributedStagePlan = QueryPlanSerDeUtils.deserialize(request.getStagePlan()); - requestMetadataMap = request.getMetadataMap(); - requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); + distributedStagePlans = QueryPlanSerDeUtils.deserializeStagePlan(request); } catch (Exception e) { LOGGER.error("Caught exception while deserializing the request: {}, payload: {}", requestId, request, e); responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException()); return; } - - try { - _queryRunner.processQuery(distributedStagePlan, requestMetadataMap); - responseObserver.onNext(Worker.QueryResponse.newBuilder() - .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build()); - responseObserver.onCompleted(); - } catch (Throwable t) { - LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId, - distributedStagePlan.getStageId(), t); - responseObserver.onNext(Worker.QueryResponse.newBuilder() - 
.putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR, QueryException.getTruncatedStackTrace(t)) - .build()); - responseObserver.onCompleted(); - } + // TODO: allow thrown exception to return back to broker in asynchronous manner. + distributedStagePlans.forEach(distributedStagePlan -> _querySubmissionExecutorService.submit(() -> { + try { + _queryRunner.processQuery(distributedStagePlan, requestMetadataMap); + } catch (Throwable t) { + LOGGER.error("Caught exception while compiling opChain for request: {}, stage: {}", requestId, + distributedStagePlan.getStageId(), t); + } + }) + ); + responseObserver.onNext(Worker.QueryResponse.newBuilder() + .putMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_OK, "").build()); + responseObserver.onCompleted(); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index aa700a2e51..d7eb25a944 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -45,6 +45,7 @@ import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator; import org.apache.pinot.core.util.trace.TracedThreadFactory; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.DispatchablePlanFragment; import org.apache.pinot.query.planner.DispatchableSubPlan; import org.apache.pinot.query.planner.ExplainPlanPlanVisitor; import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; @@ -56,8 +57,8 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.OpChainStats; import org.apache.pinot.query.runtime.operator.OperatorStats; import 
org.apache.pinot.query.runtime.operator.utils.OperatorUtils; -import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.utils.ByteArray; @@ -139,22 +140,21 @@ public class QueryDispatcher { for (Map.Entry<QueryServerInstance, List<Integer>> queryServerEntry : dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap().entrySet()) { QueryServerInstance queryServerInstance = queryServerEntry.getKey(); - for (int workerId : queryServerEntry.getValue()) { - String host = queryServerInstance.getHostname(); - int servicePort = queryServerInstance.getQueryServicePort(); - int mailboxPort = queryServerInstance.getQueryMailboxPort(); - VirtualServerAddress virtualServerAddress = new VirtualServerAddress(host, mailboxPort, workerId); - DispatchClient client = getOrCreateDispatchClient(host, servicePort); - dispatchCalls++; - int finalStageId = stageId; - _executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan( - QueryPlanSerDeUtils.serialize( - constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress))) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) - .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) - .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline, - dispatchCallbacks::offer)); - } + Worker.QueryRequest.Builder queryRequestBuilder = Worker.QueryRequest.newBuilder(); + String host = queryServerInstance.getHostname(); + int servicePort = queryServerInstance.getQueryServicePort(); + queryRequestBuilder.addStagePlan( + QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, queryServerInstance, + 
queryServerEntry.getValue())); + dispatchCalls++; + Worker.QueryRequest queryRequest = + queryRequestBuilder.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId)) + .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)) + .putAllMetadata(queryOptions).build(); + DispatchClient client = getOrCreateDispatchClient(host, servicePort); + int finalStageId = stageId; + _executorService.submit(() -> client.submit(queryRequest, finalStageId, queryServerInstance, deadline, + dispatchCallbacks::offer)); } } } @@ -190,14 +190,14 @@ public class QueryDispatcher { public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, int reduceStageId, long timeoutMs, MailboxService mailboxService, Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled) { - MailboxReceiveNode reduceNode = - (MailboxReceiveNode) dispatchableSubPlan.getQueryStageList().get(reduceStageId).getPlanFragment() - .getFragmentRoot(); + DispatchablePlanFragment reduceStagePlanFragment = dispatchableSubPlan.getQueryStageList().get(reduceStageId); + MailboxReceiveNode reduceNode = (MailboxReceiveNode) reduceStagePlanFragment.getPlanFragment().getFragmentRoot(); VirtualServerAddress server = new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getPort(), 0); OpChainExecutionContext context = new OpChainExecutionContext(mailboxService, requestId, reduceStageId, server, timeoutMs, System.currentTimeMillis() + timeoutMs, - dispatchableSubPlan.getQueryStageList().get(reduceStageId).toStageMetadata(), + new StageMetadata.Builder().setWorkerMetadataList(reduceStagePlanFragment.getWorkerMetadataList()) + .addCustomProperties(reduceStagePlanFragment.getCustomProperties()).build(), traceEnabled); MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(context, reduceNode.getSenderStageId()); List<DataBlock> resultDataBlocks = @@ -207,14 +207,6 @@ public class QueryDispatcher { 
dispatchableSubPlan.getQueryStageList().get(0).getPlanFragment().getFragmentRoot().getDataSchema()); } - @VisibleForTesting - public static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan, - int stageId, VirtualServerAddress serverAddress) { - return new DistributedStagePlan(stageId, serverAddress, - dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(), - dispatchableSubPlan.getQueryStageList().get(stageId).toStageMetadata()); - } - private static List<DataBlock> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator, long timeoutMs, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap, DispatchableSubPlan dispatchableSubPlan, diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index 32164207d0..190bac2573 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -52,6 +52,7 @@ import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; import org.apache.pinot.query.routing.QueryServerInstance; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.spi.data.FieldSpec; @@ -134,13 +135,22 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) { QueryServerInstance server = entry.getKey(); for (int workerId : entry.getValue()) { - DistributedStagePlan distributedStagePlan = 
QueryDispatcher.constructDistributedStagePlan( + DistributedStagePlan distributedStagePlan = constructDistributedStagePlan( dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId)); _servers.get(server).processQuery(distributedStagePlan, requestMetadataMap); } } } + protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan, + int stageId, VirtualServerAddress serverAddress) { + return new DistributedStagePlan(stageId, serverAddress, + dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(), + new StageMetadata.Builder().setWorkerMetadataList( + dispatchableSubPlan.getQueryStageList().get(stageId).getWorkerMetadataList()) + .addCustomProperties(dispatchableSubPlan.getQueryStageList().get(stageId).getCustomProperties()).build()); + } + protected List<Object[]> queryH2(String sql) throws Exception { int firstSemi = sql.indexOf(';'); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index bda313fa73..bcec439843 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -31,12 +31,12 @@ import org.apache.pinot.query.mailbox.MailboxIdUtils; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import 
org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.AfterMethod; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 774393945e..ade517f1a1 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -23,13 +23,13 @@ import java.util.List; import java.util.Map; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 01f3dc8715..4af33067fa 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -42,13 +42,13 @@ import org.apache.pinot.query.mailbox.ReceivingMailbox; import 
org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.physical.MailboxIdUtils; import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.spi.data.FieldSpec; import org.mockito.Mock; import org.mockito.MockitoAnnotations; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java index 4c2ebc5d00..7ed0c2469a 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java @@ -35,12 +35,12 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.routing.MailboxMetadata; -import org.apache.pinot.query.routing.StageMetadata; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.mockito.Mock; import 
org.mockito.MockitoAnnotations; import org.testng.annotations.AfterMethod; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index d9803acc17..ca24f97043 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.service; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -40,12 +41,10 @@ import org.apache.pinot.query.planner.DispatchablePlanFragment; import org.apache.pinot.query.planner.DispatchableSubPlan; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.routing.QueryServerInstance; -import org.apache.pinot.query.routing.StageMetadata; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.QueryRunner; +import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; -import org.apache.pinot.query.service.dispatch.QueryDispatcher; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.EqualityUtils; @@ -124,7 +123,9 @@ public class QueryServerTest extends QueryTestSet { DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(stageId); - StageMetadata stageMetadata = dispatchablePlanFragment.toStageMetadata(); + StageMetadata stageMetadata = new StageMetadata.Builder() + .setWorkerMetadataList(dispatchablePlanFragment.getWorkerMetadataList()) + .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build(); 
// ensure mock query runner received correctly deserialized payload. QueryRunner mockRunner = @@ -230,9 +231,8 @@ public class QueryServerTest extends QueryTestSet { QueryServerInstance serverInstance = serverInstanceToWorkerIdMap.keySet().iterator().next(); int workerId = serverInstanceToWorkerIdMap.get(serverInstance).get(0); - return Worker.QueryRequest.newBuilder().setStagePlan(QueryPlanSerDeUtils.serialize( - QueryDispatcher.constructDistributedStagePlan(dispatchableSubPlan, stageId, - new VirtualServerAddress(serverInstance, workerId)))) + return Worker.QueryRequest.newBuilder().addStagePlan( + QueryPlanSerDeUtils.serialize(dispatchableSubPlan, stageId, serverInstance, ImmutableList.of(workerId))) // the default configurations that must exist. .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong())) .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org