This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 714ea4ed64 [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779) 714ea4ed64 is described below commit 714ea4ed645407d0e3b9509bc702fa5967b0046a Author: Rong Rong <ro...@apache.org> AuthorDate: Wed May 31 22:16:35 2023 -0700 [multistage] pipeline breaker framework and dynamic-broadcast pipeline breaker (#10779) Pipeline breaker --- 1. Introduce the `PipelineBreaker` package - `PipelineBreaker` runs before the actual `OpChain` of a stage (PlanFragment + Stage/Worker metadata); it computes and stores all pending "sub-plan" results before the actual `OpChain` of the stage is **CONSTRUCTED**. - `PipelineBreaker` is not another `OpChain`; the actual computation happens elsewhere. - Currently `PipelineBreaker` only supports MailboxReceive. 2. Introduce compilation support for accepting data-result placeholders in `PhysicalPlanVisitor` (and `ServerPlanRequestVisitor`) - this way the `OpChain` is generated with the results of the `PipelineBreaker` as contextual input. 3. Also cleaned up the scheduler - there is no need for two identical schedulers (leaf vs. intermediate), as both are cached. - The pipeline executor is now a static method, so it can easily be plugged into different executor services. Dynamic broadcast SEMI JOIN --- 1. Created a dynamic broadcast runtime based on #10772 2. 
Modified physical plan visitor to support semi join using dynamic broadcast --------- Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/query/mailbox/MailboxIdUtils.java | 8 +- .../apache/pinot/query/runtime/QueryRunner.java | 114 +++++---- .../pinot/query/runtime/operator/OpChainId.java | 4 + .../query/runtime/plan/DistributedStagePlan.java | 8 + .../runtime/plan/OpChainExecutionContext.java | 8 +- ...equestContext.java => PhysicalPlanContext.java} | 14 +- .../query/runtime/plan/PhysicalPlanVisitor.java | 30 +-- .../plan/pipeline/PipelineBreakerContext.java | 62 +++++ .../plan/pipeline/PipelineBreakerExecutor.java | 101 ++++++++ .../plan/pipeline/PipelineBreakerOperator.java | 118 +++++++++ .../plan/pipeline/PipelineBreakerResult.java | 43 ++++ .../plan/pipeline/PipelineBreakerVisitor.java | 50 ++++ .../plan/server/ServerPlanRequestContext.java | 11 +- .../ServerPlanRequestUtils.java} | 264 ++++++++++----------- .../plan/server/ServerPlanRequestVisitor.java | 191 +++++++++++++++ .../apache/pinot/query/QueryServerEnclosure.java | 15 +- .../pinot/query/runtime/QueryRunnerTest.java | 3 + .../runtime/operator/LiteralValueOperatorTest.java | 4 +- .../pinot/query/service/QueryServerTest.java | 4 - .../service/dispatch/QueryDispatcherTest.java | 3 - .../src/test/resources/queries/QueryHints.json | 20 ++ 21 files changed, 846 insertions(+), 229 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java index f65de98118..02944c665a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxIdUtils.java @@ -45,8 +45,12 @@ public class MailboxIdUtils { return new OpChainId(Long.parseLong(parts[0]), Integer.parseInt(parts[4]), Integer.parseInt(parts[3])); } - public static List<String> toMailboxIds(long 
requestId, MailboxMetadata senderMailBoxMetadatas) { - return senderMailBoxMetadatas.getMailBoxIdList().stream() + public static List<String> toMailboxIds(long requestId, MailboxMetadata senderMailBoxMetadata) { + return toMailboxIds(requestId, senderMailBoxMetadata.getMailBoxIdList()); + } + + public static List<String> toMailboxIds(long requestId, List<String> mailboxMetadataIdList) { + return mailboxMetadataIdList.stream() .map(mailboxIdFromBroker -> Long.toString(requestId) + SEPARATOR + mailboxIdFromBroker) .collect(Collectors.toList()); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 94640ffae7..6fa02a216a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -42,7 +42,6 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.plannode.MailboxSendNode; import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.executor.RoundRobinScheduler; @@ -52,11 +51,13 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; -import org.apache.pinot.query.runtime.plan.PlanRequestContext; -import 
org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor; import org.apache.pinot.query.runtime.plan.StageMetadata; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; +import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -83,14 +84,11 @@ public class QueryRunner { private MailboxService _mailboxService; private String _hostname; private int _port; - private VirtualServerAddress _rootServer; private ExecutorService _queryWorkerIntermExecutorService; private ExecutorService _queryWorkerLeafExecutorService; - private ExecutorService _queryRunnerExecutorService; - private OpChainSchedulerService _intermScheduler; - private OpChainSchedulerService _leafScheduler; + private OpChainSchedulerService _scheduler; /** * Initializes the query executor. @@ -102,8 +100,6 @@ public class QueryRunner { _hostname = instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE) ? 
instanceName.substring( CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName; _port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT); - // always use 0 for root server ID as all data is processed by one node at the global root - _rootServer = new VirtualServerAddress(_hostname, _port, 0); _helixManager = helixManager; try { long releaseMs = config.getProperty(QueryConfig.KEY_OF_SCHEDULER_RELEASE_TIMEOUT_MS, @@ -112,12 +108,9 @@ public class QueryRunner { new NamedThreadFactory("query_intermediate_worker_on_" + _port + "_port")); _queryWorkerLeafExecutorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("query_leaf_worker_on_" + _port + "_port")); - _queryRunnerExecutorService = Executors.newCachedThreadPool( - new NamedThreadFactory("query_runner_on_" + _port + "_port")); - _intermScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryWorkerIntermExecutorService()); - _leafScheduler = new OpChainSchedulerService(new RoundRobinScheduler(releaseMs), getQueryRunnerExecutorService()); - _mailboxService = new MailboxService(_hostname, _port, config, _intermScheduler::onDataAvailable); + _mailboxService = new MailboxService(_hostname, _port, config, _scheduler::onDataAvailable); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config.subset(PINOT_V1_SERVER_QUERY_CONFIG_PREFIX), instanceDataManager, serverMetrics); } catch (Exception e) { @@ -130,39 +123,70 @@ public class QueryRunner { _helixPropertyStore = _helixManager.getHelixPropertyStore(); _mailboxService.start(); _serverExecutor.start(); - _leafScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); - _intermScheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); + _scheduler.startAsync().awaitRunning(30, TimeUnit.SECONDS); } public void shutDown() throws 
TimeoutException { _serverExecutor.shutDown(); _mailboxService.shutdown(); - _leafScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); - _intermScheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); + _scheduler.stopAsync().awaitTerminated(30, TimeUnit.SECONDS); } - public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) { + /** + * Execute a {@link DistributedStagePlan}. + * + * <p>This execution entry point should be asynchronously called by the request handler and caller should not wait + * for results/exceptions.</p> + */ + public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) + throws Exception { long requestId = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)); long timeoutMs = Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS)); boolean isTraceEnabled = Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); long deadlineMs = System.currentTimeMillis() + timeoutMs; - if (isLeafStage(distributedStagePlan)) { - OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, timeoutMs, deadlineMs, - requestId); - _leafScheduler.register(rootOperator); + + // run pre-stage execution for all pipeline breakers + PipelineBreakerResult pipelineBreakerResult; + try { + pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler, _mailboxService, + distributedStagePlan, timeoutMs, deadlineMs, requestId, isTraceEnabled); + } catch (Exception e) { + LOGGER.error("Error executing pre-stage pipeline breaker for: {}:{}", requestId, + distributedStagePlan.getStageId(), e); + _scheduler.cancel(requestId); + throw e; + } + + // run OpChain + if (DistributedStagePlan.isLeafStage(distributedStagePlan)) { + try { + OpChain rootOperator = compileLeafStage(distributedStagePlan, requestMetadataMap, 
pipelineBreakerResult, + timeoutMs, deadlineMs, requestId); + _scheduler.register(rootOperator); + } catch (Exception e) { + LOGGER.error("Error executing leaf stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e); + _scheduler.cancel(requestId); + throw e; + } } else { - PlanNode stageRoot = distributedStagePlan.getStageRoot(); - OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, - new PlanRequestContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, - distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), isTraceEnabled)); - _intermScheduler.register(rootOperator); + try { + PlanNode stageRoot = distributedStagePlan.getStageRoot(); + OpChain rootOperator = PhysicalPlanVisitor.walkPlanNode(stageRoot, + new PhysicalPlanContext(_mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, + distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled)); + _scheduler.register(rootOperator); + } catch (Exception e) { + LOGGER.error("Error executing intermediate stage for: {}:{}", requestId, distributedStagePlan.getStageId(), e); + _scheduler.cancel(requestId); + throw e; + } } } public void cancel(long requestId) { - _intermScheduler.cancel(requestId); + _scheduler.cancel(requestId); } @VisibleForTesting @@ -175,17 +199,13 @@ public class QueryRunner { return _queryWorkerIntermExecutorService; } - public ExecutorService getQueryRunnerExecutorService() { - return _queryRunnerExecutorService; - } - private OpChain compileLeafStage(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap, - long timeoutMs, long deadlineMs, long requestId) { + PipelineBreakerResult pipelineBreakerResult, long timeoutMs, long deadlineMs, long requestId) { boolean isTraceEnabled = Boolean.parseBoolean(requestMetadataMap.getOrDefault(CommonConstants.Broker.Request.TRACE, "false")); List<ServerPlanRequestContext> 
serverPlanRequestContexts = - constructServerQueryRequests(distributedStagePlan, requestMetadataMap, _helixPropertyStore, _mailboxService, - deadlineMs); + constructServerQueryRequests(distributedStagePlan, requestMetadataMap, pipelineBreakerResult, + _helixPropertyStore, _mailboxService, deadlineMs); List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size()); for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) { serverQueryRequests.add(new ServerQueryRequest(requestContext.getInstanceRequest(), @@ -207,8 +227,8 @@ public class QueryRunner { } private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore, - MailboxService mailboxService, long deadlineMs) { + Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, + ZkHelixPropertyStore<ZNRecord> helixPropertyStore, MailboxService mailboxService, long deadlineMs) { StageMetadata stageMetadata = distributedStagePlan.getStageMetadata(); WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata(); String rawTableName = StageMetadata.getTableName(stageMetadata); @@ -224,17 +244,17 @@ public class QueryRunner { TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, - tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.OFFLINE, - tableEntry.getValue(), deadlineMs)); + requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap, + pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), + 
TableType.OFFLINE, tableEntry.getValue(), deadlineMs)); } else if (TableType.REALTIME.name().equals(tableType)) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, - tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), TableType.REALTIME, - tableEntry.getValue(), deadlineMs)); + requests.add(ServerPlanRequestUtils.build(mailboxService, distributedStagePlan, requestMetadataMap, + pipelineBreakerResult, tableConfig, schema, StageMetadata.getTimeBoundary(stageMetadata), + TableType.REALTIME, tableEntry.getValue(), deadlineMs)); } else { throw new IllegalArgumentException("Unsupported table type key: " + tableType); } @@ -254,10 +274,4 @@ public class QueryRunner { } return result; } - - private boolean isLeafStage(DistributedStagePlan distributedStagePlan) { - WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata(); - Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata); - return segments != null && segments.size() > 0; - } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java index 50589cfca5..dae9c2f755 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChainId.java @@ -36,6 +36,10 @@ public class OpChainId { return _requestId; } + public int getVirtualServerId() { + return _virtualServerId; + } + @Override public String toString() { return String.format("%s_%s_%s", 
_requestId, _virtualServerId, _stageId); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java index f8a5f5118b..2aa269e6aa 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/DistributedStagePlan.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.query.runtime.plan; +import java.util.List; +import java.util.Map; import org.apache.pinot.query.planner.plannode.PlanNode; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.routing.WorkerMetadata; @@ -78,4 +80,10 @@ public class DistributedStagePlan { public WorkerMetadata getCurrentWorkerMetadata() { return _stageMetadata.getWorkerMetadataList().get(_server.workerId()); } + + public static boolean isLeafStage(DistributedStagePlan distributedStagePlan) { + WorkerMetadata workerMetadata = distributedStagePlan.getCurrentWorkerMetadata(); + Map<String, List<String>> segments = WorkerMetadata.getTableSegmentsMap(workerMetadata); + return segments != null && segments.size() > 0; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java index e16795891b..5da20c4e70 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java @@ -57,10 +57,10 @@ public class OpChainExecutionContext { _traceEnabled = traceEnabled; } - public OpChainExecutionContext(PlanRequestContext planRequestContext) { - this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), 
planRequestContext.getStageId(), - planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(), - planRequestContext.getStageMetadata(), planRequestContext.isTraceEnabled()); + public OpChainExecutionContext(PhysicalPlanContext physicalPlanContext) { + this(physicalPlanContext.getMailboxService(), physicalPlanContext.getRequestId(), physicalPlanContext.getStageId(), + physicalPlanContext.getServer(), physicalPlanContext.getTimeoutMs(), physicalPlanContext.getDeadlineMs(), + physicalPlanContext.getStageMetadata(), physicalPlanContext.isTraceEnabled()); } public MailboxService getMailboxService() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java similarity index 82% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java index d0233c1578..2da5772930 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanContext.java @@ -22,9 +22,10 @@ import java.util.ArrayList; import java.util.List; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; -public class PlanRequestContext { +public class PhysicalPlanContext { protected final MailboxService _mailboxService; protected final long _requestId; protected final int _stageId; @@ -33,12 +34,14 @@ public class PlanRequestContext { private final long _deadlineMs; protected final VirtualServerAddress _server; protected final StageMetadata _stageMetadata; + protected final PipelineBreakerResult 
_pipelineBreakerResult; protected final List<String> _receivingMailboxIds = new ArrayList<>(); private final OpChainExecutionContext _opChainExecutionContext; private final boolean _traceEnabled; - public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs, - VirtualServerAddress server, StageMetadata stageMetadata, boolean traceEnabled) { + public PhysicalPlanContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, + long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, + PipelineBreakerResult pipelineBreakerResult, boolean traceEnabled) { _mailboxService = mailboxService; _requestId = requestId; _stageId = stageId; @@ -46,6 +49,7 @@ public class PlanRequestContext { _deadlineMs = deadlineMs; _server = server; _stageMetadata = stageMetadata; + _pipelineBreakerResult = pipelineBreakerResult; _traceEnabled = traceEnabled; _opChainExecutionContext = new OpChainExecutionContext(this); } @@ -74,6 +78,10 @@ public class PlanRequestContext { return _stageMetadata; } + public PipelineBreakerResult getPipelineBreakerResult() { + return _pipelineBreakerResult; + } + public MailboxService getMailboxService() { return _mailboxService; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 8b98fa517a..68fcd82f67 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -56,19 +56,19 @@ import org.apache.pinot.query.runtime.operator.WindowAggregateOperator; * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into * v1 operators at this point in time. 
* - * <p>This class should be used statically via {@link #build(PlanNode, PlanRequestContext)} + * <p>This class should be used statically via {@link #walkPlanNode(PlanNode, PhysicalPlanContext)} */ -public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PlanRequestContext> { +public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, PhysicalPlanContext> { private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor(); - public static OpChain build(PlanNode node, PlanRequestContext context) { + public static OpChain walkPlanNode(PlanNode node, PhysicalPlanContext context) { MultiStageOperator root = node.visit(INSTANCE, context); return new OpChain(context.getOpChainExecutionContext(), root, context.getReceivingMailboxIds()); } @Override - public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) { + public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PhysicalPlanContext context) { if (node.isSortOnReceiver()) { SortedMailboxReceiveOperator sortedMailboxReceiveOperator = new SortedMailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(), @@ -86,7 +86,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) { + public MultiStageOperator visitMailboxSend(MailboxSendNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(), node.getPartitionKeySelector(), node.getCollationKeys(), node.getCollationDirections(), node.isSortOnSender(), @@ -94,14 +94,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext 
context) { + public MultiStageOperator visitAggregate(AggregateNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema()); } @Override - public MultiStageOperator visitWindow(WindowNode node, PlanRequestContext context) { + public MultiStageOperator visitWindow(WindowNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(), node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), @@ -110,7 +110,7 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitSetOp(SetOpNode setOpNode, PlanRequestContext context) { + public MultiStageOperator visitSetOp(SetOpNode setOpNode, PhysicalPlanContext context) { List<MultiStageOperator> inputs = new ArrayList<>(); for (PlanNode input : setOpNode.getInputs()) { MultiStageOperator visited = input.visit(this, context); @@ -132,19 +132,19 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PlanRequestContext context) { + public MultiStageOperator visitExchange(ExchangeNode exchangeNode, PhysicalPlanContext context) { throw new UnsupportedOperationException("ExchangeNode should not be visited"); } @Override - public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) { + public MultiStageOperator visitFilter(FilterNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new 
FilterOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), node.getCondition()); } @Override - public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) { + public MultiStageOperator visitJoin(JoinNode node, PhysicalPlanContext context) { PlanNode left = node.getInputs().get(0); PlanNode right = node.getInputs().get(1); @@ -156,14 +156,14 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) { + public MultiStageOperator visitProject(ProjectNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), node.getProjects(), node.getInputs().get(0).getDataSchema()); } @Override - public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) { + public MultiStageOperator visitSort(SortNode node, PhysicalPlanContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); boolean isInputSorted = nextOperator instanceof SortedMailboxReceiveOperator; return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(), @@ -172,12 +172,12 @@ public class PhysicalPlanVisitor implements PlanNodeVisitor<MultiStageOperator, } @Override - public MultiStageOperator visitTableScan(TableScanNode node, PlanRequestContext context) { + public MultiStageOperator visitTableScan(TableScanNode node, PhysicalPlanContext context) { throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!"); } @Override - public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) { + public MultiStageOperator visitValue(ValueNode node, PhysicalPlanContext context) { return new LiteralValueOperator(context.getOpChainExecutionContext(), 
node.getDataSchema(), node.getLiteralRows()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java new file mode 100644 index 0000000000..110c8e22e5 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerContext.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan.pipeline; + +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.PlanNode; + + +/** + * This class used to record the pipeline breaker operator that needs to be run before the main opChain. 
+ */ +public class PipelineBreakerContext { + private final Map<Integer, PlanNode> _pipelineBreakerMap = new HashMap<>(); + private final Map<PlanNode, Integer> _planNodeObjectToIdMap = new HashMap<>(); + + private final boolean _isLeafStage; + private int _currentNodeId = 0; + + public PipelineBreakerContext(boolean isLeafStage) { + _isLeafStage = isLeafStage; + } + + public void addPipelineBreaker(MailboxReceiveNode mailboxReceiveNode) { + int nodeId = _planNodeObjectToIdMap.get(mailboxReceiveNode); + _pipelineBreakerMap.put(nodeId, mailboxReceiveNode); + } + + public void visitedNewPlanNode(PlanNode planNode) { + _planNodeObjectToIdMap.put(planNode, _currentNodeId); + _currentNodeId++; + } + + public Map<PlanNode, Integer> getNodeIdMap() { + return _planNodeObjectToIdMap; + } + + public Map<Integer, PlanNode> getPipelineBreakerMap() { + return _pipelineBreakerMap; + } + + public boolean isLeafStage() { + return _isLeafStage; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java new file mode 100644 index 0000000000..b000d8ee74 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan.pipeline; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; +import org.apache.pinot.query.runtime.operator.OpChain; +import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; +import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; + + +/** + * Utility class to run pipeline breaker execution and collects the results. + */ +public class PipelineBreakerExecutor { + private PipelineBreakerExecutor() { + // do not instantiate. + } + + /** + * Execute a pipeline breaker and collect the results (synchronously) + * + * Currently, pipeline breaker executor can only execute mailbox receive pipeline breaker. 
+ */ + public static PipelineBreakerResult executePipelineBreakers(OpChainSchedulerService scheduler, + MailboxService mailboxService, DistributedStagePlan distributedStagePlan, long timeoutMs, long deadlineMs, + long requestId, boolean isTraceEnabled) + throws Exception { + PipelineBreakerContext pipelineBreakerContext = new PipelineBreakerContext( + DistributedStagePlan.isLeafStage(distributedStagePlan)); + PipelineBreakerVisitor.visitPlanRoot(distributedStagePlan.getStageRoot(), pipelineBreakerContext); + if (pipelineBreakerContext.getPipelineBreakerMap().size() > 0) { + PlanNode stageRoot = distributedStagePlan.getStageRoot(); + // TODO: This PlanRequestContext needs to indicate it is a pre-stage opChain and only listens to pre-stage OpChain + // receive-mail callbacks. + // see also: MailboxIdUtils TODOs, de-couple mailbox id from query information + PhysicalPlanContext physicalPlanContext = + new PhysicalPlanContext(mailboxService, requestId, stageRoot.getPlanFragmentId(), timeoutMs, deadlineMs, + distributedStagePlan.getServer(), distributedStagePlan.getStageMetadata(), null, isTraceEnabled); + Map<Integer, List<TransferableBlock>> resultMap = + PipelineBreakerExecutor.execute(scheduler, pipelineBreakerContext, physicalPlanContext); + return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(), resultMap); + } else { + return null; + } + } + + private static Map<Integer, List<TransferableBlock>> execute(OpChainSchedulerService scheduler, + PipelineBreakerContext context, PhysicalPlanContext physicalPlanContext) + throws Exception { + Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap = new HashMap<>(); + for (Map.Entry<Integer, PlanNode> e : context.getPipelineBreakerMap().entrySet()) { + int key = e.getKey(); + PlanNode planNode = e.getValue(); + // TODO: supprot other pipeline breaker node type as well. 
+ if (!(planNode instanceof MailboxReceiveNode)) { + throw new UnsupportedOperationException("Only MailboxReceiveNode is supported to run as pipeline breaker now"); + } + OpChain tempOpChain = PhysicalPlanVisitor.walkPlanNode(planNode, physicalPlanContext); + pipelineWorkerMap.put(key, tempOpChain.getRoot()); + } + return runMailboxReceivePipelineBreaker(scheduler, pipelineWorkerMap, physicalPlanContext); + } + + private static Map<Integer, List<TransferableBlock>> runMailboxReceivePipelineBreaker( + OpChainSchedulerService scheduler, Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap, + PhysicalPlanContext physicalPlanContext) + throws Exception { + PipelineBreakerOperator pipelineBreakerOperator = new PipelineBreakerOperator( + physicalPlanContext.getOpChainExecutionContext(), pipelineWorkerMap); + OpChain pipelineBreakerOpChain = new OpChain(physicalPlanContext.getOpChainExecutionContext(), + pipelineBreakerOperator, + physicalPlanContext.getReceivingMailboxIds()); + scheduler.register(pipelineBreakerOpChain); + return pipelineBreakerOperator.getResult(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java new file mode 100644 index 0000000000..61bc9c03b9 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan.pipeline; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.operator.MultiStageOperator; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; + + +public class PipelineBreakerOperator extends MultiStageOperator { + private static final String EXPLAIN_NAME = "PIPELINE_BREAKER"; + private final Deque<Map.Entry<Integer, Operator<TransferableBlock>>> _workerEntries; + private final Map<Integer, List<TransferableBlock>> _resultMap; + private final CountDownLatch _workerDoneLatch; + private TransferableBlock _errorBlock; + + + public PipelineBreakerOperator(OpChainExecutionContext context, + Map<Integer, Operator<TransferableBlock>> pipelineWorkerMap) { + super(context); + _resultMap = new HashMap<>(); + _workerEntries = new ArrayDeque<>(); + _workerEntries.addAll(pipelineWorkerMap.entrySet()); + _workerDoneLatch = new CountDownLatch(1); + } + + public Map<Integer, List<TransferableBlock>> getResult() + throws Exception { + boolean isWorkerDone = + 
_workerDoneLatch.await(_context.getDeadlineMs() - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (isWorkerDone && _errorBlock == null) { + return _resultMap; + } else { + throw new RuntimeException("Unable to construct pipeline breaker results due to timeout."); + } + } + + @Nullable + @Override + public String toExplainString() { + return EXPLAIN_NAME; + } + + @Override + protected TransferableBlock getNextBlock() { + if (System.currentTimeMillis() > _context.getDeadlineMs()) { + _errorBlock = TransferableBlockUtils.getErrorTransferableBlock(QueryException.EXECUTION_TIMEOUT_ERROR); + _workerDoneLatch.countDown(); + return _errorBlock; + } + + // Poll from every mailbox operator in round-robin fashion: + // - Return the first content block + // - If no content block found but there are mailboxes not finished, return no-op block + // - If all content blocks are already returned, return end-of-stream block + int numWorkers = _workerEntries.size(); + for (int i = 0; i < numWorkers; i++) { + Map.Entry<Integer, Operator<TransferableBlock>> worker = _workerEntries.remove(); + TransferableBlock block = worker.getValue().nextBlock(); + + // Release the mailbox when the block is end-of-stream + if (block != null && block.isSuccessfulEndOfStreamBlock()) { + continue; + } + + // Add the worker back to the queue if the block is not end-of-stream + _workerEntries.add(worker); + if (block != null) { + if (block.isErrorBlock()) { + _errorBlock = block; + _workerDoneLatch.countDown(); + return _errorBlock; + } + List<TransferableBlock> blockList = _resultMap.computeIfAbsent(worker.getKey(), (k) -> new ArrayList<>()); + // TODO: only data block is handled, we also need to handle metadata block from upstream in the future. + if (!block.isEndOfStreamBlock()) { + blockList.add(block); + } + } + } + + if (_workerEntries.isEmpty()) { + // NOTIFY results are ready. 
+ _workerDoneLatch.countDown(); + return TransferableBlockUtils.getEndOfStreamTransferableBlock(); + } else { + return TransferableBlockUtils.getNoOpTransferableBlock(); + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java new file mode 100644 index 0000000000..7ebd1d0c8d --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerResult.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.plan.pipeline; + +import java.util.List; +import java.util.Map; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; + + +public class PipelineBreakerResult { + private final Map<PlanNode, Integer> _nodeIdMap; + private final Map<Integer, List<TransferableBlock>> _resultMap; + + public PipelineBreakerResult(Map<PlanNode, Integer> nodeIdMap, Map<Integer, List<TransferableBlock>> resultMap) { + _nodeIdMap = nodeIdMap; + _resultMap = resultMap; + } + + public Map<PlanNode, Integer> getNodeIdMap() { + return _nodeIdMap; + } + + public Map<Integer, List<TransferableBlock>> getResultMap() { + return _resultMap; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java new file mode 100644 index 0000000000..03ea0b2115 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerVisitor.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.plan.pipeline; + +import org.apache.pinot.query.planner.plannode.DefaultPostOrderTraversalVisitor; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; + + +public class PipelineBreakerVisitor extends DefaultPostOrderTraversalVisitor<Void, PipelineBreakerContext> { + private static final PlanNodeVisitor<Void, PipelineBreakerContext> INSTANCE = new PipelineBreakerVisitor(); + + public static void visitPlanRoot(PlanNode root, PipelineBreakerContext context) { + root.visit(PipelineBreakerVisitor.INSTANCE, context); + } + + @Override + public Void process(PlanNode planNode, PipelineBreakerContext context) { + context.visitedNewPlanNode(planNode); + return null; + } + + @Override + public Void visitMailboxReceive(MailboxReceiveNode node, PipelineBreakerContext context) { + process(node, context); + // TODO: actually implement pipeline breaker attribute in PlanNode + // currently all mailbox receive node from leaf stage is considered as pipeline breaker node. 
+ if (context.isLeafStage()) { + context.addPipelineBreaker(node); + } + return null; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java index 87bf3302b5..5c7bc12503 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -23,8 +23,9 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.routing.VirtualServerAddress; -import org.apache.pinot.query.runtime.plan.PlanRequestContext; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.query.runtime.plan.StageMetadata; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.spi.config.table.TableType; @@ -32,7 +33,7 @@ import org.apache.pinot.spi.config.table.TableType; * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into * {@link PinotQuery} to execute on server. 
*/ -public class ServerPlanRequestContext extends PlanRequestContext { +public class ServerPlanRequestContext extends PhysicalPlanContext { protected TableType _tableType; protected TimeBoundaryInfo _timeBoundaryInfo; @@ -40,9 +41,11 @@ public class ServerPlanRequestContext extends PlanRequestContext { protected InstanceRequest _instanceRequest; public ServerPlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, - long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, PinotQuery pinotQuery, + long deadlineMs, VirtualServerAddress server, StageMetadata stageMetadata, + PipelineBreakerResult pipelineBreakerResult, PinotQuery pinotQuery, TableType tableType, TimeBoundaryInfo timeBoundaryInfo, boolean traceEnabled) { - super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, traceEnabled); + super(mailboxService, requestId, stageId, timeoutMs, deadlineMs, server, stageMetadata, pipelineBreakerResult, + traceEnabled); _pinotQuery = pinotQuery; _tableType = tableType; _timeBoundaryInfo = timeBoundaryInfo; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java similarity index 57% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index a4bf0450ea..53846d257f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -16,48 +16,37 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.runtime.plan; +package org.apache.pinot.query.runtime.plan.server; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.DataSource; import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.parser.CalciteRexExpressionParser; -import org.apache.pinot.query.planner.plannode.AggregateNode; -import org.apache.pinot.query.planner.plannode.ExchangeNode; -import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.plannode.JoinNode; -import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; -import org.apache.pinot.query.planner.plannode.MailboxSendNode; -import org.apache.pinot.query.planner.plannode.PlanNode; -import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; -import org.apache.pinot.query.planner.plannode.ProjectNode; -import org.apache.pinot.query.planner.plannode.SetOpNode; -import org.apache.pinot.query.planner.plannode.SortNode; -import org.apache.pinot.query.planner.plannode.TableScanNode; -import org.apache.pinot.query.planner.plannode.ValueNode; -import 
org.apache.pinot.query.planner.plannode.WindowNode; -import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; +import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult; import org.apache.pinot.query.service.QueryConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.sql.FilterKind; import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter; import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter; @@ -67,18 +56,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Plan visitor for direct leaf-stage server request. - * - * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future - * to directly produce operator chain. - * - * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic - * filtering and other auxiliary functionalities. 
- */ -public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> { +public class ServerPlanRequestUtils { private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000; - private static final Logger LOGGER = LoggerFactory.getLogger(ServerRequestPlanVisitor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ServerPlanRequestUtils.class); private static final List<String> QUERY_REWRITERS_CLASS_NAMES = ImmutableList.of(PredicateComparisonRewriter.class.getName(), NonAggregationGroupByToDistinctQueryRewriter.class.getName()); @@ -86,11 +66,14 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla new ArrayList<>(QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); - private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor(); + private ServerPlanRequestUtils() { + // do not instantiate. + } public static ServerPlanRequestContext build(MailboxService mailboxService, DistributedStagePlan stagePlan, - Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, - TableType tableType, List<String> segmentList, long deadlineMs) { + Map<String, String> requestMetadataMap, PipelineBreakerResult pipelineBreakerResult, + TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo, TableType tableType, + List<String> segmentList, long deadlineMs) { // Before-visit: construct the ServerPlanRequestContext baseline // Making a unique requestId for leaf stages otherwise it causes problem on stats/metrics/tracing. 
long requestId = (Long.parseLong(requestMetadataMap.get(QueryConfig.KEY_OF_BROKER_REQUEST_ID)) << 16) + ( @@ -108,10 +91,11 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla pinotQuery.setExplain(false); ServerPlanRequestContext context = new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), timeoutMs, deadlineMs, - stagePlan.getServer(), stagePlan.getStageMetadata(), pinotQuery, tableType, timeBoundaryInfo, traceEnabled); + stagePlan.getServer(), stagePlan.getStageMetadata(), pipelineBreakerResult, pinotQuery, tableType, + timeBoundaryInfo, traceEnabled); // visit the plan and create query physical plan. - ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context); + ServerPlanRequestVisitor.walkStageNode(stagePlan.getStageRoot(), context); // Post-visit: finalize context. // 1. global rewrite/optimize @@ -148,6 +132,9 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla return context; } + /** + * Helper method to update query options. 
+ */ private static void updateQueryOptions(PinotQuery pinotQuery, Map<String, String> requestMetadataMap, long timeoutMs, boolean traceEnabled) { Map<String, String> queryOptions = new HashMap<>(); @@ -161,113 +148,6 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla pinotQuery.setQueryOptions(queryOptions); } - private static void walkStageNode(PlanNode node, ServerPlanRequestContext context) { - node.visit(INSTANCE, context); - } - - @Override - public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - // set group-by list - context.getPinotQuery() - .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery())); - // set agg list - context.getPinotQuery().setSelectList( - CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(), - context.getPinotQuery())); - return null; - } - - @Override - public Void visitWindow(WindowNode node, ServerPlanRequestContext context) { - throw new UnsupportedOperationException("Window not yet supported!"); - } - - @Override - public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - return null; - } - - @Override - public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) { - throw new UnsupportedOperationException("Exchange not yet supported!"); - } - - @Override - public Void visitFilter(FilterNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - context.getPinotQuery() - .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery())); - return null; - } - - @Override - public Void visitJoin(JoinNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - return null; - } - - @Override - public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) { - 
visitChildren(node, context); - return null; - } - - @Override - public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - return null; - } - - @Override - public Void visitProject(ProjectNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - context.getPinotQuery() - .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery())); - return null; - } - - @Override - public Void visitSort(SortNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - PinotQuery pinotQuery = context.getPinotQuery(); - if (node.getCollationKeys().size() > 0) { - pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery)); - } - if (node.getFetch() > 0) { - pinotQuery.setLimit(node.getFetch()); - } - if (node.getOffset() > 0) { - pinotQuery.setOffset(node.getOffset()); - } - return null; - } - - @Override - public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) { - DataSource dataSource = new DataSource(); - String tableNameWithType = TableNameBuilder.forType(context.getTableType()) - .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName())); - dataSource.setTableName(tableNameWithType); - context.getPinotQuery().setDataSource(dataSource); - context.getPinotQuery().setSelectList( - node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList())); - return null; - } - - @Override - public Void visitValue(ValueNode node, ServerPlanRequestContext context) { - visitChildren(node, context); - return null; - } - - private void visitChildren(PlanNode node, ServerPlanRequestContext context) { - for (PlanNode child : node.getInputs()) { - child.visit(this, context); - } - } - /** * Helper method to attach the time boundary to the given PinotQuery. 
*/ @@ -289,4 +169,106 @@ public class ServerRequestPlanVisitor implements PlanNodeVisitor<Void, ServerPla pinotQuery.setFilterExpression(timeFilterExpression); } } + + /** + * attach the dynamic filter to the given PinotQuery. + */ + static void attachDynamicFilter(PinotQuery pinotQuery, JoinNode.JoinKeys joinKeys, List<Object[]> dataContainer, + DataSchema dataSchema) { + FieldSelectionKeySelector leftSelector = (FieldSelectionKeySelector) joinKeys.getLeftJoinKeySelector(); + FieldSelectionKeySelector rightSelector = (FieldSelectionKeySelector) joinKeys.getRightJoinKeySelector(); + List<Expression> expressions = new ArrayList<>(); + for (int i = 0; i < leftSelector.getColumnIndices().size(); i++) { + Expression leftExpr = pinotQuery.getSelectList().get(leftSelector.getColumnIndices().get(i)); + int rightIdx = rightSelector.getColumnIndices().get(i); + Expression inFilterExpr = RequestUtils.getFunctionExpression(FilterKind.IN.name()); + List<Expression> operands = new ArrayList<>(dataContainer.size() + 1); + operands.add(leftExpr); + operands.addAll(computeInOperands(dataContainer, dataSchema, rightIdx)); + inFilterExpr.getFunctionCall().setOperands(operands); + expressions.add(inFilterExpr); + } + attachFilterExpression(pinotQuery, FilterKind.AND, expressions); + } + + private static List<Expression> computeInOperands(List<Object[]> dataContainer, DataSchema dataSchema, int colIdx) { + final DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colIdx); + final FieldSpec.DataType storedType = columnDataType.getStoredType().toDataType();; + final int numRows = dataContainer.size(); + List<Expression> expressions = new ArrayList<>(); + switch (storedType) { + case INT: + int[] arrInt = new int[numRows]; + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + arrInt[rowIdx] = (int) dataContainer.get(rowIdx)[colIdx]; + } + Arrays.sort(arrInt); + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + 
expressions.add(RequestUtils.getLiteralExpression(arrInt[rowIdx])); + } + break; + case LONG: + long[] arrLong = new long[numRows]; + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + arrLong[rowIdx] = (long) dataContainer.get(rowIdx)[colIdx]; + } + Arrays.sort(arrLong); + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + expressions.add(RequestUtils.getLiteralExpression(arrLong[rowIdx])); + } + break; + case FLOAT: + float[] arrFloat = new float[numRows]; + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + arrFloat[rowIdx] = (float) dataContainer.get(rowIdx)[colIdx]; + } + Arrays.sort(arrFloat); + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + expressions.add(RequestUtils.getLiteralExpression(arrFloat[rowIdx])); + } + break; + case DOUBLE: + double[] arrDouble = new double[numRows]; + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + arrDouble[rowIdx] = (double) dataContainer.get(rowIdx)[colIdx]; + } + Arrays.sort(arrDouble); + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + expressions.add(RequestUtils.getLiteralExpression(arrDouble[rowIdx])); + } + break; + case STRING: + String[] arrString = new String[numRows]; + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + arrString[rowIdx] = (String) dataContainer.get(rowIdx)[colIdx]; + } + Arrays.sort(arrString); + for (int rowIdx = 0; rowIdx < numRows; rowIdx++) { + expressions.add(RequestUtils.getLiteralExpression(arrString[rowIdx])); + } + break; + default: + throw new IllegalStateException("Illegal SV data type for ID_SET aggregation function: " + storedType); + } + return expressions; + } + + /** + * Attach Filter Expression to existing PinotQuery. 
+ */ + private static void attachFilterExpression(PinotQuery pinotQuery, FilterKind attachKind, List<Expression> exprs) { + Preconditions.checkState(attachKind == FilterKind.AND || attachKind == FilterKind.OR); + Expression filterExpression = pinotQuery.getFilterExpression(); + List<Expression> arrayList = new ArrayList<>(exprs); + if (filterExpression != null) { + arrayList.add(filterExpression); + } + if (arrayList.size() > 1) { + Expression attachFilterExpression = RequestUtils.getFunctionExpression(attachKind.name()); + attachFilterExpression.getFunctionCall().setOperands(arrayList); + pinotQuery.setFilterExpression(attachFilterExpression); + } else { + pinotQuery.setFilterExpression(arrayList.get(0)); + } + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java new file mode 100644 index 0000000000..9b1f97426c --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestVisitor.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.plan.server; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.query.parser.CalciteRexExpressionParser; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.apache.pinot.query.planner.plannode.ExchangeNode; +import org.apache.pinot.query.planner.plannode.FilterNode; +import org.apache.pinot.query.planner.plannode.JoinNode; +import org.apache.pinot.query.planner.plannode.MailboxReceiveNode; +import org.apache.pinot.query.planner.plannode.MailboxSendNode; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.planner.plannode.PlanNodeVisitor; +import org.apache.pinot.query.planner.plannode.ProjectNode; +import org.apache.pinot.query.planner.plannode.SetOpNode; +import org.apache.pinot.query.planner.plannode.SortNode; +import org.apache.pinot.query.planner.plannode.TableScanNode; +import org.apache.pinot.query.planner.plannode.ValueNode; +import org.apache.pinot.query.planner.plannode.WindowNode; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; + + +/** + * Plan visitor for direct leaf-stage server request. + * + * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future + * to directly produce operator chain. + * + * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic + * filtering and other auxiliary functionalities. 
+ */
+public class ServerPlanRequestVisitor implements PlanNodeVisitor<Void, ServerPlanRequestContext> {
+  // The visitor keeps no state of its own (all results are written into the context), so one instance is shared.
+  private static final ServerPlanRequestVisitor INSTANCE = new ServerPlanRequestVisitor();
+
+  /**
+   * Walks the plan tree rooted at {@code node} and fills in the {@link PinotQuery} held by {@code context}.
+   */
+  static void walkStageNode(PlanNode node, ServerPlanRequestContext context) {
+    node.visit(INSTANCE, context);
+  }
+
+  @Override
+  public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    // set group-by list
+    context.getPinotQuery()
+        .setGroupByList(CalciteRexExpressionParser.convertGroupByList(node.getGroupSet(), context.getPinotQuery()));
+    // set agg list, built from the group-by list set above plus the aggregation calls
+    context.getPinotQuery().setSelectList(
+        CalciteRexExpressionParser.addSelectList(context.getPinotQuery().getGroupByList(), node.getAggCalls(),
+            context.getPinotQuery()));
+    return null;
+  }
+
+  @Override
+  public Void visitWindow(WindowNode node, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("Window not yet supported!");
+  }
+
+  @Override
+  public Void visitSetOp(SetOpNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitExchange(ExchangeNode exchangeNode, ServerPlanRequestContext context) {
+    throw new UnsupportedOperationException("Exchange not yet supported!");
+  }
+
+  @Override
+  public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    context.getPinotQuery()
+        .setFilterExpression(CalciteRexExpressionParser.toExpression(node.getCondition(), context.getPinotQuery()));
+    return null;
+  }
+
+  /**
+   * Compiles a SEMI JOIN into the server query as a filter.
+   *
+   * <p>Input 0 is the static side unless it is a {@link MailboxReceiveNode}, in which case input 1 is. Only the
+   * static side is visited; the rows the pipeline breaker collected for the other (dynamic) side become a filter
+   * on the query, or a {@code LIMIT 0} when the dynamic side produced no rows.
+   */
+  @Override
+  public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
+    // visit only the static side, turn the dynamic side into a lookup from the pipeline breaker resultDataContainer
+    PlanNode staticSide = node.getInputs().get(0);
+    PlanNode dynamicSide = node.getInputs().get(1);
+    if (staticSide instanceof MailboxReceiveNode) {
+      dynamicSide = node.getInputs().get(0);
+      staticSide = node.getInputs().get(1);
+    }
+    staticSide.visit(this, context);
+    List<Object[]> resultDataContainer = collectPipelineBreakerRows(dynamicSide, context);
+    DataSchema dataSchema = dynamicSide.getDataSchema();
+    if (!resultDataContainer.isEmpty()) {
+      // rewrite SEMI-JOIN as filter clause.
+      ServerPlanRequestUtils.attachDynamicFilter(context.getPinotQuery(), node.getJoinKeys(), resultDataContainer,
+          dataSchema);
+    } else {
+      // do not pull any data out, this is constant false filter.
+      context.getPinotQuery().setLimit(0);
+    }
+    return null;
+  }
+
+  /**
+   * Returns all data rows the pipeline breaker collected for {@code dynamicSide}.
+   *
+   * @throws IllegalStateException if the pipeline breaker result has no entry for {@code dynamicSide}
+   */
+  private static List<Object[]> collectPipelineBreakerRows(PlanNode dynamicSide, ServerPlanRequestContext context) {
+    // Fail with a descriptive error rather than an NPE (null unboxing / iterating null) when a result is missing.
+    Integer resultMapId = context.getPipelineBreakerResult().getNodeIdMap().get(dynamicSide);
+    if (resultMapId == null) {
+      throw new IllegalStateException("No pipeline breaker result registered for the dynamic side of the join");
+    }
+    List<TransferableBlock> transferableBlocks = context.getPipelineBreakerResult().getResultMap().get(resultMapId);
+    if (transferableBlocks == null) {
+      throw new IllegalStateException("Missing pipeline breaker result for node id: " + resultMapId);
+    }
+    List<Object[]> rows = new ArrayList<>();
+    for (TransferableBlock block : transferableBlocks) {
+      // Only ROW blocks are read; other block types are skipped.
+      // NOTE(review): confirm an error block cannot reach here, otherwise a failed dynamic side would be silently
+      // treated as an empty result (and turned into LIMIT 0).
+      if (block.getType() == DataBlock.Type.ROW) {
+        rows.addAll(block.getContainer());
+      }
+    }
+    return rows;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  @Override
+  public Void visitProject(ProjectNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    context.getPinotQuery()
+        .setSelectList(CalciteRexExpressionParser.overwriteSelectList(node.getProjects(), context.getPinotQuery()));
+    return null;
+  }
+
+  @Override
+  public Void visitSort(SortNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    PinotQuery pinotQuery = context.getPinotQuery();
+    if (!node.getCollationKeys().isEmpty()) {
+      pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(node, pinotQuery));
+    }
+    // Only positive fetch/offset values are pushed down.
+    // NOTE(review): this also ignores an explicit LIMIT 0, if the planner can produce one here — confirm.
+    if (node.getFetch() > 0) {
+      pinotQuery.setLimit(node.getFetch());
+    }
+    if (node.getOffset() > 0) {
+      pinotQuery.setOffset(node.getOffset());
+    }
+    return null;
+  }
+
+  @Override
+  public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) {
+    // Resolve the type-suffixed table name for the table type this request targets.
+    DataSource dataSource = new DataSource();
+    String tableNameWithType = TableNameBuilder.forType(context.getTableType())
+        .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
+    dataSource.setTableName(tableNameWithType);
+    context.getPinotQuery().setDataSource(dataSource);
+    // Start with the scanned columns as the select list; parent nodes may overwrite it.
+    context.getPinotQuery().setSelectList(
+        node.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+    return null;
+  }
+
+  @Override
+  public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return null;
+  }
+
+  /** Visits every input of {@code node} with this visitor, in input order. */
+  private void visitChildren(PlanNode node, ServerPlanRequestContext context) {
+    for (PlanNode child : node.getInputs()) {
+      child.visit(this, context);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 75401ca07f..5042805948 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -20,12 +20,16 @@ package org.apache.pinot.query;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.utils.NamedThreadFactory;
 import org.apache.pinot.common.utils.SchemaUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.service.QueryConfig;
@@ -65,6 +69,9 @@ public class QueryServerEnclosure {
   private final HelixManager _helixManager;
   private final QueryRunner _queryRunner;
 
+  private final ExecutorService _executor = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,
+      new NamedThreadFactory("QueryServerTest_Server"));
+
 
   public QueryServerEnclosure(MockInstanceDataManagerFactory factory) {
     try {
@@ -124,6 +131,15 @@ public class QueryServerEnclosure {
   }
 
   public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
-    _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+    // Run the stage plan on this enclosure's executor so the call returns without waiting for the query to finish.
+    // Use execute() rather than submit(): submit() captures a thrown exception in a Future that nobody reads, which
+    // would hide query failures; with execute() it reaches the worker thread's uncaught-exception handler.
+    _executor.execute(() -> {
+      try {
+        _queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
+      } catch (Exception e) {
+        throw new RuntimeException("Error executing query!", e);
+      }
+    });
   }
 }
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index ca35b62a9c..88081cb342 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -215,6 +215,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @DataProvider(name = "testDataWithSqlToFinalRowCount")
   private Object[][] provideTestSqlAndRowCount() {
     return new Object[][]{
+        new Object[]{"SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ col1 FROM a "
+            + " WHERE a.col1 IN (SELECT b.col2 FROM b WHERE b.col3 < 10) AND a.col3 > 0", 9},
+
         // using join clause
         new Object[]{"SELECT * FROM a JOIN b USING (col1)", 15},
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java
index
8276c647aa..83dea7c82d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java @@ -25,7 +25,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.plan.PlanRequestContext; +import org.apache.pinot.query.runtime.plan.PhysicalPlanContext; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.mockito.Mock; import org.mockito.Mockito; @@ -41,7 +41,7 @@ public class LiteralValueOperatorTest { private AutoCloseable _mocks; @Mock - private PlanRequestContext _context; + private PhysicalPlanContext _context; @Mock private VirtualServerAddress _serverAddress; diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index ca24f97043..b099e5f807 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -67,9 +67,6 @@ public class QueryServerTest extends QueryTestSet { private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_IntermWorker")); - private static final ExecutorService RUNNER_EXECUTOR_SERVICE = - Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, - new NamedThreadFactory("QueryServerTest_Runner")); private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); private final Map<Integer, QueryRunner> _queryRunnerMap = new 
HashMap<>(); @@ -85,7 +82,6 @@ public class QueryServerTest extends QueryTestSet { QueryRunner queryRunner = Mockito.mock(QueryRunner.class); Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE); Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE); - Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE); QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer.start(); _queryServerMap.put(availablePort, queryServer); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java index 69d2dda026..98a54b5f6e 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java @@ -55,8 +55,6 @@ public class QueryDispatcherTest extends QueryTestSet { ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_LeafWorker")); private static final ExecutorService INTERM_WORKER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("QueryDispatcherTest_IntermWorker")); - private static final ExecutorService RUNNER_EXECUTOR_SERVICE = Executors.newFixedThreadPool( - ResourceManager.DEFAULT_QUERY_RUNNER_THREADS, new NamedThreadFactory("QueryDispatcherTest_Runner")); private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>(); @@ -73,7 +71,6 @@ public class QueryDispatcherTest extends QueryTestSet { QueryRunner queryRunner = Mockito.mock(QueryRunner.class); 
Mockito.when(queryRunner.getQueryWorkerLeafExecutorService()).thenReturn(LEAF_WORKER_EXECUTOR_SERVICE); Mockito.when(queryRunner.getQueryWorkerIntermExecutorService()).thenReturn(INTERM_WORKER_EXECUTOR_SERVICE); - Mockito.when(queryRunner.getQueryRunnerExecutorService()).thenReturn(RUNNER_EXECUTOR_SERVICE); QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer = Mockito.spy(queryServer); queryServer.start(); diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json b/pinot-query-runtime/src/test/resources/queries/QueryHints.json index 464dac98f5..2bc442fc09 100644 --- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json +++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json @@ -62,6 +62,26 @@ { "description": "Colocated JOIN with partition column and group by non-partitioned column", "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name" + }, + { + "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column", + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy'))" + }, + { + "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by partition column", + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true'), aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name" + }, + { + "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition column and group by non-partitioned column", + "sql": "SELECT /*+ 
joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.name" + }, + { + "description": "Dynamic broadcast SEMI-JOIN with empty right table result", + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name" + }, + { + "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially empty right table result for some servers", + "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY {tbl1}.name" } ] } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org