This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f9079921b [multistage] visitor pattern for server request 
construction (#9653)
6f9079921b is described below

commit 6f9079921b23c899d1bb906c5ca6d30159c0b555
Author: Rong Rong <ro...@apache.org>
AuthorDate: Mon Oct 31 10:40:14 2022 -0700

    [multistage] visitor pattern for server request construction (#9653)
    
    * use visitor pattern for server request construction
    * move MailboxService into context
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../apache/pinot/query/runtime/QueryRunner.java    |  55 ++++-
 .../runtime/executor/WorkerQueryExecutor.java      |   6 +-
 .../{executor => plan}/PhysicalPlanVisitor.java    |  75 +++----
 .../query/runtime/plan/PlanRequestContext.java     |  68 ++++++
 .../runtime/plan/ServerRequestPlanVisitor.java     | 242 +++++++++++++++++++++
 .../plan/server/ServerPlanRequestContext.java      |  67 ++++++
 .../query/runtime/utils/ServerRequestUtils.java    | 235 --------------------
 .../pinot/query/runtime/QueryRunnerTest.java       |  10 +-
 8 files changed, 469 insertions(+), 289 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1ff15eb189..1a9a8cd88c 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.pinot.common.datablock.BaseDataBlock;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
@@ -49,10 +51,16 @@ import 
org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.query.runtime.utils.ServerRequestUtils;
+import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
 import org.apache.pinot.query.service.QueryConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -112,13 +120,14 @@ public class QueryRunner {
       // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
       // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
       // server executor.
-      List<ServerQueryRequest> serverQueryRequests =
-          ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap,
-              _helixPropertyStore);
+      List<ServerPlanRequestContext> serverQueryRequests = 
constructServerQueryRequests(distributedStagePlan,
+          requestMetadataMap, _helixPropertyStore, _mailboxService);
 
       // send the data table via mailbox in one-off fashion (e.g. no 
block-level split, one data table/partition key)
       List<BaseDataBlock> serverQueryResults = new 
ArrayList<>(serverQueryRequests.size());
-      for (ServerQueryRequest request : serverQueryRequests) {
+      for (ServerPlanRequestContext requestContext : serverQueryRequests) {
+        ServerQueryRequest request = new 
ServerQueryRequest(requestContext.getInstanceRequest(),
+            new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), 
System.currentTimeMillis());
         serverQueryResults.add(processServerQuery(request, executorService));
       }
 
@@ -139,6 +148,42 @@ public class QueryRunner {
     }
   }
 
+  private static List<ServerPlanRequestContext> 
constructServerQueryRequests(DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> 
helixPropertyStore,
+      MailboxService<TransferableBlock> mailboxService) {
+    StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
+    Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
+        "Server request for V2 engine should only have 1 scan table per 
request.");
+    String rawTableName = stageMetadata.getScannedTables().get(0);
+    Map<String, List<String>> tableToSegmentListMap = 
stageMetadata.getServerInstanceToSegmentsMap()
+        .get(distributedStagePlan.getServerInstance());
+    List<ServerPlanRequestContext> requests = new ArrayList<>();
+    for (Map.Entry<String, List<String>> tableEntry : 
tableToSegmentListMap.entrySet()) {
+      String tableType = tableEntry.getKey();
+      // ZkHelixPropertyStore extends ZkCacheBaseDataAccessor, so it 
should not cause too much out-of-the-box
+      // network traffic, but there is still a chance to improve this:
+      // TODO: use TableDataManager: it is already getting tableConfig and 
Schema when processing segments.
+      if (TableType.OFFLINE.name().equals(tableType)) {
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
+            
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+            
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
+        requests.add(ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap,
+            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.OFFLINE, tableEntry.getValue()));
+      } else if (TableType.REALTIME.name().equals(tableType)) {
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
+            
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
+            
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
+        requests.add(ServerRequestPlanVisitor.build(mailboxService, 
distributedStagePlan, requestMetadataMap,
+            tableConfig, schema, stageMetadata.getTimeBoundaryInfo(), 
TableType.REALTIME, tableEntry.getValue()));
+      } else {
+        throw new IllegalArgumentException("Unsupported table type key: " + 
tableType);
+      }
+    }
+    return requests;
+  }
+
   private BaseDataBlock processServerQuery(ServerQueryRequest 
serverQueryRequest, ExecutorService executorService) {
     BaseDataBlock dataBlock;
     try {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 08de5ca7e0..2436061aa9 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -29,6 +29,8 @@ import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
 import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,8 +73,8 @@ public class WorkerQueryExecutor {
     long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
     StageNode stageRoot = queryRequest.getStageRoot();
 
-    Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
-        _mailboxService, _hostName, _port, requestId, 
queryRequest.getMetadataMap(), stageRoot);
+    Operator<TransferableBlock> rootOperator = 
PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext(
+        _mailboxService, requestId, stageRoot.getStageId(), _hostName, _port, 
queryRequest.getMetadataMap()));
 
     executorService.submit(new TraceRunnable() {
       @Override
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
similarity index 69%
rename from 
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
rename to 
pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 6af229aa78..6dab5865ba 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.query.runtime.executor;
+package org.apache.pinot.query.runtime.plan;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
@@ -45,7 +44,6 @@ import 
org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.operator.SortOperator;
 import org.apache.pinot.query.runtime.operator.TransformOperator;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
 
 
 /**
@@ -53,69 +51,54 @@ import 
org.apache.pinot.query.runtime.plan.DistributedStagePlan;
  * this works only for the intermediate stage nodes, leaf stage nodes are 
expected to compile into
  * v1 operators at this point in time.
  *
- * <p>This class should be used statically via {@link #build(MailboxService, 
String, int, long, Map, StageNode)}
+ * <p>This class should be used statically via {@link #build(StageNode, 
PlanRequestContext)}
  *
  * @see 
org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, 
ExecutorService, Map)
  */
-public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<TransferableBlock>, Void> {
+public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<TransferableBlock>, PlanRequestContext> {
+  private static final PhysicalPlanVisitor INSTANCE = new 
PhysicalPlanVisitor();
 
-  private final MailboxService<TransferableBlock> _mailboxService;
-  private final String _hostName;
-  private final int _port;
-  private final long _requestId;
-  private final Map<Integer, StageMetadata> _metadataMap;
-
-  public static Operator<TransferableBlock> 
build(MailboxService<TransferableBlock> mailboxService,
-      String hostName, int port, long requestId, Map<Integer, StageMetadata> 
metadataMap, StageNode node) {
-    return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, 
requestId, metadataMap), null);
-  }
-
-  private PhysicalPlanVisitor(MailboxService<TransferableBlock> 
mailboxService, String hostName, int port,
-      long requestId, Map<Integer, StageMetadata> metadataMap) {
-    _mailboxService = mailboxService;
-    _hostName = hostName;
-    _port = port;
-    _requestId = requestId;
-    _metadataMap = metadataMap;
+  public static Operator<TransferableBlock> build(StageNode node, 
PlanRequestContext context) {
+    return node.visit(INSTANCE, context);
   }
 
   @Override
-  public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode 
node, Void context) {
-    List<ServerInstance> sendingInstances = 
_metadataMap.get(node.getSenderStageId()).getServerInstances();
-    return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(), 
sendingInstances,
-        node.getExchangeType(), node.getPartitionKeySelector(), _hostName, 
_port, _requestId,
-        node.getSenderStageId());
+  public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode 
node, PlanRequestContext context) {
+    List<ServerInstance> sendingInstances = 
context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
+    return new MailboxReceiveOperator(context.getMailboxService(), 
node.getDataSchema(), sendingInstances,
+        node.getExchangeType(), node.getPartitionKeySelector(), 
context.getHostName(), context.getPort(),
+        context.getRequestId(), node.getSenderStageId());
   }
 
   @Override
-  public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, 
Void context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
-    StageMetadata receivingStageMetadata = 
_metadataMap.get(node.getReceiverStageId());
-    return new MailboxSendOperator(_mailboxService, node.getDataSchema(), 
nextOperator,
+  public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, 
PlanRequestContext context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
+    StageMetadata receivingStageMetadata = 
context.getMetadataMap().get(node.getReceiverStageId());
+    return new MailboxSendOperator(context.getMailboxService(), 
node.getDataSchema(), nextOperator,
         receivingStageMetadata.getServerInstances(), node.getExchangeType(), 
node.getPartitionKeySelector(),
-        _hostName, _port, _requestId, node.getStageId());
+        context.getHostName(), context.getPort(), context.getRequestId(), 
node.getStageId());
   }
 
   @Override
-  public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void 
context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+  public Operator<TransferableBlock> visitAggregate(AggregateNode node, 
PlanRequestContext context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
     return new AggregateOperator(nextOperator, node.getDataSchema(), 
node.getAggCalls(),
         node.getGroupSet(), node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public Operator<TransferableBlock> visitFilter(FilterNode node, Void 
context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+  public Operator<TransferableBlock> visitFilter(FilterNode node, 
PlanRequestContext context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
     return new FilterOperator(nextOperator, node.getDataSchema(), 
node.getCondition());
   }
 
   @Override
-  public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) {
+  public Operator<TransferableBlock> visitJoin(JoinNode node, 
PlanRequestContext context) {
     StageNode left = node.getInputs().get(0);
     StageNode right = node.getInputs().get(1);
 
-    Operator<TransferableBlock> leftOperator = left.visit(this, null);
-    Operator<TransferableBlock> rightOperator = right.visit(this, null);
+    Operator<TransferableBlock> leftOperator = left.visit(this, context);
+    Operator<TransferableBlock> rightOperator = right.visit(this, context);
 
     return new HashJoinOperator(leftOperator, left.getDataSchema(), 
rightOperator,
         right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(),
@@ -123,26 +106,26 @@ public class PhysicalPlanVisitor implements 
StageNodeVisitor<Operator<Transferab
   }
 
   @Override
-  public Operator<TransferableBlock> visitProject(ProjectNode node, Void 
context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+  public Operator<TransferableBlock> visitProject(ProjectNode node, 
PlanRequestContext context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
     return new TransformOperator(nextOperator, node.getDataSchema(), 
node.getProjects(),
         node.getInputs().get(0).getDataSchema());
   }
 
   @Override
-  public Operator<TransferableBlock> visitSort(SortNode node, Void context) {
-    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, null);
+  public Operator<TransferableBlock> visitSort(SortNode node, 
PlanRequestContext context) {
+    Operator<TransferableBlock> nextOperator = 
node.getInputs().get(0).visit(this, context);
     return new SortOperator(nextOperator, node.getCollationKeys(), 
node.getCollationDirections(),
         node.getFetch(), node.getOffset(), node.getDataSchema());
   }
 
   @Override
-  public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void 
context) {
+  public Operator<TransferableBlock> visitTableScan(TableScanNode node, 
PlanRequestContext context) {
     throw new UnsupportedOperationException("Stage node of type TableScanNode 
is not supported!");
   }
 
   @Override
-  public Operator<TransferableBlock> visitValue(ValueNode node, Void context) {
+  public Operator<TransferableBlock> visitValue(ValueNode node, 
PlanRequestContext context) {
     return new LiteralValueOperator(node.getDataSchema(), 
node.getLiteralRows());
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
new file mode 100644
index 0000000000..ae3e6c26db
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+public class PlanRequestContext {
+  protected final MailboxService<TransferableBlock> _mailboxService;
+  protected final long _requestId;
+  protected final int _stageId;
+  protected final String _hostName;
+  protected final int _port;
+  protected final Map<Integer, StageMetadata> _metadataMap;
+
+  public PlanRequestContext(MailboxService<TransferableBlock> mailboxService, 
long requestId, int stageId,
+      String hostName, int port, Map<Integer, StageMetadata> metadataMap) {
+    _mailboxService = mailboxService;
+    _requestId = requestId;
+    _stageId = stageId;
+    _hostName = hostName;
+    _port = port;
+    _metadataMap = metadataMap;
+  }
+
+  public long getRequestId() {
+    return _requestId;
+  }
+
+  public int getStageId() {
+    return _stageId;
+  }
+
+  public String getHostName() {
+    return _hostName;
+  }
+
+  public int getPort() {
+    return _port;
+  }
+
+  public Map<Integer, StageMetadata> getMetadataMap() {
+    return _metadataMap;
+  }
+
+  public MailboxService<TransferableBlock> getMailboxService() {
+    return _mailboxService;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
new file mode 100644
index 0000000000..8735d59674
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/ServerRequestPlanVisitor.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.request.QuerySource;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.query.optimizer.QueryOptimizer;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
+import 
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
+import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
+import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
+
+
+/**
+ * Plan visitor for direct leaf-stage server request.
+ *
+ * This should be merged with the logic in {@link 
org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2} in the future
+ * to directly produce an operator chain.
+ *
+ * As of now, the plan visitor is used for server requests because it allows 
additional support such as dynamic
+ * filtering and other auxiliary functionality.
+ */
+public class ServerRequestPlanVisitor implements StageNodeVisitor<Void, 
ServerPlanRequestContext> {
+  private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
+  private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
+      ImmutableList.of(
+          PredicateComparisonRewriter.class.getName(),
+          NonAggregationGroupByToDistinctQueryRewriter.class.getName()
+      );
+  private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
+      QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
+  private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
+
+  private static final ServerRequestPlanVisitor INSTANCE = new 
ServerRequestPlanVisitor();
+  private static Void _aVoid = null;
+
+  public static ServerPlanRequestContext 
build(MailboxService<TransferableBlock> mailboxService,
+      DistributedStagePlan stagePlan, Map<String, String> requestMetadataMap, 
TableConfig tableConfig, Schema schema,
+      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> 
segmentList) {
+    // Before-visit: construct the ServerPlanRequestContext baseline
+    long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
+    PinotQuery pinotQuery = new PinotQuery();
+    pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
+    pinotQuery.setExplain(false);
+    ServerPlanRequestContext context = new 
ServerPlanRequestContext(mailboxService, requestId, stagePlan.getStageId(),
+        stagePlan.getServerInstance().getHostname(), 
stagePlan.getServerInstance().getPort(),
+        stagePlan.getMetadataMap(), pinotQuery, tableType, timeBoundaryInfo);
+
+    // visit the plan and create query physical plan.
+    ServerRequestPlanVisitor.walkStageNode(stagePlan.getStageRoot(), context);
+
+    // Post-visit: finalize context.
+    // 1. global rewrite/optimize
+    if (timeBoundaryInfo != null) {
+      attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == 
TableType.OFFLINE);
+    }
+    for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
+      pinotQuery = queryRewriter.rewrite(pinotQuery);
+    }
+    QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
+
+    // 2. wrap the query in a broker request
+    BrokerRequest brokerRequest = new BrokerRequest();
+    brokerRequest.setPinotQuery(pinotQuery);
+    DataSource dataSource = pinotQuery.getDataSource();
+    if (dataSource != null) {
+      QuerySource querySource = new QuerySource();
+      querySource.setTableName(dataSource.getTableName());
+      brokerRequest.setQuerySource(querySource);
+    }
+
+    // 3. create instance request with segmentList
+    InstanceRequest instanceRequest = new InstanceRequest();
+    instanceRequest.setRequestId(requestId);
+    instanceRequest.setBrokerId("unknown");
+    instanceRequest.setEnableTrace(false);
+    instanceRequest.setSearchSegments(segmentList);
+    instanceRequest.setQuery(brokerRequest);
+
+    context.setInstanceRequest(instanceRequest);
+    return context;
+  }
+
+  private static void walkStageNode(StageNode node, ServerPlanRequestContext 
context) {
+    node.visit(INSTANCE, context);
+  }
+
+  @Override
+  public Void visitAggregate(AggregateNode node, ServerPlanRequestContext 
context) {
+    visitChildren(node, context);
+    // set group-by list
+    
context.getPinotQuery().setGroupByList(CalciteRexExpressionParser.convertGroupByList(
+        node.getGroupSet(), context.getPinotQuery()));
+    // set agg list
+    
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.addSelectList(
+        context.getPinotQuery().getGroupByList(), node.getAggCalls(), 
context.getPinotQuery()));
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitFilter(FilterNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    
context.getPinotQuery().setFilterExpression(CalciteRexExpressionParser.toExpression(
+        node.getCondition(), context.getPinotQuery()));
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitJoin(JoinNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitMailboxReceive(MailboxReceiveNode node, 
ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitMailboxSend(MailboxSendNode node, ServerPlanRequestContext 
context) {
+    visitChildren(node, context);
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitProject(ProjectNode node, ServerPlanRequestContext context) 
{
+    visitChildren(node, context);
+    
context.getPinotQuery().setSelectList(CalciteRexExpressionParser.overwriteSelectList(
+        node.getProjects(), context.getPinotQuery()));
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitSort(SortNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    if (node.getCollationKeys().size() > 0) {
+      
context.getPinotQuery().setOrderByList(CalciteRexExpressionParser.convertOrderByList(node.getCollationKeys(),
+          node.getCollationDirections(), context.getPinotQuery()));
+    }
+    if (node.getFetch() > 0) {
+      context.getPinotQuery().setLimit(node.getFetch());
+    }
+    if (node.getOffset() > 0) {
+      context.getPinotQuery().setOffset(node.getOffset());
+    }
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitTableScan(TableScanNode node, ServerPlanRequestContext 
context) {
+    DataSource dataSource = new DataSource();
+    String tableNameWithType = TableNameBuilder.forType(context.getTableType())
+        
.tableNameWithType(TableNameBuilder.extractRawTableName(node.getTableName()));
+    dataSource.setTableName(tableNameWithType);
+    context.getPinotQuery().setDataSource(dataSource);
+    context.getPinotQuery().setSelectList(node.getTableScanColumns().stream()
+        
.map(RequestUtils::getIdentifierExpression).collect(Collectors.toList()));
+    return _aVoid;
+  }
+
+  @Override
+  public Void visitValue(ValueNode node, ServerPlanRequestContext context) {
+    visitChildren(node, context);
+    return _aVoid;
+  }
+
+  private void visitChildren(StageNode node, ServerPlanRequestContext context) 
{
+    for (StageNode child : node.getInputs()) {
+      child.visit(this, context);
+    }
+  }
+  /**
+   * Helper method to attach the time boundary to the given PinotQuery.
+   */
+  private static void attachTimeBoundary(PinotQuery pinotQuery, 
TimeBoundaryInfo timeBoundaryInfo,
+      boolean isOfflineRequest) {
+    String timeColumn = timeBoundaryInfo.getTimeColumn();
+    String timeValue = timeBoundaryInfo.getTimeValue();
+    Expression timeFilterExpression = RequestUtils.getFunctionExpression(
+        isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : 
FilterKind.GREATER_THAN.name());
+    timeFilterExpression.getFunctionCall().setOperands(
+        Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn), 
RequestUtils.getLiteralExpression(timeValue)));
+
+    Expression filterExpression = pinotQuery.getFilterExpression();
+    if (filterExpression != null) {
+      Expression andFilterExpression = 
RequestUtils.getFunctionExpression(FilterKind.AND.name());
+      
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
 timeFilterExpression));
+      pinotQuery.setFilterExpression(andFilterExpression);
+    } else {
+      pinotQuery.setFilterExpression(timeFilterExpression);
+    }
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
new file mode 100644
index 0000000000..cc63d04f0e
--- /dev/null
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestContext.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan.server;
+
+import java.util.Map;
+import org.apache.pinot.common.request.InstanceRequest;
+import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.plan.PlanRequestContext;
+import org.apache.pinot.spi.config.table.TableType;
+
+
+/**
+ * Context class for converting a {@link org.apache.pinot.query.runtime.plan.DistributedStagePlan} into
+ * {@link PinotQuery} to execute on server.
+ *
+ * <p>In addition to what {@link PlanRequestContext} holds, this context carries the table type, the time boundary
+ * info and the {@link PinotQuery} supplied at construction, plus an {@link InstanceRequest} slot that stays
+ * {@code null} until {@link #setInstanceRequest(InstanceRequest)} is called.
+ */
+public class ServerPlanRequestContext extends PlanRequestContext {
+  protected TableType _tableType;
+  protected TimeBoundaryInfo _timeBoundaryInfo;
+
+  protected PinotQuery _pinotQuery;
+  // Not set by the constructor; populated later through setInstanceRequest.
+  protected InstanceRequest _instanceRequest;
+
+  /**
+   * Creates the context.
+   *
+   * <p>The first six arguments are forwarded unchanged to the {@link PlanRequestContext} constructor; the remaining
+   * three are stored on this instance as given.
+   *
+   * @param mailboxService forwarded to the parent context
+   * @param requestId forwarded to the parent context
+   * @param stageId forwarded to the parent context
+   * @param hostName forwarded to the parent context
+   * @param port forwarded to the parent context
+   * @param metadataMap forwarded to the parent context
+   * @param pinotQuery the query object exposed through {@link #getPinotQuery()}
+   * @param tableType the table type exposed through {@link #getTableType()}
+   * @param timeBoundaryInfo the time boundary info exposed through {@link #getTimeBoundaryInfo()}
+   */
+  public ServerPlanRequestContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
+      String hostName, int port, Map<Integer, StageMetadata> metadataMap, PinotQuery pinotQuery, TableType tableType,
+      TimeBoundaryInfo timeBoundaryInfo) {
+    super(mailboxService, requestId, stageId, hostName, port, metadataMap);
+    _pinotQuery = pinotQuery;
+    _tableType = tableType;
+    _timeBoundaryInfo = timeBoundaryInfo;
+  }
+
+  /**
+   * @return the table type passed to the constructor
+   */
+  public TableType getTableType() {
+    return _tableType;
+  }
+
+  /**
+   * @return the time boundary info passed to the constructor, exactly as given (no null check is performed here)
+   */
+  public TimeBoundaryInfo getTimeBoundaryInfo() {
+    return _timeBoundaryInfo;
+  }
+
+  /**
+   * @return the same {@link PinotQuery} instance that was passed to the constructor (no copy is made)
+   */
+  public PinotQuery getPinotQuery() {
+    return _pinotQuery;
+  }
+
+  /**
+   * Stores the instance request on this context, replacing any previously stored one.
+   *
+   * @param instanceRequest the instance request to keep
+   */
+  public void setInstanceRequest(InstanceRequest instanceRequest) {
+    _instanceRequest = instanceRequest;
+  }
+
+  /**
+   * @return the instance request last stored with {@link #setInstanceRequest(InstanceRequest)}, or {@code null} if
+   *     none has been stored yet
+   */
+  public InstanceRequest getInstanceRequest() {
+    return _instanceRequest;
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
deleted file mode 100644
index 2e68c601ab..0000000000
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.query.runtime.utils;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-import org.apache.pinot.common.metadata.ZKMetadataProvider;
-import org.apache.pinot.common.metrics.ServerMetrics;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.DataSource;
-import org.apache.pinot.common.request.Expression;
-import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.common.request.PinotQuery;
-import org.apache.pinot.common.request.QuerySource;
-import org.apache.pinot.common.utils.request.RequestUtils;
-import org.apache.pinot.core.query.optimizer.QueryOptimizer;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
-import org.apache.pinot.core.routing.TimeBoundaryInfo;
-import org.apache.pinot.query.parser.CalciteRexExpressionParser;
-import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SortNode;
-import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
-import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.sql.FilterKind;
-import 
org.apache.pinot.sql.parsers.rewriter.NonAggregationGroupByToDistinctQueryRewriter;
-import org.apache.pinot.sql.parsers.rewriter.PredicateComparisonRewriter;
-import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
-import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
-
-
-/**
- * {@code ServerRequestUtils} converts the {@link DistributedStagePlan} into a 
{@link ServerQueryRequest}.
- *
- * <p>In order to reuse the current pinot {@link 
org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl}, a
- * conversion step is needed so that the V2 query plan can be converted into a 
compatible format to run V1 executor.
- */
-public class ServerRequestUtils {
-  private static final int DEFAULT_LEAF_NODE_LIMIT = 10_000_000;
-  private static final List<String> QUERY_REWRITERS_CLASS_NAMES =
-      ImmutableList.of(
-          PredicateComparisonRewriter.class.getName(),
-          NonAggregationGroupByToDistinctQueryRewriter.class.getName()
-      );
-  private static final List<QueryRewriter> QUERY_REWRITERS = new ArrayList<>(
-      QueryRewriterFactory.getQueryRewriters(QUERY_REWRITERS_CLASS_NAMES));
-  private static final QueryOptimizer QUERY_OPTIMIZER = new QueryOptimizer();
-
-  private ServerRequestUtils() {
-    // do not instantiate.
-  }
-
-  // TODO: This is a hack, make an actual ServerQueryRequest converter.
-  public static List<ServerQueryRequest> 
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, ZkHelixPropertyStore<ZNRecord> 
helixPropertyStore) {
-    StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
-    Preconditions.checkState(stageMetadata.getScannedTables().size() == 1,
-        "Server request for V2 engine should only have 1 scan table per 
request.");
-    String rawTableName = stageMetadata.getScannedTables().get(0);
-    Map<String, List<String>> tableToSegmentListMap = 
stageMetadata.getServerInstanceToSegmentsMap()
-        .get(distributedStagePlan.getServerInstance());
-    List<ServerQueryRequest> requests = new ArrayList<>();
-    for (Map.Entry<String, List<String>> tableEntry : 
tableToSegmentListMap.entrySet()) {
-      String tableType = tableEntry.getKey();
-      // ZkHelixPropertyStore extends from ZkCacheBaseDataAccessor so it 
should not cause too much out-of-the-box
-      // network traffic. but there's chance to improve this:
-      // TODO: use TableDataManager: it is already getting tableConfig and 
Schema when processing segments.
-      if (TableType.OFFLINE.name().equals(tableType)) {
-        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
-            
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-            
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(rawTableName));
-        requests.add(constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap, tableConfig, schema,
-            stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, 
tableEntry.getValue()));
-      } else if (TableType.REALTIME.name().equals(tableType)) {
-        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(helixPropertyStore,
-            
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        Schema schema = ZKMetadataProvider.getTableSchema(helixPropertyStore,
-            
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(rawTableName));
-        requests.add(constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap, tableConfig, schema,
-            stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, 
tableEntry.getValue()));
-      } else {
-        throw new IllegalArgumentException("Unsupported table type key: " + 
tableType);
-      }
-    }
-    return requests;
-  }
-
-  public static ServerQueryRequest 
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
-      Map<String, String> requestMetadataMap, TableConfig tableConfig, Schema 
schema,
-      TimeBoundaryInfo timeBoundaryInfo, TableType tableType, List<String> 
segmentList) {
-    InstanceRequest instanceRequest = new InstanceRequest();
-    
instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
-    instanceRequest.setBrokerId("unknown");
-    instanceRequest.setEnableTrace(false);
-    instanceRequest.setSearchSegments(segmentList);
-    instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, 
tableType, tableConfig, schema,
-        timeBoundaryInfo));
-    return new ServerQueryRequest(instanceRequest, new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
-        System.currentTimeMillis());
-  }
-
-  // TODO: this is a hack, create a broker request object should not be needed 
because we rewrite the entire
-  // query into stages already.
-  public static BrokerRequest constructBrokerRequest(DistributedStagePlan 
distributedStagePlan, TableType tableType,
-      TableConfig tableConfig, Schema schema, TimeBoundaryInfo 
timeBoundaryInfo) {
-    PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, 
tableType, tableConfig, schema, timeBoundaryInfo);
-    BrokerRequest brokerRequest = new BrokerRequest();
-    brokerRequest.setPinotQuery(pinotQuery);
-    // Set table name in broker request because it is used for access control, 
query routing etc.
-    DataSource dataSource = pinotQuery.getDataSource();
-    if (dataSource != null) {
-      QuerySource querySource = new QuerySource();
-      querySource.setTableName(dataSource.getTableName());
-      brokerRequest.setQuerySource(querySource);
-    }
-    return brokerRequest;
-  }
-
-  public static PinotQuery constructPinotQuery(DistributedStagePlan 
distributedStagePlan, TableType tableType,
-      TableConfig tableConfig, Schema schema, TimeBoundaryInfo 
timeBoundaryInfo) {
-    PinotQuery pinotQuery = new PinotQuery();
-    pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
-    pinotQuery.setExplain(false);
-    walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery, tableType);
-    if (timeBoundaryInfo != null) {
-      attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == 
TableType.OFFLINE);
-    }
-    for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
-      pinotQuery = queryRewriter.rewrite(pinotQuery);
-    }
-    QUERY_OPTIMIZER.optimize(pinotQuery, tableConfig, schema);
-    return pinotQuery;
-  }
-
-  private static void walkStageTree(StageNode node, PinotQuery pinotQuery, 
TableType tableType) {
-    // this walkStageTree should only be a sequential walk.
-    for (StageNode child : node.getInputs()) {
-      walkStageTree(child, pinotQuery, tableType);
-    }
-    if (node instanceof TableScanNode) {
-      TableScanNode tableScanNode = (TableScanNode) node;
-      DataSource dataSource = new DataSource();
-      String tableNameWithType = TableNameBuilder.forType(tableType)
-          
.tableNameWithType(TableNameBuilder.extractRawTableName(tableScanNode.getTableName()));
-      dataSource.setTableName(tableNameWithType);
-      pinotQuery.setDataSource(dataSource);
-      
pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
-          .collect(Collectors.toList()));
-    } else if (node instanceof FilterNode) {
-      pinotQuery.setFilterExpression(CalciteRexExpressionParser.toExpression(
-          ((FilterNode) node).getCondition(), pinotQuery));
-    } else if (node instanceof ProjectNode) {
-      pinotQuery.setSelectList(CalciteRexExpressionParser.overwriteSelectList(
-          ((ProjectNode) node).getProjects(), pinotQuery));
-    } else if (node instanceof AggregateNode) {
-      // set group-by list
-      pinotQuery.setGroupByList(CalciteRexExpressionParser.convertGroupByList(
-          ((AggregateNode) node).getGroupSet(), pinotQuery));
-      // set agg list
-      
pinotQuery.setSelectList(CalciteRexExpressionParser.addSelectList(pinotQuery.getGroupByList(),
-          ((AggregateNode) node).getAggCalls(), pinotQuery));
-    } else if (node instanceof SortNode) {
-      if (((SortNode) node).getCollationKeys().size() > 0) {
-        
pinotQuery.setOrderByList(CalciteRexExpressionParser.convertOrderByList(((SortNode)
 node).getCollationKeys(),
-            ((SortNode) node).getCollationDirections(), pinotQuery));
-      }
-      if (((SortNode) node).getFetch() > 0) {
-        pinotQuery.setLimit(((SortNode) node).getFetch());
-      }
-      if (((SortNode) node).getOffset() > 0) {
-        pinotQuery.setOffset(((SortNode) node).getOffset());
-      }
-    } else if (node instanceof MailboxSendNode) {
-      // TODO: MailboxSendNode should be the root of the leaf stage. but 
ignore for now since it is handle seperately
-      // in QueryRunner as a single step sender.
-    } else {
-      throw new UnsupportedOperationException("Unsupported logical plan node: 
" + node);
-    }
-  }
-
-  /**
-   * Helper method to attach the time boundary to the given PinotQuery.
-   */
-  private static void attachTimeBoundary(PinotQuery pinotQuery, 
TimeBoundaryInfo timeBoundaryInfo,
-      boolean isOfflineRequest) {
-    String timeColumn = timeBoundaryInfo.getTimeColumn();
-    String timeValue = timeBoundaryInfo.getTimeValue();
-    Expression timeFilterExpression = RequestUtils.getFunctionExpression(
-        isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : 
FilterKind.GREATER_THAN.name());
-    timeFilterExpression.getFunctionCall().setOperands(
-        Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn), 
RequestUtils.getLiteralExpression(timeValue)));
-
-    Expression filterExpression = pinotQuery.getFilterExpression();
-    if (filterExpression != null) {
-      Expression andFilterExpression = 
RequestUtils.getFunctionExpression(FilterKind.AND.name());
-      
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
 timeFilterExpression));
-      pinotQuery.setFilterExpression(andFilterExpression);
-    } else {
-      pinotQuery.setFilterExpression(timeFilterExpression);
-    }
-  }
-}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index eb7e354b9e..3c985d77aa 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -26,6 +26,10 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.QueryPlan;
 import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
@@ -38,6 +42,8 @@ import org.testng.annotations.Test;
 
 
 public class QueryRunnerTest extends QueryRunnerTestBase {
+  private static final ExecutorService EXECUTOR_SERVICE = 
Executors.newFixedThreadPool(
+      ResourceManager.DEFAULT_QUERY_WORKER_THREADS, new 
NamedThreadFactory("query_server_enclosure"));
 
   @Test(dataProvider = "testDataWithSqlToFinalRowCount")
   public void testSqlWithFinalRowCountChecker(String sql, int expectedRows)
@@ -71,7 +77,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         for (ServerInstance serverInstance : 
queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) {
           DistributedStagePlan distributedStagePlan =
               QueryDispatcher.constructDistributedStagePlan(queryPlan, 
stageId, serverInstance);
-          _servers.get(serverInstance).processQuery(distributedStagePlan, 
requestMetadataMap);
+          EXECUTOR_SERVICE.submit(() -> {
+            _servers.get(serverInstance).processQuery(distributedStagePlan, 
requestMetadataMap);
+          });
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to