This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6f9079921b [multistage] visitor pattern for server request construction (#9653) 6f9079921b is described below commit 6f9079921b23c899d1bb906c5ca6d30159c0b555 Author: Rong Rong <ro...@apache.org> AuthorDate: Mon Oct 31 10:40:14 2022 -0700 [multistage] visitor pattern for server request construction (#9653) * use visitor pattern for server request construction * move MailboxService into context Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/query/runtime/QueryRunner.java | 55 ++++- .../runtime/executor/WorkerQueryExecutor.java | 6 +- .../{executor => plan}/PhysicalPlanVisitor.java | 75 +++---- .../query/runtime/plan/PlanRequestContext.java | 68 ++++++ .../runtime/plan/ServerRequestPlanVisitor.java | 242 +++++++++++++++++++++ .../plan/server/ServerPlanRequestContext.java | 67 ++++++ .../query/runtime/utils/ServerRequestUtils.java | 235 -------------------- .../pinot/query/runtime/QueryRunnerTest.java | 10 +- 8 files changed, 469 insertions(+), 289 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 1ff15eb189..1a9a8cd88c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.query.runtime; +import com.google.common.base.Preconditions; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -31,6 +32,7 @@ import org.apache.pinot.common.datablock.BaseDataBlock; import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datablock.MetadataBlock; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; @@ -49,10 +51,16 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.query.runtime.utils.ServerRequestUtils; +import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor; +import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; import org.apache.pinot.query.service.QueryConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -112,13 +120,14 @@ public class QueryRunner { // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table // and package it here for return. But we should really use a MailboxSendOperator directly put into the // server executor. - List<ServerQueryRequest> serverQueryRequests = - ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap, - _helixPropertyStore); + List<ServerPlanRequestContext> serverQueryRequests = constructServerQueryRequests(distributedStagePlan, + requestMetadataMap, _helixPropertyStore, _mailboxService); // send the data table via mailbox in one-off fashion (e.g. no block-level split, one data table/partition key) List<BaseDataBlock> serverQueryResults = new ArrayList<>(serverQueryRequests.size()); - for (ServerQueryRequest request : serverQueryRequests) { + for (ServerPlanRequestContext requestContext : serverQueryRequests) { + ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(), + new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()); serverQueryResults.add(processServerQuery(request, executorService)); } @@ -139,6 +148,42 @@ public class QueryRunner { } } + private static List<ServerPlanRequestContext> constructServerQueryRequests(DistributedStagePlan distributedStagePlan, + Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore, + MailboxService<TransferableBlock> mailboxService) { + StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()); + Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, + "Server request for V2 engine should only have 1 scan table per request."); + String rawTableName = stageMetadata.getScannedTables().get(0); + Map<String, List<String>> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap() + .get(distributedStagePlan.getServerInstance()); + List<ServerPlanRequestContext> requests = new ArrayList<>(); + for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) { + String tableType = tableEntry.getKey(); + // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box + // network traffic. but there's chance to improve this: + // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments. + if (TableType.OFFLINE.name().equals(tableType)) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); + Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, + TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); + requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, + tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); + } else if (TableType.REALTIME.name().equals(tableType)) { + TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); + Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, + TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); + requests.add(ServerRequestPlanVisitor.build(mailboxService, distributedStagePlan, requestMetadataMap, + tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); + } else { + throw new IllegalArgumentException("Unsupported table type key: " + tableType); + } + } + return requests; + } + private BaseDataBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) { BaseDataBlock dataBlock; try { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 08de5ca7e0..2436061aa9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -29,6 +29,8 @@ import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; +import org.apache.pinot.query.runtime.plan.PlanRequestContext; import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +73,8 @@ public class WorkerQueryExecutor { long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); StageNode stageRoot = queryRequest.getStageRoot(); - Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build( - _mailboxService, _hostName, _port, requestId, queryRequest.getMetadataMap(), stageRoot); + Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext( + _mailboxService, requestId, stageRoot.getStageId(), _hostName, _port, queryRequest.getMetadataMap())); executorService.submit(new TraceRunnable() { @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java similarity index 69% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 6af229aa78..6dab5865ba 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.query.runtime.executor; +package org.apache.pinot.query.runtime.plan; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.transport.ServerInstance; -import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.AggregateNode; import org.apache.pinot.query.planner.stage.FilterNode; @@ -45,7 +44,6 @@ import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.operator.SortOperator; import org.apache.pinot.query.runtime.operator.TransformOperator; -import org.apache.pinot.query.runtime.plan.DistributedStagePlan; /** @@ -53,69 +51,54 @@ import org.apache.pinot.query.runtime.plan.DistributedStagePlan; * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into * v1 operators at this point in time. * - * <p>This class should be used statically via {@link #build(MailboxService, String, int, long, Map, StageNode)} + * <p>This class should be used statically via {@link #build(StageNode, PlanRequestContext)} * * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map) */ -public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> { +public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> { + private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor(); - private final MailboxService<TransferableBlock> _mailboxService; - private final String _hostName; - private final int _port; - private final long _requestId; - private final Map<Integer, StageMetadata> _metadataMap; - - public static Operator<TransferableBlock> build(MailboxService<TransferableBlock> mailboxService, - String hostName, int port, long requestId, Map<Integer, StageMetadata> metadataMap, StageNode node) { - return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, requestId, metadataMap), null); - } - - private PhysicalPlanVisitor(MailboxService<TransferableBlock> mailboxService, String hostName, int port, - long requestId, Map<Integer, StageMetadata> metadataMap) { - _mailboxService = mailboxService; - _hostName = hostName; - _port = port; - _requestId = requestId; - _metadataMap = metadataMap; + public static Operator<TransferableBlock> build(StageNode node, PlanRequestContext context) { + return node.visit(INSTANCE, context); } @Override - public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode node, Void context) { - List<ServerInstance> sendingInstances = _metadataMap.get(node.getSenderStageId()).getServerInstances(); - return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(), sendingInstances, - node.getExchangeType(), node.getPartitionKeySelector(), _hostName, _port, _requestId, - node.getSenderStageId()); + public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode node, PlanRequestContext context) { + List<ServerInstance> sendingInstances = context.getMetadataMap().get(node.getSenderStageId()).getServerInstances(); + return new MailboxReceiveOperator(context.getMailboxService(), node.getDataSchema(), sendingInstances, + node.getExchangeType(), node.getPartitionKeySelector(), context.getHostName(), context.getPort(), + context.getRequestId(), node.getSenderStageId()); } @Override - public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, Void context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null); - StageMetadata receivingStageMetadata = _metadataMap.get(node.getReceiverStageId()); - return new MailboxSendOperator(_mailboxService, node.getDataSchema(), nextOperator, + public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, PlanRequestContext context) { + Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); + StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId()); + return new MailboxSendOperator(context.getMailboxService(), node.getDataSchema(), nextOperator, receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(), - _hostName, _port, _requestId, node.getStageId()); + context.getHostName(), context.getPort(), context.getRequestId(), node.getStageId()); } @Override - public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null); + public Operator<TransferableBlock> visitAggregate(AggregateNode node, PlanRequestContext context) { + Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema()); } @Override - public Operator<TransferableBlock> visitFilter(FilterNode node, Void context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null); + public Operator<TransferableBlock> visitFilter(FilterNode node, PlanRequestContext context) { + Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition()); } @Override - public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) { + public Operator<TransferableBlock> visitJoin(JoinNode node, PlanRequestContext context) { StageNode left = node.getInputs().get(0); StageNode right = node.getInputs().get(1); - Operator<TransferableBlock> leftOperator = left.visit(this, null); - Operator<TransferableBlock> rightOperator = right.visit(this, null); + Operator<TransferableBlock> leftOperator = left.visit(this, context); + Operator<TransferableBlock> rightOperator = right.visit(this, context); return new HashJoinOperator(leftOperator, left.getDataSchema(), rightOperator, right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(), @@ -123,26 +106,26 @@ public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<Transferab } @Override - public Operator<TransferableBlock> visitProject(ProjectNode node, Void context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null); + public Operator<TransferableBlock> visitProject(ProjectNode node, PlanRequestContext context) { + Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(), node.getInputs().get(0).getDataSchema()); } @Override - public Operator<TransferableBlock> visitSort(SortNode node, Void context) { - Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null); + public Operator<TransferableBlock> visitSort(SortNode node, PlanRequestContext context) { + Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, context); return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema()); } @Override - public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void context) { + public Operator<TransferableBlock> visitTableScan(TableScanNode node, PlanRequestContext context) { throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!"); } @Override - public Operator<TransferableBlock> visitValue(ValueNode node, Void context) { + public Operator<TransferableBlock> visitValue(ValueNode node, PlanRequestContext context) { return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java new file mode 100644 index 0000000000..ae3e6c26db --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan; + +import java.util.Map; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; + + +public class PlanRequestContext { + protected final MailboxService<TransferableBlock> _mailboxService; + protected final long _requestId; + protected final int _stageId; + protected final String _hostName; + protected final int _port; + protected final Map<Integer, StageMetadata> _metadataMap; + + public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId, + String hostName, int port, Map<Integer, StageMetadata> metadataMap) { + _mailboxService = mailboxService; + _requestId = requestId; + _stageId = stageId; + _hostName = hostName; + _port = port; + _metadataMap = metadataMap; + } + + public long getRequestId() { + return _requestId; + } + + public int getStageId() { + return _stageId; + } + + public String getHostName() { + return _hostName; + } + + public int getPort() { + return _port; + } + + public Map<Integer, StageMetadata> getMetadataMap() { + return _metadataMap; + } + + public MailboxService<TransferableBlock> getMailboxService() { + return _mailboxService; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java new file mode 100644 index 0000000000..8735d59674 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan; + +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.InstanceRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.QuerySource; +import org.apache.pinot.common.utils.request.RequestUtils; +import org.apache.pinot.core.query.optimizer.QueryOptimizer; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.parser.CalciteRexExpressionParser; +import org.apache.pinot.query.planner.stage.AggregateNode; +import org.apache.pinot.query.planner.stage.FilterNode; +import org.apache.pinot.query.planner.stage.JoinNode; +import org.apache.pinot.query.planner.stage.MailboxReceiveNode; +import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.planner.stage.ProjectNode; +import org.apache.pinot.query.planner.stage.SortNode; +import org.apache.pinot.query.planner.stage.StageNode; +import org.apache.pinot.query.planner.stage.StageNodeVisitor; +import org.apache.pinot.query.planner.stage.TableScanNode; +import org.apache.pinot.query.planner.stage.ValueNode; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.FilterKind; +import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter; +import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter; +import org.apache.pinot.sql.parsers.rewriter.QueryRewriter; +import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; + + +/** + * Plan visitor for direct leaf-stage server request. + * + * This should be merged with logics in {@link org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future + * to directly produce operator chain. + * + * As of now, the reason why we use the plan visitor for server request is for additional support such as dynamic + * filtering and other auxiliary functionalities. + */ +public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, ServerPlanRequestContext> { + private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000; + private static final List<String> QUERY_REWRITERS_CLASS_NAMES = + ImmutableList.of( + PredicateComparisonRewriter.class.getName(), + NonAggregationGroupByToDistinctQueryRewriter.class.getName() + ); + private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>( + QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); + private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); + + private static final ServerRequestPlanVisitor INSTANCE = new ServerRequestPlanVisitor(); + private static Void _aVoid = null; + + public static ServerPlanRequestContext build(MailboxService<TransferableBlock> mailboxService, + DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, + TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) { + // Before-visit: construct the ServerPlanRequestContext baseline + long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); + PinotQuery pinotQuery = new PinotQuery(); + pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); + pinotQuery.setExplain(false); + ServerPlanRequestContext context = new ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(), + stagePlan.getServerInstance().getHostname(), stagePlan.getServerInstance().getPort(), + stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo); + + // visit the plan and create query physical plan. + ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context); + + // Post-visit: finalize context. + // 1. global rewrite/optimize + if (timeBoundaryInfo != null) { + attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == TableType.OFFLINE); + } + for (QueryRewriter queryRewriter : QUERY_REWRITERS) { + pinotQuery = queryRewriter.rewrite(pinotQuery); + } + QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); + + // 2. wrapped around in broker request + BrokerRequest brokerRequest = new BrokerRequest(); + brokerRequest.setPinotQuery(pinotQuery); + DataSource dataSource = pinotQuery.getDataSource(); + if (dataSource != null) { + QuerySource querySource = new QuerySource(); + querySource.setTableName(dataSource.getTableName()); + brokerRequest.setQuerySource(querySource); + } + + // 3. create instance request with segmentList + InstanceRequest instanceRequest = new InstanceRequest(); + instanceRequest.setRequestId(requestId); + instanceRequest.setBrokerId("unknown"); + instanceRequest.setEnableTrace(false); + instanceRequest.setSearchSegments(segmentList); + instanceRequest.setQuery(brokerRequest); + + context.setInstanceRequest(instanceRequest); + return context; + } + + private static void walkStageNode(StageNode node, ServerPlanRequestContext context) { + node.visit(INSTANCE, context); + } + + @Override + public Void visitAggregate(AggregateNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + // set group-by list + context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList( + node.getGroupSet(), context.getPinotQuery())); + // set agg list + context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList( + context.getPinotQuery().getGroupByList(), node.getAggCalls(), context.getPinotQuery())); + return _aVoid; + } + + @Override + public Void visitFilter(FilterNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression( + node.getCondition(), context.getPinotQuery())); + return _aVoid; + } + + @Override + public Void visitJoin(JoinNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + return _aVoid; + } + + @Override + public Void visitMailboxReceive(MailboxReceiveNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + return _aVoid; + } + + @Override + public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + return _aVoid; + } + + @Override + public Void visitProject(ProjectNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList( + node.getProjects(), context.getPinotQuery())); + return _aVoid; + } + + @Override + public Void visitSort(SortNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + if (node.getCollationKeys().size() > 0) { + context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(), + node.getCollationDirections(), context.getPinotQuery())); + } + if (node.getFetch() > 0) { + context.getPinotQuery().setLimit(node.getFetch()); + } + if (node.getOffset() > 0) { + context.getPinotQuery().setOffset(node.getOffset()); + } + return _aVoid; + } + + @Override + public Void visitTableScan(TableScanNode node, ServerPlanRequestContext context) { + DataSource dataSource = new DataSource(); + String tableNameWithType = TableNameBuilder.forType(context.getTableType()) + .tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName())); + dataSource.setTableName(tableNameWithType); + context.getPinotQuery().setDataSource(dataSource); + context.getPinotQuery().setSelectList(node.getTableScanColumns().stream() + .map(RequestUtils::getIdentifierExpression).collect(Collectors.toList())); + return _aVoid; + } + + @Override + public Void visitValue(ValueNode node, ServerPlanRequestContext context) { + visitChildren(node, context); + return _aVoid; + } + + private void visitChildren(StageNode node, ServerPlanRequestContext context) { + for (StageNode child : node.getInputs()) { + child.visit(this, context); + } + } + /** + * Helper method to attach the time boundary to the given PinotQuery. + */ + private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo timeBoundaryInfo, + boolean isOfflineRequest) { + String timeColumn = timeBoundaryInfo.getTimeColumn(); + String timeValue = timeBoundaryInfo.getTimeValue(); + Expression timeFilterExpression = RequestUtils.getFunctionExpression( + isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : FilterKind.GREATER_THAN.name()); + timeFilterExpression.getFunctionCall().setOperands( + Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn), RequestUtils.getLiteralExpression(timeValue))); + + Expression filterExpression = pinotQuery.getFilterExpression(); + if (filterExpression != null) { + Expression andFilterExpression = RequestUtils.getFunctionExpression(FilterKind.AND.name()); + andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression, timeFilterExpression)); + pinotQuery.setFilterExpression(andFilterExpression); + } else { + pinotQuery.setFilterExpression(timeFilterExpression); + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java new file mode 100644 index 0000000000..cc63d04f0e --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.plan.server; + +import java.util.Map; +import org.apache.pinot.common.request.InstanceRequest; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.PlanRequestContext; +import org.apache.pinot.spi.config.table.TableType; + + +/** + * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into + * {@link PinotQuery} to execute on server. + */ +public class ServerPlanRequestContext extends PlanRequestContext { + protected TableType _tableType; + protected TimeBoundaryInfo _timeBoundaryInfo; + + protected PinotQuery _pinotQuery; + protected InstanceRequest _instanceRequest; + + public ServerPlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId, + String hostName, int port, Map<Integer, StageMetadata> metadataMap, PinotQuery pinotQuery, TableType tableType, + TimeBoundaryInfo timeBoundaryInfo) { + super(mailboxService, requestId, stageId, hostName, port, metadataMap); + _pinotQuery = pinotQuery; + _tableType = tableType; + _timeBoundaryInfo = timeBoundaryInfo; + } + + public TableType getTableType() { + return _tableType; + } + + public PinotQuery getPinotQuery() { + return _pinotQuery; + } + + public void setInstanceRequest(InstanceRequest instanceRequest) { + _instanceRequest = instanceRequest; + } + + public InstanceRequest getInstanceRequest() { + return _instanceRequest; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java deleted file mode 100644 index 2e68c601ab..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java +++ /dev/null @@ -1,235 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.runtime.utils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.pinot.common.metadata.ZKMetadataProvider; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.request.DataSource; -import org.apache.pinot.common.request.Expression; -import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.common.request.QuerySource; -import org.apache.pinot.common.utils.request.RequestUtils; -import org.apache.pinot.core.query.optimizer.QueryOptimizer; -import org.apache.pinot.core.query.request.ServerQueryRequest; -import org.apache.pinot.core.routing.TimeBoundaryInfo; -import org.apache.pinot.query.parser.CalciteRexExpressionParser; -import org.apache.pinot.query.planner.StageMetadata; -import org.apache.pinot.query.planner.stage.AggregateNode; -import org.apache.pinot.query.planner.stage.FilterNode; -import org.apache.pinot.query.planner.stage.MailboxSendNode; -import org.apache.pinot.query.planner.stage.ProjectNode; -import org.apache.pinot.query.planner.stage.SortNode; -import org.apache.pinot.query.planner.stage.StageNode; -import org.apache.pinot.query.planner.stage.TableScanNode; -import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.spi.config.table.TableConfig; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.data.Schema; -import org.apache.pinot.spi.metrics.PinotMetricUtils; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.sql.FilterKind; -import org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter; -import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter; -import org.apache.pinot.sql.parsers.rewriter.QueryRewriter; -import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; - - -/** - * {@code ServerRequestUtils} converts the {@link DistributedStagePlan} into a {@link ServerQueryRequest}. - * - * <p>In order to reuse the current pinot {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}, a - * conversion step is needed so that the V2 query plan can be converted into a compatible format to run V1 executor. - */ -public class ServerRequestUtils { - private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000; - private static final List<String> QUERY_REWRITERS_CLASS_NAMES = - ImmutableList.of( - PredicateComparisonRewriter.class.getName(), - NonAggregationGroupByToDistinctQueryRewriter.class.getName() - ); - private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>( - QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES)); - private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer(); - - private ServerRequestUtils() { - // do not instantiate. - } - - // TODO: This is a hack, make an actual ServerQueryRequest converter. - public static List<ServerQueryRequest> constructServerQueryRequest(DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> helixPropertyStore) { - StageMetadata stageMetadata = distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()); - Preconditions.checkState(stageMetadata.getScannedTables().size() == 1, - "Server request for V2 engine should only have 1 scan table per request."); - String rawTableName = stageMetadata.getScannedTables().get(0); - Map<String, List<String>> tableToSegmentListMap = stageMetadata.getServerInstanceToSegmentsMap() - .get(distributedStagePlan.getServerInstance()); - List<ServerQueryRequest> requests = new ArrayList<>(); - for (Map.Entry<String, List<String>> tableEntry : tableToSegmentListMap.entrySet()) { - String tableType = tableEntry.getKey(); - // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it should not cause too much out-of-the-box - // network traffic. but there's chance to improve this: - // TODO: use TableDataManager: it is already getting tableConfig and Schema when processing segments. - if (TableType.OFFLINE.name().equals(tableType)) { - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, - TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, - TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName)); - requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema, - stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, tableEntry.getValue())); - } else if (TableType.REALTIME.name().equals(tableType)) { - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(helixPropertyStore, - TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore, - TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName)); - requests.add(constructServerQueryRequest(distributedStagePlan, requestMetadataMap, tableConfig, schema, - stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, tableEntry.getValue())); - } else { - throw new IllegalArgumentException("Unsupported table type key: " + tableType); - } - } - return requests; - } - - public static ServerQueryRequest constructServerQueryRequest(DistributedStagePlan distributedStagePlan, - Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema schema, - TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> segmentList) { - InstanceRequest instanceRequest = new InstanceRequest(); - instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID"))); - instanceRequest.setBrokerId("unknown"); - instanceRequest.setEnableTrace(false); - instanceRequest.setSearchSegments(segmentList); - instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, tableType, tableConfig, schema, - timeBoundaryInfo)); - return new ServerQueryRequest(instanceRequest, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), - System.currentTimeMillis()); - } - - // TODO: this is a hack, create a broker request object should not be needed because we rewrite the entire - // query into stages already. - public static BrokerRequest constructBrokerRequest(DistributedStagePlan distributedStagePlan, TableType tableType, - TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) { - PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, tableType, tableConfig, schema, timeBoundaryInfo); - BrokerRequest brokerRequest = new BrokerRequest(); - brokerRequest.setPinotQuery(pinotQuery); - // Set table name in broker request because it is used for access control, query routing etc. - DataSource dataSource = pinotQuery.getDataSource(); - if (dataSource != null) { - QuerySource querySource = new QuerySource(); - querySource.setTableName(dataSource.getTableName()); - brokerRequest.setQuerySource(querySource); - } - return brokerRequest; - } - - public static PinotQuery constructPinotQuery(DistributedStagePlan distributedStagePlan, TableType tableType, - TableConfig tableConfig, Schema schema, TimeBoundaryInfo timeBoundaryInfo) { - PinotQuery pinotQuery = new PinotQuery(); - pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT); - pinotQuery.setExplain(false); - walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery, tableType); - if (timeBoundaryInfo != null) { - attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == TableType.OFFLINE); - } - for (QueryRewriter queryRewriter : QUERY_REWRITERS) { - pinotQuery = queryRewriter.rewrite(pinotQuery); - } - QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema); - return pinotQuery; - } - - private static void walkStageTree(StageNode node, PinotQuery pinotQuery, TableType tableType) { - // this walkStageTree should only be a sequential walk. - for (StageNode child : node.getInputs()) { - walkStageTree(child, pinotQuery, tableType); - } - if (node instanceof TableScanNode) { - TableScanNode tableScanNode = (TableScanNode) node; - DataSource dataSource = new DataSource(); - String tableNameWithType = TableNameBuilder.forType(tableType) - .tableNameWithType(TableNameBuilder.extractRawTableName(tableScanNode.getTableName())); - dataSource.setTableName(tableNameWithType); - pinotQuery.setDataSource(dataSource); - pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression) - .collect(Collectors.toList())); - } else if (node instanceof FilterNode) { - pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression( - ((FilterNode) node).getCondition(), pinotQuery)); - } else if (node instanceof ProjectNode) { - pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList( - ((ProjectNode) node).getProjects(), pinotQuery)); - } else if (node instanceof AggregateNode) { - // set group-by list - pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList( - ((AggregateNode) node).getGroupSet(), pinotQuery)); - // set agg list - pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getGroupByList(), - ((AggregateNode) node).getAggCalls(), pinotQuery)); - } else if (node instanceof SortNode) { - if (((SortNode) node).getCollationKeys().size() > 0) { - pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode) node).getCollationKeys(), - ((SortNode) node).getCollationDirections(), pinotQuery)); - } - if (((SortNode) node).getFetch() > 0) { - pinotQuery.setLimit(((SortNode) node).getFetch()); - } - if (((SortNode) node).getOffset() > 0) { - pinotQuery.setOffset(((SortNode) node).getOffset()); - } - } else if (node instanceof MailboxSendNode) { - // TODO: MailboxSendNode should be the root of the leaf stage. but ignore for now since it is handle seperately - // in QueryRunner as a single step sender. - } else { - throw new UnsupportedOperationException("Unsupported logical plan node: " + node); - } - } - - /** - * Helper method to attach the time boundary to the given PinotQuery. - */ - private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo timeBoundaryInfo, - boolean isOfflineRequest) { - String timeColumn = timeBoundaryInfo.getTimeColumn(); - String timeValue = timeBoundaryInfo.getTimeValue(); - Expression timeFilterExpression = RequestUtils.getFunctionExpression( - isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : FilterKind.GREATER_THAN.name()); - timeFilterExpression.getFunctionCall().setOperands( - Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn), RequestUtils.getLiteralExpression(timeValue))); - - Expression filterExpression = pinotQuery.getFilterExpression(); - if (filterExpression != null) { - Expression andFilterExpression = RequestUtils.getFunctionExpression(FilterKind.AND.name()); - andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression, timeFilterExpression)); - pinotQuery.setFilterExpression(andFilterExpression); - } else { - pinotQuery.setFilterExpression(timeFilterExpression); - } - } -} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index eb7e354b9e..3c985d77aa 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -26,6 +26,10 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.pinot.common.utils.NamedThreadFactory; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.planner.stage.MailboxReceiveNode; @@ -38,6 +42,8 @@ import org.testng.annotations.Test; public class QueryRunnerTest extends QueryRunnerTestBase { + private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new NamedThreadFactory("query_server_enclosure")); @Test(dataProvider = "testDataWithSqlToFinalRowCount") public void testSqlWithFinalRowCountChecker(String sql, int expectedRows) @@ -71,7 +77,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase { for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) { DistributedStagePlan distributedStagePlan = QueryDispatcher.constructDistributedStagePlan(queryPlan, stageId, serverInstance); - _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap); + EXECUTOR_SERVICE.submit(() -> { + _servers.get(serverInstance).processQuery(distributedStagePlan, requestMetadataMap); + }); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org