This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 399f033ec3 [Multi-stage] Remove PhysicalPlanContext and clean up executor logic (#11439) 399f033ec3 is described below commit 399f033ec3917df2bc478b5904406a95e0bc7258 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Aug 25 17:24:47 2023 -0700 [Multi-stage] Remove PhysicalPlanContext and clean up executor logic (#11439) --- .../MultiStageBrokerRequestHandler.java | 8 +- .../apache/pinot/query/runtime/QueryRunner.java | 114 ++++++++------------- .../pinot/query/runtime/operator/OpChain.java | 35 +++---- .../runtime/plan/OpChainExecutionContext.java | 26 +++-- .../query/runtime/plan/PhysicalPlanContext.java | 95 ----------------- .../query/runtime/plan/PhysicalPlanVisitor.java | 90 +++++++--------- .../plan/pipeline/PipelineBreakerExecutor.java | 47 ++++----- .../plan/server/ServerPlanRequestContext.java | 13 +-- .../plan/server/ServerPlanRequestUtils.java | 58 +++++------ .../plan/server/ServerPlanRequestVisitor.java | 2 +- .../query/service/dispatch/QueryDispatcher.java | 16 +-- .../pinot/query/service/server/QueryServer.java | 13 ++- .../pinot/query/runtime/QueryRunnerTest.java | 4 +- .../pinot/query/runtime/QueryRunnerTestBase.java | 4 +- .../executor/OpChainSchedulerServiceTest.java | 11 +- .../runtime/operator/LiteralValueOperatorTest.java | 4 - .../operator/MailboxReceiveOperatorTest.java | 4 +- .../runtime/operator/MailboxSendOperatorTest.java | 4 +- .../pinot/query/runtime/operator/OpChainTest.java | 35 ++++--- .../query/runtime/operator/OperatorTestUtil.java | 17 ++- .../operator/SortedMailboxReceiveOperatorTest.java | 5 +- .../plan/pipeline/PipelineBreakerExecutorTest.java | 29 +++--- .../service/dispatch/QueryDispatcherTest.java | 4 +- 23 files changed, 247 insertions(+), 391 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 3a65691073..2080bd4a64 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -166,8 +166,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); } - boolean traceEnabled = Boolean.parseBoolean( - sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); + Map<String, String> queryOptions = sqlNodeAndOptions.getOptions(); + boolean traceEnabled = Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); ResultTable queryResults; Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>(); @@ -177,8 +177,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { long executionStartTimeNs = System.nanoTime(); try { - queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, - sqlNodeAndOptions.getOptions(), stageIdStatsMap, traceEnabled); + queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, queryTimeoutMs, queryOptions, + stageIdStatsMap); } catch (Throwable t) { String consolidatedMessage = ExceptionUtils.consolidateExceptionMessages(t); LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index f48f5561fb..021af52f80 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -20,15 +20,12 @@ package org.apache.pinot.query.runtime; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.helix.HelixManager; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.config.QueryOptionsUtils; @@ -39,7 +36,6 @@ import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.query.mailbox.MailboxIdUtils; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxSendNode; -import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.routing.MailboxMetadata; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils; @@ -50,14 +46,12 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.JoinOverFlowMode; @@ -72,17 +66,13 @@ public class QueryRunner { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class); private static final String PINOT_V1_SERVER_QUERY_CONFIG_PREFIX = "pinot.server.query.executor"; - // This is a temporary before merging the 2 type of executor. - private ServerQueryExecutorV1Impl _serverExecutor; private HelixManager _helixManager; - private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; - private MailboxService _mailboxService; - private String _hostname; - private int _port; + private ServerMetrics _serverMetrics; private ExecutorService _opChainExecutor; - private OpChainSchedulerService _scheduler; + private MailboxService _mailboxService; + private ServerQueryExecutorV1Impl _leafQueryExecutor; // Group-by settings @Nullable @@ -102,12 +92,14 @@ public class QueryRunner { */ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager, HelixManager helixManager, ServerMetrics serverMetrics) { + _helixManager = helixManager; + _serverMetrics = serverMetrics; + String instanceName = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME); - _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring( + String hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? instanceName.substring( CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName; - _port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, + int port = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT); - _helixManager = helixManager; // TODO: Consider using separate config for intermediate stage and leaf stage String numGroupsLimitStr = config.getProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_NUM_GROUPS_LIMIT); @@ -121,29 +113,28 @@ public class QueryRunner { String joinOverflowModeStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_JOIN_OVERFLOW_MODE); _joinOverflowMode = joinOverflowModeStr != null ? JoinOverFlowMode.valueOf(joinOverflowModeStr) : null; + //TODO: make this configurable + _opChainExecutor = + ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", "op_chain_worker_on_" + port + "_port"); + _scheduler = new OpChainSchedulerService(getOpChainExecutorService()); + _mailboxService = new MailboxService(hostname, port, config); try { - //TODO: make this configurable - _opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain", - "op_chain_worker_on_" + _port + "_port"); - _scheduler = new OpChainSchedulerService(getOpChainExecutorService()); - _mailboxService = new MailboxService(_hostname, _port, config); - _serverExecutor = new ServerQueryExecutorV1Impl(); - _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); + _leafQueryExecutor = new ServerQueryExecutorV1Impl(); + _leafQueryExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); } catch (Exception e) { throw new RuntimeException(e); } + + LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", hostname, port); } - public void start() - throws TimeoutException { - _helixPropertyStore = _helixManager.getHelixPropertyStore(); + public void start() { _mailboxService.start(); - _serverExecutor.start(); + _leafQueryExecutor.start(); } - public void shutDown() - throws TimeoutException { - _serverExecutor.shutDown(); + public void shutDown() { + _leafQueryExecutor.shutDown(); _mailboxService.shutdown(); ExecutorServiceUtils.close(_opChainExecutor); } @@ -156,17 +147,15 @@ public class QueryRunner { */ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata) { long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)); - long timeoutMs = Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS)); - boolean isTraceEnabled = - Boolean.parseBoolean(requestMetadata.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); + long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); long deadlineMs = System.currentTimeMillis() + timeoutMs; setStageCustomProperties(distributedStagePlan.getStageMetadata().getCustomProperties(), requestMetadata); // run pre-stage execution for all pipeline breakers PipelineBreakerResult pipelineBreakerResult = - PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, deadlineMs, - requestId, isTraceEnabled); + PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, + requestMetadata, requestId, deadlineMs); // Send error block to all the receivers if pipeline breaker fails if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() != null) { @@ -193,13 +182,15 @@ public class QueryRunner { } // run OpChain + OpChainExecutionContext executionContext = + new OpChainExecutionContext(_mailboxService, requestId, distributedStagePlan.getStageId(), + distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(), + pipelineBreakerResult); OpChain opChain; if (DistributedStagePlan.isLeafStage(distributedStagePlan)) { - opChain = compileLeafStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult, deadlineMs, - isTraceEnabled); + opChain = compileLeafStage(executionContext, distributedStagePlan); } else { - opChain = compileIntermediateStage(requestId, distributedStagePlan, requestMetadata, pipelineBreakerResult, - deadlineMs, isTraceEnabled); + opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext); } _scheduler.register(opChain); } @@ -248,51 +239,36 @@ public class QueryRunner { return _opChainExecutor; } - private OpChain compileIntermediateStage(long requestId, DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs, - boolean isTraceEnabled) { - PlanNode stageRoot = distributedStagePlan.getStageRoot(); - OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId, - stageRoot.getPlanFragmentId(), distributedStagePlan.getServer(), deadlineMs, - distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled); - return PhysicalPlanVisitor.walkPlanNode(stageRoot, - new PhysicalPlanContext(opChainContext, pipelineBreakerResult)); - } - - private OpChain compileLeafStage(long requestId, DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, long deadlineMs, - boolean isTraceEnabled) { - OpChainExecutionContext opChainContext = new OpChainExecutionContext(_mailboxService, requestId, - distributedStagePlan.getStageId(), distributedStagePlan.getServer(), deadlineMs, - distributedStagePlan.getStageMetadata(), pipelineBreakerResult, isTraceEnabled); - PhysicalPlanContext planContext = new PhysicalPlanContext(opChainContext, pipelineBreakerResult); - List<ServerPlanRequestContext> serverPlanRequestContexts = ServerPlanRequestUtils.constructServerQueryRequests( - planContext, distributedStagePlan, requestMetadataMap, _helixPropertyStore); + private OpChain compileLeafStage(OpChainExecutionContext executionContext, + DistributedStagePlan distributedStagePlan) { + List<ServerPlanRequestContext> serverPlanRequestContexts = + ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan, + _helixManager.getHelixPropertyStore()); List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size()); + long queryArrivalTimeMs = System.currentTimeMillis(); for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) { - serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(), - new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis())); + serverQueryRequests.add( + new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs)); } MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); - OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(planContext); MultiStageOperator leafStageOperator = - new LeafStageTransferableBlockOperator(opChainExecutionContext, this::processServerQueryRequest, - serverQueryRequests, sendNode.getDataSchema()); + new LeafStageTransferableBlockOperator(executionContext, this::processServerQueryRequest, serverQueryRequests, + sendNode.getDataSchema()); MailboxSendOperator mailboxSendOperator = - new MailboxSendOperator(opChainExecutionContext, leafStageOperator, sendNode.getDistributionType(), + new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(), sendNode.getPartitionKeySelector(), sendNode.getCollationKeys(), sendNode.getCollationDirections(), sendNode.isSortOnSender(), sendNode.getReceiverStageId()); - return new OpChain(opChainExecutionContext, mailboxSendOperator, Collections.emptyList()); + return new OpChain(executionContext, mailboxSendOperator); } private InstanceResponseBlock processServerQueryRequest(ServerQueryRequest request) { InstanceResponseBlock result; try { - result = _serverExecutor.execute(request, getOpChainExecutorService()); + result = _leafQueryExecutor.execute(request, getOpChainExecutorService()); } catch (Exception e) { InstanceResponseBlock errorResponse = new InstanceResponseBlock(); - errorResponse.getExceptions().put(QueryException.QUERY_EXECUTION_ERROR_CODE, - e.getMessage() + QueryException.getTruncatedStackTrace(e)); + errorResponse.getExceptions() + .put(QueryException.QUERY_EXECUTION_ERROR_CODE, e.getMessage() + QueryException.getTruncatedStackTrace(e)); result = errorResponse; } return result; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index d5414439ed..360bef6324 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.query.runtime.operator; -import java.util.List; import java.util.function.Consumer; import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.runtime.blocks.TransferableBlock; @@ -34,31 +33,21 @@ import org.slf4j.LoggerFactory; public class OpChain implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(OpChain.class); - private final MultiStageOperator _root; - private final List<String> _receivingMailboxIds; private final OpChainId _id; private final OpChainStats _stats; - private final Consumer<OpChainId> _opChainFinishCallback; + private final MultiStageOperator _root; + private final Consumer<OpChainId> _finishCallback; - public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds) { - this(context, root, receivingMailboxIds, (id) -> { }); + public OpChain(OpChainExecutionContext context, MultiStageOperator root) { + this(context, root, (id) -> { + }); } - public OpChain(OpChainExecutionContext context, MultiStageOperator root, List<String> receivingMailboxIds, - Consumer<OpChainId> opChainFinishCallback) { - _root = root; - _receivingMailboxIds = receivingMailboxIds; + public OpChain(OpChainExecutionContext context, MultiStageOperator root, Consumer<OpChainId> finishCallback) { _id = context.getId(); _stats = context.getStats(); - _opChainFinishCallback = opChainFinishCallback; - } - - public Operator<TransferableBlock> getRoot() { - return _root; - } - - public List<String> getReceivingMailboxIds() { - return _receivingMailboxIds; + _root = root; + _finishCallback = finishCallback; } public OpChainId getId() { @@ -70,6 +59,10 @@ public class OpChain implements AutoCloseable { return _stats; } + public Operator<TransferableBlock> getRoot() { + return _root; + } + @Override public String toString() { return "OpChain{" + _id + "}"; @@ -86,7 +79,7 @@ public class OpChain implements AutoCloseable { try { _root.close(); } finally { - _opChainFinishCallback.accept(getId()); + _finishCallback.accept(getId()); LOGGER.trace("OpChain callback called"); } } @@ -102,7 +95,7 @@ public class OpChain implements AutoCloseable { try { _root.cancel(e); } finally { - _opChainFinishCallback.accept(getId()); + _finishCallback.accept(getId()); LOGGER.trace("OpChain callback called"); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index 9963c91db1..cb4326e8b9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -18,11 +18,13 @@ */ package org.apache.pinot.query.runtime.plan; +import java.util.Map; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.operator.OpChainStats; import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -36,32 +38,30 @@ public class OpChainExecutionContext { private final int _stageId; private final VirtualServerAddress _server; private final long _deadlineMs; + private final Map<String, String> _requestMetadata; private final StageMetadata _stageMetadata; private final OpChainId _id; private final OpChainStats _stats; + private final PipelineBreakerResult _pipelineBreakerResult; private final boolean _traceEnabled; public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId, - VirtualServerAddress server, long deadlineMs, StageMetadata stageMetadata, - PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) { + VirtualServerAddress server, long deadlineMs, Map<String, String> requestMetadata, StageMetadata stageMetadata, + PipelineBreakerResult pipelineBreakerResult) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; _server = server; _deadlineMs = deadlineMs; + _requestMetadata = requestMetadata; _stageMetadata = stageMetadata; _id = new OpChainId(requestId, server.workerId(), stageId); _stats = new OpChainStats(_id.toString()); + _pipelineBreakerResult = pipelineBreakerResult; if (pipelineBreakerResult != null && pipelineBreakerResult.getOpChainStats() != null) { _stats.getOperatorStatsMap().putAll(pipelineBreakerResult.getOpChainStats().getOperatorStatsMap()); } - _traceEnabled = traceEnabled; - } - - public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) { - this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(), - physicalPlanContext.getServer(), physicalPlanContext.getDeadlineMs(), physicalPlanContext.getStageMetadata(), - physicalPlanContext.getPipelineBreakerResult(), physicalPlanContext.isTraceEnabled()); + _traceEnabled = Boolean.parseBoolean(requestMetadata.get(CommonConstants.Broker.Request.TRACE)); } public MailboxService getMailboxService() { @@ -84,6 +84,10 @@ public class OpChainExecutionContext { return _deadlineMs; } + public Map<String, String> getRequestMetadata() { + return _requestMetadata; + } + public StageMetadata getStageMetadata() { return _stageMetadata; } @@ -96,6 +100,10 @@ public class OpChainExecutionContext { return _stats; } + public PipelineBreakerResult getPipelineBreakerResult() { + return _pipelineBreakerResult; + } + public boolean isTraceEnabled() { return _traceEnabled; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java deleted file mode 100644 index 00b7ac40e7..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.runtime.plan; - -import java.util.ArrayList; -import java.util.List; -import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.routing.VirtualServerAddress; -import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; - - -public class PhysicalPlanContext { - protected final MailboxService _mailboxService; - protected final long _requestId; - protected final int _stageId; - private final long _deadlineMs; - protected final VirtualServerAddress _server; - protected final StageMetadata _stageMetadata; - protected final PipelineBreakerResult _pipelineBreakerResult; - protected final List<String> _receivingMailboxIds = new ArrayList<>(); - private final OpChainExecutionContext _opChainExecutionContext; - private final boolean _traceEnabled; - - public PhysicalPlanContext(OpChainExecutionContext opChainContext, PipelineBreakerResult pipelineBreakerResult) { - _mailboxService = opChainContext.getMailboxService(); - _requestId = opChainContext.getRequestId(); - _stageId = opChainContext.getStageId(); - _deadlineMs = opChainContext.getDeadlineMs(); - _server = opChainContext.getServer(); - _stageMetadata = opChainContext.getStageMetadata(); - _pipelineBreakerResult = pipelineBreakerResult; - _traceEnabled = opChainContext.isTraceEnabled(); - _opChainExecutionContext = opChainContext; - } - - public long getRequestId() { - return _requestId; - } - - public int getStageId() { - return _stageId; - } - - public long getDeadlineMs() { - return _deadlineMs; - } - - public VirtualServerAddress getServer() { - return _server; - } - - public StageMetadata getStageMetadata() { - return _stageMetadata; - } - - public PipelineBreakerResult getPipelineBreakerResult() { - return _pipelineBreakerResult; - } - - public MailboxService getMailboxService() { - return _mailboxService; - } - - public void addReceivingMailboxIds(List<String> receivingMailboxIds) { - _receivingMailboxIds.addAll(receivingMailboxIds); - } - - public List<String> getReceivingMailboxIds() { - return _receivingMailboxIds; - } - - public OpChainExecutionContext getOpChainExecutionContext() { - return _opChainExecutionContext; - } - - public boolean isTraceEnabled() { - return _traceEnabled; - } -} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 2340545437..144cf86e27 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -57,64 +57,56 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator; * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into * v1 operators at this point in time. * - * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, PhysicalPlanContext)} + * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, OpChainExecutionContext)} */ -public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> { +public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, OpChainExecutionContext> { private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor(); - public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext context) { + public static OpChain walkPlanNode(PlanNode node, OpChainExecutionContext context) { MultiStageOperator root = node.visit(INSTANCE, context); - return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds()); + return new OpChain(context, root); } @Override - public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) { + public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, OpChainExecutionContext context) { if (node.isSortOnReceiver()) { - SortedMailboxReceiveOperator sortedMailboxReceiveOperator = - new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(), - node.getDataSchema(), node.getCollationKeys(), node.getCollationDirections(), - node.getCollationNullDirections(), node.isSortOnSender(), node.getSenderStageId()); - context.addReceivingMailboxIds(sortedMailboxReceiveOperator.getMailboxIds()); - return sortedMailboxReceiveOperator; + return new SortedMailboxReceiveOperator(context, node.getDistributionType(), node.getDataSchema(), + node.getCollationKeys(), node.getCollationDirections(), node.getCollationNullDirections(), + node.isSortOnSender(), node.getSenderStageId()); } else { - MailboxReceiveOperator mailboxReceiveOperator = - new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getDistributionType(), - node.getSenderStageId()); - context.addReceivingMailboxIds(mailboxReceiveOperator.getMailboxIds()); - return mailboxReceiveOperator; + return new MailboxReceiveOperator(context, node.getDistributionType(), node.getSenderStageId()); } } @Override - public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) { + public MultiStageOperator visitMailboxSend(MailboxSendNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getDistributionType(), - node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), - node.getReceiverStageId()); + return new MailboxSendOperator(context, nextOperator, node.getDistributionType(), node.getPartitionKeySelector(), + node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), node.getReceiverStageId()); } @Override - public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) { + public MultiStageOperator visitAggregate(AggregateNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); DataSchema inputSchema = node.getInputs().get(0).getDataSchema(); DataSchema resultSchema = node.getDataSchema(); - return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, resultSchema, inputSchema, - node.getAggCalls(), node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(), node.getNodeHint()); + return new AggregateOperator(context, nextOperator, resultSchema, inputSchema, node.getAggCalls(), + node.getGroupSet(), node.getAggType(), node.getFilterArgIndices(), node.getNodeHint()); } @Override - public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext context) { + public MultiStageOperator visitWindow(WindowNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(), - node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), - node.getLowerBound(), node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), - node.getDataSchema(), node.getInputs().get(0).getDataSchema()); + return new WindowAggregateOperator(context, nextOperator, node.getGroupSet(), node.getOrderSet(), + node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), node.getLowerBound(), + node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), node.getDataSchema(), + node.getInputs().get(0).getDataSchema()); } @Override - public MultiStageOperator visitSetOp(SetOpNode setOpNode, PhysicalPlanContext context) { + public MultiStageOperator visitSetOp(SetOpNode setOpNode, OpChainExecutionContext context) { List<MultiStageOperator> inputs = new ArrayList<>(); for (PlanNode input : setOpNode.getInputs()) { MultiStageOperator visited = input.visit(this, context); @@ -122,66 +114,60 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } switch (setOpNode.getSetOpType()) { case UNION: - return new UnionOperator(context.getOpChainExecutionContext(), inputs, - setOpNode.getInputs().get(0).getDataSchema()); + return new UnionOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema()); case INTERSECT: - return new IntersectOperator(context.getOpChainExecutionContext(), inputs, - setOpNode.getInputs().get(0).getDataSchema()); + return new IntersectOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema()); case MINUS: - return new MinusOperator(context.getOpChainExecutionContext(), inputs, - setOpNode.getInputs().get(0).getDataSchema()); + return new MinusOperator(context, inputs, setOpNode.getInputs().get(0).getDataSchema()); default: throw new IllegalStateException(); } } @Override - public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PhysicalPlanContext context) { + public MultiStageOperator visitExchange(ExchangeNode exchangeNode, OpChainExecutionContext context) { throw new UnsupportedOperationException("ExchangeNode should not be visited"); } @Override - public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext context) { + public MultiStageOperator visitFilter(FilterNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new FilterOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), - node.getCondition()); + return new FilterOperator(context, nextOperator, node.getDataSchema(), node.getCondition()); } @Override - public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext context) { + public MultiStageOperator visitJoin(JoinNode node, OpChainExecutionContext context) { PlanNode left = node.getInputs().get(0); PlanNode right = node.getInputs().get(1); MultiStageOperator leftOperator = left.visit(this, context); MultiStageOperator rightOperator = right.visit(this, context); - return new HashJoinOperator(context.getOpChainExecutionContext(), leftOperator, rightOperator, left.getDataSchema(), - node); + return new HashJoinOperator(context, leftOperator, rightOperator, left.getDataSchema(), node); } @Override - public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext context) { + public MultiStageOperator visitProject(ProjectNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), - node.getProjects(), node.getInputs().get(0).getDataSchema()); + return new TransformOperator(context, nextOperator, node.getDataSchema(), node.getProjects(), + node.getInputs().get(0).getDataSchema()); } @Override - public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext context) { + public MultiStageOperator visitSort(SortNode node, OpChainExecutionContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator; - return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(), - node.getCollationDirections(), node.getCollationNullDirections(), node.getFetch(), node.getOffset(), - node.getDataSchema(), isInputSorted); + return new SortOperator(context, nextOperator, node.getCollationKeys(), node.getCollationDirections(), + node.getCollationNullDirections(), node.getFetch(), node.getOffset(), node.getDataSchema(), isInputSorted); } @Override - public MultiStageOperator visitTableScan(TableScanNode node, PhysicalPlanContext context) { + public MultiStageOperator visitTableScan(TableScanNode node, OpChainExecutionContext context) { throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!"); } @Override - public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext context) { - return new LiteralValueOperator(context.getOpChainExecutionContext(), node.getDataSchema(), node.getLiteralRows()); + public MultiStageOperator visitValue(ValueNode node, OpChainExecutionContext context) { + return new LiteralValueOperator(context, node.getDataSchema(), node.getLiteralRows()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java index 69663f5d33..8972a89e18 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -35,7 +35,6 @@ import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; -import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,31 +56,29 @@ public class PipelineBreakerExecutor { * @param scheduler scheduler service to run the pipeline breaker main thread. * @param mailboxService mailbox service to attach the {@link MailboxReceiveNode} against. * @param distributedStagePlan the distributed stage plan to run pipeline breaker on. - * @param deadlineMs execution deadline + * @param requestMetadata request metadata, including query options * @param requestId request ID - * @param isTraceEnabled whether to enable trace. + * @param deadlineMs execution deadline * @return pipeline breaker result; * - If exception occurs, exception block will be wrapped in {@link TransferableBlock} and assigned to each PB node. * - Normal stats will be attached to each PB node and downstream execution should return with stats attached. */ @Nullable public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, - MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long deadlineMs, long requestId, - boolean isTraceEnabled) { + MailboxService mailboxService, DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadata, + long requestId, long deadlineMs) { PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext(); PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext); if (!pipelineBreakerContext.getPipelineBreakerMap().isEmpty()) { try { - PlanNode stageRoot = distributedStagePlan.getStageRoot(); // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage // OpChain receive-mail callbacks. // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information - OpChainExecutionContext opChainContext = - new OpChainExecutionContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), - distributedStagePlan.getServer(), deadlineMs, distributedStagePlan.getStageMetadata(), null, - isTraceEnabled); - PhysicalPlanContext physicalPlanContext = new PhysicalPlanContext(opChainContext, null); - return PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); + OpChainExecutionContext opChainExecutionContext = + new OpChainExecutionContext(mailboxService, requestId, distributedStagePlan.getStageId(), + distributedStagePlan.getServer(), deadlineMs, requestMetadata, distributedStagePlan.getStageMetadata(), + null); + return execute(scheduler, pipelineBreakerContext, opChainExecutionContext); } catch (Exception e) { LOGGER.error("Caught exception executing pipeline breaker for request: {}, stage: {}", requestId, distributedStagePlan.getStageId(), e); @@ -93,36 +90,36 @@ public class PipelineBreakerExecutor { } } - private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, PipelineBreakerContext context, - PhysicalPlanContext physicalPlanContext) + private static PipelineBreakerResult execute(OpChainSchedulerService scheduler, + PipelineBreakerContext pipelineBreakerContext, OpChainExecutionContext opChainExecutionContext) throws Exception { Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>(); - for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) { + for (Map.Entry<Integer, PlanNode> e : pipelineBreakerContext.getPipelineBreakerMap().entrySet()) { int key = e.getKey(); PlanNode planNode = e.getValue(); if (!(planNode instanceof MailboxReceiveNode)) { throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now"); } - OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext); - pipelineWorkerMap.put(key, tempOpChain.getRoot()); + OpChain opChain = PhysicalPlanVisitor.walkPlanNode(planNode, opChainExecutionContext); + pipelineWorkerMap.put(key, opChain.getRoot()); } - return runMailboxReceivePipelineBreaker(scheduler, context, pipelineWorkerMap, physicalPlanContext); + return runMailboxReceivePipelineBreaker(scheduler, pipelineBreakerContext, pipelineWorkerMap, + opChainExecutionContext); } private static PipelineBreakerResult runMailboxReceivePipelineBreaker(OpChainSchedulerService scheduler, - PipelineBreakerContext context, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, - PhysicalPlanContext physicalPlanContext) + PipelineBreakerContext pipelineBreakerContext, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, + OpChainExecutionContext opChainExecutionContext) throws Exception { PipelineBreakerOperator pipelineBreakerOperator = - new PipelineBreakerOperator(physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap); + new PipelineBreakerOperator(opChainExecutionContext, pipelineWorkerMap); CountDownLatch latch = new CountDownLatch(1); OpChain pipelineBreakerOpChain = - new OpChain(physicalPlanContext.getOpChainExecutionContext(), pipelineBreakerOperator, - physicalPlanContext.getReceivingMailboxIds(), (id) -> latch.countDown()); + new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) -> latch.countDown()); scheduler.register(pipelineBreakerOpChain); - long timeoutMs = physicalPlanContext.getDeadlineMs() - System.currentTimeMillis(); + long timeoutMs = opChainExecutionContext.getDeadlineMs() - System.currentTimeMillis(); if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) { - return new PipelineBreakerResult(context.getNodeIdMap(), pipelineBreakerOperator.getResultMap(), + return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), pipelineBreakerOperator.getResultMap(), pipelineBreakerOperator.getErrorBlock(), pipelineBreakerOpChain.getStats()); } else { throw new TimeoutException( diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index fb1c3af0c1..bc78678fa3 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -20,7 +20,7 @@ package org.apache.pinot.query.runtime.plan.server; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.config.table.TableType; @@ -29,20 +29,21 @@ import org.apache.pinot.spi.config.table.TableType; * {@link PinotQuery} to execute on server. */ public class ServerPlanRequestContext { - private final PhysicalPlanContext _planContext; + private final OpChainExecutionContext _executionContext; private final TableType _tableType; private PinotQuery _pinotQuery; private InstanceRequest _instanceRequest; - public ServerPlanRequestContext(PhysicalPlanContext planContext, PinotQuery pinotQuery, TableType tableType) { - _planContext = planContext; + public ServerPlanRequestContext(OpChainExecutionContext executionContext, PinotQuery pinotQuery, + TableType tableType) { + _executionContext = executionContext; _pinotQuery = pinotQuery; _tableType = tableType; } - public PhysicalPlanContext getPlanContext() { - return _planContext; + public OpChainExecutionContext getExecutionContext() { + return _executionContext; } public TableType getTableType() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index 7f9e756acc..466b29af14 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -43,7 +43,7 @@ import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.JoinNode; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -61,6 +61,9 @@ import org.slf4j.LoggerFactory; public class ServerPlanRequestUtils { + private ServerPlanRequestUtils() { + } + private static final int DEFAULT_LEAF_NODE_LIMIT = Integer.MAX_VALUE; private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class); private static final List<String> QUERY_REWRITERS_CLASS_NAMES = @@ -70,22 +73,16 @@ public class ServerPlanRequestUtils { new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); - private ServerPlanRequestUtils() { - // do not instantiate. - } - /** * Entry point to construct a {@link ServerPlanRequestContext} for executing leaf-stage runner. * - * @param planContext physical plan context of the stage. + * @param executionContext execution context of the stage. * @param distributedStagePlan distributed stage plan of the stage. - * @param requestMetadataMap metadata map * @param helixPropertyStore helix property store used to fetch table config and schema for leaf-stage execution. * @return a list of server plan request context to be run */ - public static List<ServerPlanRequestContext> constructServerQueryRequests(PhysicalPlanContext planContext, - DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, - ZkHelixPropertyStore<ZNRecord> helixPropertyStore) { + public static List<ServerPlanRequestContext> constructServerQueryRequests(OpChainExecutionContext executionContext, + DistributedStagePlan distributedStagePlan, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) { StageMetadata stageMetadata = distributedStagePlan.getStageMetadata(); WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata(); String rawTableName = StageMetadata.getTableName(stageMetadata); @@ -101,15 +98,15 @@ public class ServerPlanRequestUtils { TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig, - schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue())); + requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema, + StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, tableEntry.getValue())); } else if (TableType.REALTIME.name().equals(tableType)) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - requests.add(ServerPlanRequestUtils.build(planContext, distributedStagePlan, requestMetadataMap, tableConfig, - schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue())); + requests.add(ServerPlanRequestUtils.build(executionContext, distributedStagePlan, tableConfig, schema, + StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, tableEntry.getValue())); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); } @@ -117,18 +114,15 @@ public class ServerPlanRequestUtils { return requests; } - private static ServerPlanRequestContext build(PhysicalPlanContext planContext, DistributedStagePlan stagePlan, - Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, + private static ServerPlanRequestContext build(OpChainExecutionContext executionContext, + DistributedStagePlan stagePlan, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) { // Before-visit: construct the ServerPlanRequestContext baseline // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing. - long requestId = - (Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)) << 16) + ( - (long) stagePlan.getStageId() << 8) + (tableType == TableType.REALTIME ? 1 : 0); - long timeoutMs = Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); - boolean traceEnabled = Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE)); + long requestId = (executionContext.getRequestId() << 16) + ((long) stagePlan.getStageId() << 8) + ( + tableType == TableType.REALTIME ? 1 : 0); PinotQuery pinotQuery = new PinotQuery(); - Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(requestMetadataMap); + Integer leafNodeLimit = QueryOptionsUtils.getMultiStageLeafLimit(executionContext.getRequestMetadata()); if (leafNodeLimit != null) { pinotQuery.setLimit(leafNodeLimit); } else { @@ -136,7 +130,7 @@ public class ServerPlanRequestUtils { } LOGGER.debug("QueryID" + requestId + " leafNodeLimit:" + leafNodeLimit); pinotQuery.setExplain(false); - ServerPlanRequestContext serverContext = new ServerPlanRequestContext(planContext, pinotQuery, tableType); + ServerPlanRequestContext serverContext = new ServerPlanRequestContext(executionContext, pinotQuery, tableType); // visit the plan and create query physical plan. ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), serverContext); @@ -152,7 +146,7 @@ public class ServerPlanRequestUtils { QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); // 2. set pinot query options according to requestMetadataMap - updateQueryOptions(pinotQuery, requestMetadataMap, timeoutMs, traceEnabled); + updateQueryOptions(pinotQuery, executionContext); // 3. wrapped around in broker request BrokerRequest brokerRequest = new BrokerRequest(); @@ -168,7 +162,7 @@ public class ServerPlanRequestUtils { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setBrokerId("unknown"); - instanceRequest.setEnableTrace(Boolean.parseBoolean(requestMetadataMap.get(CommonConstants.Broker.Request.TRACE))); + instanceRequest.setEnableTrace(executionContext.isTraceEnabled()); instanceRequest.setSearchSegments(segmentList); instanceRequest.setQuery(brokerRequest); @@ -179,16 +173,10 @@ public class ServerPlanRequestUtils { /** * Helper method to update query options. */ - private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs, - boolean traceEnabled) { - Map<String, String> queryOptions = new HashMap<>(); - // put default timeout and trace options - queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs)); - if (traceEnabled) { - queryOptions.put(CommonConstants.Broker.Request.TRACE, "true"); - } - // overwrite with requestMetadataMap to carry query options from request: - queryOptions.putAll(requestMetadataMap); + private static void updateQueryOptions(PinotQuery pinotQuery, OpChainExecutionContext executionContext) { + Map<String, String> queryOptions = new HashMap<>(executionContext.getRequestMetadata()); + queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, + Long.toString(executionContext.getDeadlineMs() - System.currentTimeMillis())); pinotQuery.setQueryOptions(queryOptions); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java index 5e3873fbcd..be7db3ea63 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java @@ -110,7 +110,7 @@ public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPla staticSide = node.getInputs().get(1); } staticSide.visit(this, context); - PipelineBreakerResult pipelineBreakerResult = context.getPlanContext().getPipelineBreakerResult(); + PipelineBreakerResult pipelineBreakerResult = context.getExecutionContext().getPipelineBreakerResult(); int resultMapId = pipelineBreakerResult.getNodeIdMap().get(dynamicSide); List<TransferableBlock> transferableBlocks = pipelineBreakerResult.getResultMap().getOrDefault( resultMapId, Collections.emptyList()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 2561779c2b..bf1993af3d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -87,15 +87,14 @@ public class QueryDispatcher { } public ResultTable submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, - Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator, - boolean traceEnabled) + Map<String, String> queryOptions, Map<Integer, ExecutionStatsAggregator> executionStatsAggregator) throws Exception { long requestId = context.getRequestId(); try { submit(requestId, dispatchableSubPlan, timeoutMs, queryOptions); long reduceStartTimeNs = System.nanoTime(); ResultTable resultTable = - runReducer(requestId, dispatchableSubPlan, timeoutMs, executionStatsAggregator, traceEnabled, + runReducer(requestId, dispatchableSubPlan, timeoutMs, queryOptions, executionStatsAggregator, _mailboxService); context.setReduceTimeNanos(System.nanoTime() - reduceStartTimeNs); return resultTable; @@ -184,7 +183,8 @@ public class QueryDispatcher { @VisibleForTesting public static ResultTable runReducer(long requestId, DispatchableSubPlan dispatchableSubPlan, long timeoutMs, - Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, boolean traceEnabled, MailboxService mailboxService) { + Map<String, String> queryOptions, @Nullable Map<Integer, ExecutionStatsAggregator> statsAggregatorMap, + MailboxService mailboxService) { // NOTE: Reduce stage is always stage 0 DispatchablePlanFragment dispatchablePlanFragment = dispatchableSubPlan.getQueryStageList().get(0); PlanFragment planFragment = dispatchablePlanFragment.getPlanFragment(); @@ -199,8 +199,8 @@ public class QueryDispatcher { .addCustomProperties(dispatchablePlanFragment.getCustomProperties()).build(); OpChainExecutionContext opChainExecutionContext = new OpChainExecutionContext(mailboxService, requestId, planFragment.getFragmentId(), - workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, stageMetadata, - null, traceEnabled); + workerMetadataList.get(0).getVirtualServerAddress(), System.currentTimeMillis() + timeoutMs, queryOptions, + stageMetadata, null); MailboxReceiveOperator receiveOperator = new MailboxReceiveOperator(opChainExecutionContext, receiveNode.getDistributionType(), receiveNode.getSenderStageId()); @@ -210,9 +210,9 @@ public class QueryDispatcher { return resultTable; } - private static void collectStats(DispatchableSubPlan dispatchableSubPlan, @Nullable OpChainStats opChainStats, + private static void collectStats(DispatchableSubPlan dispatchableSubPlan, OpChainStats opChainStats, @Nullable Map<Integer, ExecutionStatsAggregator> executionStatsAggregatorMap) { - if (executionStatsAggregatorMap != null && opChainStats != null) { + if (executionStatsAggregatorMap != null) { LOGGER.info("Extracting broker query execution stats, Runtime: {}ms", opChainStats.getExecutionTime()); for (Map.Entry<String, OperatorStats> entry : opChainStats.getOperatorStatsMap().entrySet()) { OperatorStats operatorStats = entry.getValue(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index 893e0afc5e..3b1d18655d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -98,10 +98,10 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryResponse> responseObserver) { // Deserialize the request List<DistributedStagePlan> distributedStagePlans; - Map<String, String> requestMetadataMap; - requestMetadataMap = request.getMetadataMap(); - long requestId = Long.parseLong(requestMetadataMap.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)); - long timeoutMs = Long.parseLong(requestMetadataMap.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); + Map<String, String> requestMetadata; + requestMetadata = request.getMetadataMap(); + long requestId = Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID)); + long timeoutMs = Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS)); long deadlineMs = System.currentTimeMillis() + timeoutMs; // 1. Deserialized request try { @@ -114,7 +114,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { // 2. Submit distributed stage plans SubmissionService submissionService = new SubmissionService(_querySubmissionExecutorService); distributedStagePlans.forEach(distributedStagePlan -> submissionService.submit(() -> { - _queryRunner.processQuery(distributedStagePlan, requestMetadataMap); + _queryRunner.processQuery(distributedStagePlan, requestMetadata); })); // 3. await response successful or any failure which cancels all other tasks. try { @@ -123,8 +123,7 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, t); responseObserver.onNext(Worker.QueryResponse.newBuilder() .putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, - QueryException.getTruncatedStackTrace(t)) - .build()); + QueryException.getTruncatedStackTrace(t)).build()); responseObserver.onCompleted(); return; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 7b27883210..afe5aa34ab 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -208,7 +209,8 @@ public class QueryRunnerTest extends QueryRunnerTestBase { processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap); } try { - QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, null, false, _mailboxService); + QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), null, + _mailboxService); Assert.fail("Should have thrown exception!"); } catch (RuntimeException e) { // NOTE: The actual message is (usually) something like: diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java index f784f4b586..5112cf4caf 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java @@ -137,8 +137,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { } } ResultTable resultTable = - QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, executionStatsAggregatorMap, true, - _mailboxService); + QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), + executionStatsAggregatorMap, _mailboxService); return resultTable.getRows(); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java index 38ab949875..e79f46e671 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.query.runtime.executor; -import com.google.common.collect.ImmutableList; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -68,8 +68,9 @@ public class OpChainSchedulerServiceTest { private OpChain getChain(MultiStageOperator operator) { VirtualServerAddress address = new VirtualServerAddress("localhost", 1234, 1); - OpChainExecutionContext context = new OpChainExecutionContext(null, 123L, 1, address, 0, null, null, true); - return new OpChain(context, operator, ImmutableList.of()); + OpChainExecutionContext context = + new OpChainExecutionContext(null, 123L, 1, address, Long.MAX_VALUE, Collections.emptyMap(), null, null); + return new OpChain(context, operator); } @Test @@ -131,8 +132,8 @@ public class OpChainSchedulerServiceTest { OpChainSchedulerService schedulerService = new OpChainSchedulerService(_executor); CountDownLatch latch = new CountDownLatch(1); - Mockito.when(_operatorA.nextBlock()).thenReturn( - TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("foo"))); + Mockito.when(_operatorA.nextBlock()) + .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new RuntimeException("foo"))); Mockito.doAnswer(inv -> { latch.countDown(); return null; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java index 83dea7c82d..4913527d29 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java @@ -25,7 +25,6 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.mockito.Mock; import org.mockito.Mockito; @@ -40,9 +39,6 @@ public class LiteralValueOperatorTest { private AutoCloseable _mocks; - @Mock - private PhysicalPlanContext _context; - @Mock private VirtualServerAddress _serverAddress; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index 76be227fcd..e9c8661ff9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -105,12 +105,12 @@ public class MailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); //noinspection resource new MailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, 1); } - @Test(enabled = true) + @Test public void shouldTimeout() throws InterruptedException { when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index 85893cb6b5..c01447d7cd 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -189,8 +189,8 @@ public class MailboxSendOperatorTest { StageMetadata stageMetadata = new StageMetadata.Builder().setWorkerMetadataList( Collections.singletonList(new WorkerMetadata.Builder().setVirtualServerAddress(_server).build())).build(); OpChainExecutionContext context = - new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, stageMetadata, null, - false); + new OpChainExecutionContext(_mailboxService, 0, SENDER_STAGE_ID, _server, Long.MAX_VALUE, + Collections.emptyMap(), stageMetadata, null); return new MailboxSendOperator(context, _sourceOperator, _exchange, null, null, false); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java index 3bd295fe4c..cab25f9127 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OpChainTest.java @@ -50,6 +50,7 @@ import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.utils.CommonConstants; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.AfterMethod; @@ -132,7 +133,7 @@ public class OpChainTest { Thread.sleep(100); return TransferableBlockUtils.getEndOfStreamTransferableBlock(); }); - OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator, new ArrayList<>()); + OpChain opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator); opChain.getStats().executing(); opChain.getRoot().nextBlock(); opChain.getStats().queued(); @@ -142,7 +143,7 @@ public class OpChainTest { Thread.sleep(20); return TransferableBlockUtils.getEndOfStreamTransferableBlock(); }); - opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator, new ArrayList<>()); + opChain = new OpChain(OperatorTestUtil.getDefaultContext(), _sourceOperator); opChain.getStats().executing(); opChain.getRoot().nextBlock(); opChain.getStats().queued(); @@ -155,7 +156,7 @@ public class OpChainTest { OpChainExecutionContext context = OperatorTestUtil.getDefaultContext(); DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context); - OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>()); + OpChain opChain = new OpChain(context, dummyMultiStageOperator); opChain.getStats().executing(); opChain.getRoot().nextBlock(); opChain.getStats().queued(); @@ -168,8 +169,8 @@ public class OpChainTest { opChain.getStats().getOperatorStatsMap().get(dummyMultiStageOperator.getOperatorId()).getExecutionStats(); long time = Long.parseLong(executionStats.get(DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS.getName())); - assertTrue(time >= 1000 && time <= 2000, "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS - + " to be in [1000, 2000] but found " + time); + assertTrue(time >= 1000 && time <= 2000, + "Expected " + DataTable.MetadataKey.OPERATOR_EXECUTION_TIME_MS + " to be in [1000, 2000] but found " + time); } @Test @@ -177,7 +178,7 @@ public class OpChainTest { OpChainExecutionContext context = OperatorTestUtil.getDefaultContextWithTracingDisabled(); DummyMultiStageOperator dummyMultiStageOperator = new DummyMultiStageOperator(context); - OpChain opChain = new OpChain(context, dummyMultiStageOperator, new ArrayList<>()); + OpChain opChain = new OpChain(context, dummyMultiStageOperator); opChain.getStats().executing(); opChain.getRoot().nextBlock(); opChain.getStats().queued(); @@ -192,13 +193,14 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; - OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE, + Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); - OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>()); + OpChain opChain = new OpChain(context, operators.peek()); opChain.getStats().executing(); while (!opChain.getRoot().nextBlock().isEndOfStreamBlock()) { // Drain the opchain @@ -206,8 +208,8 @@ public class OpChainTest { opChain.getStats().queued(); OpChainExecutionContext secondStageContext = - new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, null, true); + new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE, + Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), _receivingStageMetadata, null); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId + 1); @@ -231,20 +233,21 @@ public class OpChainTest { int receivedStageId = 2; int senderStageId = 1; - OpChainExecutionContext context = new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService1, 1, senderStageId, _serverAddress, Long.MAX_VALUE, + Collections.emptyMap(), _receivingStageMetadata, null); Stack<MultiStageOperator> operators = getFullOpchain(receivedStageId, senderStageId, context, dummyOperatorWaitTime); - OpChain opChain = new OpChain(context, operators.peek(), new ArrayList<>()); + OpChain opChain = new OpChain(context, operators.peek()); opChain.getStats().executing(); opChain.getRoot().nextBlock(); opChain.getStats().queued(); OpChainExecutionContext secondStageContext = - new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, - System.currentTimeMillis() + 1000, _receivingStageMetadata, null, false); + new OpChainExecutionContext(_mailboxService2, 1, senderStageId + 1, _serverAddress, Long.MAX_VALUE, + Collections.emptyMap(), _receivingStageMetadata, null); MailboxReceiveOperator secondStageReceiveOp = new MailboxReceiveOperator(secondStageContext, RelDistribution.Type.BROADCAST_DISTRIBUTED, senderStageId); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 55878658f7..5f139e5545 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -20,6 +20,7 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.pinot.common.datablock.DataBlock; @@ -31,6 +32,7 @@ import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.StageMetadata; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; +import org.apache.pinot.spi.utils.CommonConstants; public class OperatorTestUtil { @@ -75,22 +77,19 @@ public class OperatorTestUtil { public static OpChainExecutionContext getOpChainContext(MailboxService mailboxService, VirtualServerAddress receiverAddress, long deadlineMs, StageMetadata stageMetadata) { - return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, stageMetadata, null, false); + return new OpChainExecutionContext(mailboxService, 0, 0, receiverAddress, deadlineMs, Collections.emptyMap(), + stageMetadata, null); } public static OpChainExecutionContext getDefaultContext() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, true); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, + Collections.singletonMap(CommonConstants.Broker.Request.TRACE, "true"), null, null); } public static OpChainExecutionContext getDefaultContextWithTracingDisabled() { VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); - return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, null, null, false); - } - - public static OpChainExecutionContext getContext(long requestId, int stageId, - VirtualServerAddress virtualServerAddress) { - return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, null, null, - true); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Collections.emptyMap(), null, + null); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java index 38940e0892..0c6e60561d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortedMailboxReceiveOperatorTest.java @@ -115,7 +115,7 @@ public class SortedMailboxReceiveOperatorTest { @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") public void shouldThrowRangeDistributionNotSupported() { OpChainExecutionContext context = - OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, null); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.RANGE_DISTRIBUTED, DATA_SCHEMA, COLLATION_KEYS, COLLATION_DIRECTIONS, COLLATION_NULL_DIRECTIONS, false, 1); @@ -125,8 +125,7 @@ public class SortedMailboxReceiveOperatorTest { public void shouldThrowOnEmptyCollationKey() { when(_mailboxService.getReceivingMailbox(eq(MAILBOX_ID_1))).thenReturn(_mailbox1); OpChainExecutionContext context = - OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, System.currentTimeMillis() + 10L, - _stageMetadata1); + OperatorTestUtil.getOpChainContext(_mailboxService, RECEIVER_ADDRESS, Long.MAX_VALUE, _stageMetadata1); //noinspection resource new SortedMailboxReceiveOperator(context, RelDistribution.Type.SINGLETON, DATA_SCHEMA, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), false, 1); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java index b5ba82e48d..e7fa0e7db2 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; @@ -131,7 +132,7 @@ public class PipelineBreakerExecutorTest { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() + 10_000L, 0, false); + Collections.emptyMap(), 0, Long.MAX_VALUE); // then // should have single PB result, receive 2 data blocks, EOS block shouldn't be included @@ -172,7 +173,7 @@ public class PipelineBreakerExecutorTest { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() + 10_000L, 0, false); + Collections.emptyMap(), 0, Long.MAX_VALUE); // then // should have two PB result, receive 2 data blocks, one each, EOS block shouldn't be included @@ -200,7 +201,7 @@ public class PipelineBreakerExecutorTest { // when PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() + 10_000L, 0, false); + Collections.emptyMap(), 0, Long.MAX_VALUE); // then // should return empty block list @@ -215,23 +216,23 @@ public class PipelineBreakerExecutorTest { @Test public void shouldReturnErrorBlocksFailureWhenPBTimeout() { - MailboxReceiveNode incorrectlyConfiguredMailboxNode = + MailboxReceiveNode mailboxReceiveNode = new MailboxReceiveNode(0, DATA_SCHEMA, 1, RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER, null, null, false, false, null); DistributedStagePlan distributedStagePlan = - new DistributedStagePlan(0, RECEIVER_ADDRESS, incorrectlyConfiguredMailboxNode, _stageMetadata1); + new DistributedStagePlan(0, RECEIVER_ADDRESS, mailboxReceiveNode, _stageMetadata1); // when when(_mailboxService.getReceivingMailbox(MAILBOX_ID_1)).thenReturn(_mailbox1); - Object[] row1 = new Object[]{1, 1}; - Object[] row2 = new Object[]{2, 3}; - when(_mailbox1.poll()).thenReturn(OperatorTestUtil.block(DATA_SCHEMA, row1), - OperatorTestUtil.block(DATA_SCHEMA, row2), - TransferableBlockUtils.getEndOfStreamTransferableBlock()); + CountDownLatch latch = new CountDownLatch(1); + when(_mailbox1.poll()).thenAnswer(invocation -> { + latch.await(); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + }); PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() - 10_000L, 0, false); + Collections.emptyMap(), 0, System.currentTimeMillis() + 100); // then // should contain only failure error blocks @@ -239,6 +240,8 @@ public class PipelineBreakerExecutorTest { TransferableBlock errorBlock = pipelineBreakerResult.getErrorBlock(); Assert.assertNotNull(errorBlock); Assert.assertTrue(errorBlock.isErrorBlock()); + + latch.countDown(); } @Test @@ -268,7 +271,7 @@ public class PipelineBreakerExecutorTest { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() + 10_000L, 0, false); + Collections.emptyMap(), 0, Long.MAX_VALUE); // then // should pass when one PB returns result, the other returns empty. @@ -307,7 +310,7 @@ public class PipelineBreakerExecutorTest { PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, distributedStagePlan, - System.currentTimeMillis() + 10_000L, 0, false); + Collections.emptyMap(), 0, Long.MAX_VALUE); // then // should fail even if one of the 2 PB doesn't contain error block from sender. diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index 417f5b5e53..08505e6e33 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -125,7 +125,7 @@ public class QueryDispatcherTest extends QueryTestSet { context.setRequestId(requestId); DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { - _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null, false); + _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null); Assert.fail("Method call above should have failed"); } catch (Exception e) { Assert.assertTrue(e.getMessage().contains("Error dispatching query")); @@ -149,7 +149,7 @@ public class QueryDispatcherTest extends QueryTestSet { DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(sql); try { // will throw b/c mailboxService is mocked - _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null, false); + _queryDispatcher.submitAndReduce(context, dispatchableSubPlan, 10_000L, Collections.emptyMap(), null); Assert.fail("Method call above should have failed"); } catch (NullPointerException e) { // Expected --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org