This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b91fa406bc [Refactor] Return InstanceResponseBlock in QueryExecutor (#9561)
b91fa406bc is described below

commit b91fa406bcf7db3a88012614e549cd3f2ddaef3d
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Mon Oct 10 16:55:23 2022 -0700

    [Refactor] Return InstanceResponseBlock in QueryExecutor (#9561)
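
    [Illustrative sketch, not part of the commit message: based on the
    signatures in this diff, a server-side caller that previously received a
    serialized DataTable from the QueryExecutor now consumes the
    InstanceResponseBlock directly and serializes only at the transport
    boundary. The surrounding variables and error handling are assumptions.]

        // Hypothetical caller of the refactored API (names taken from this diff).
        InstanceResponseBlock responseBlock = queryExecutor.execute(queryRequest, executorService, null);
        try {
          // Serialization now happens at the wire boundary, not inside the executor.
          byte[] bytes = responseBlock.toDataTable().toBytes();
          // ... send bytes to the broker
        } catch (IOException e) {
          // On serialization failure, fall back to a metadata-only response carrying
          // the DATA_TABLE_SERIALIZATION_ERROR code introduced by this commit.
          responseBlock.addException(QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR, e));
          // ... send responseBlock.toMetadataOnlyDataTable().toBytes() instead
        }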
---
 .../pinot/common/exception/QueryException.java     |   5 +
 .../common/datatable/DataTableBuilderFactory.java  |  20 ++
 .../common/datatable/DataTableBuilderUtils.java    | 195 ----------------
 .../core/operator/InstanceResponseOperator.java    |  19 +-
 .../operator/blocks/InstanceResponseBlock.java     | 118 +++++++++-
 .../blocks/results/AggregationResultsBlock.java    |  31 ++-
 .../operator/blocks/results/BaseResultsBlock.java  |  35 ++-
 .../blocks/results/DistinctResultsBlock.java       |  25 +-
 .../blocks/results/ExceptionResultsBlock.java      |  24 +-
 .../blocks/results/ExplainResultsBlock.java        |  93 ++++++++
 .../blocks/results/GroupByResultsBlock.java        |  51 ++++-
 .../blocks/results/MetadataResultsBlock.java       |  24 +-
 .../operator/blocks/results/ResultsBlockUtils.java | 119 ++++++++++
 .../blocks/results/SelectionResultsBlock.java      |  18 +-
 .../StreamingInstanceResponseOperator.java         |  21 +-
 .../apache/pinot/core/plan/GlobalPlanImplV0.java   |   5 +-
 .../main/java/org/apache/pinot/core/plan/Plan.java |   4 +-
 .../plan/StreamingInstanceResponsePlanNode.java    |   2 +-
 .../pinot/core/query/executor/QueryExecutor.java   |  47 +++-
 .../query/executor/ServerQueryExecutorV1Impl.java  | 251 +++++++++------------
 .../pinot/core/query/scheduler/QueryScheduler.java |  71 +++---
 .../query/selection/SelectionOperatorUtils.java    |   5 +-
 .../core/transport/InstanceRequestHandler.java     |  20 +-
 .../pinot/core/transport/grpc/GrpcQueryServer.java |  17 +-
 .../core/common/datatable/DataTableSerDeTest.java  |   2 +-
 .../blocks/results/ResultsBlockUtilsTest.java}     |  31 +--
 .../executor/QueryExecutorExceptionsTest.java      |   4 +-
 .../core/query/executor/QueryExecutorTest.java     |  27 ++-
 .../query/scheduler/PrioritySchedulerTest.java     |  10 +-
 .../pinot/core/transport/QueryRoutingTest.java     |   8 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  23 +-
 .../pinot/queries/ExplainPlanQueriesTest.java      |   9 +-
 .../queries/SegmentWithNullValueVectorTest.java    |  27 ++-
 .../apache/pinot/query/runtime/QueryRunner.java    |  12 +-
 34 files changed, 812 insertions(+), 561 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
index e09a2ff238..0009cee884 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.response.ProcessingException;
 
 
+// TODO: Clean up ProcessingException (thrift) because we don't send it through the wire
 public class QueryException {
   private QueryException() {
   }
@@ -61,6 +62,7 @@ public class QueryException {
   public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235;
   public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
   public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
+  public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260;
   public static final int BROKER_GATHER_ERROR_CODE = 300;
   public static final int BROKER_SEGMENT_UNAVAILABLE_ERROR_CODE = 305;
   public static final int DATA_TABLE_DESERIALIZATION_ERROR_CODE = 310;
@@ -105,6 +107,8 @@ public class QueryException {
       new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
   public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
       new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE);
+  public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR =
+      new ProcessingException(DATA_TABLE_SERIALIZATION_ERROR_CODE);
 public static final ProcessingException BROKER_GATHER_ERROR = new ProcessingException(BROKER_GATHER_ERROR_CODE);
   public static final ProcessingException DATA_TABLE_DESERIALIZATION_ERROR =
       new ProcessingException(DATA_TABLE_DESERIALIZATION_ERROR_CODE);
@@ -142,6 +146,7 @@ public class QueryException {
     SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing");
     QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
     EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
+    DATA_TABLE_SERIALIZATION_ERROR.setMessage("DataTableSerializationError");
     BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
     DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableDeserializationError");
     FUTURE_CALL_ERROR.setMessage("FutureCallError");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
index 5d9e616974..2bf0426a78 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pinot.core.common.datatable;
 
+import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTableFactory;
+import org.apache.pinot.common.datatable.DataTableImplV2;
+import org.apache.pinot.common.datatable.DataTableImplV3;
+import org.apache.pinot.common.datatable.DataTableImplV4;
 import org.apache.pinot.common.utils.DataSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,4 +62,20 @@ public class DataTableBuilderFactory {
         throw new IllegalStateException("Unsupported data table version: " + _version);
     }
   }
+
+  /**
+   * Returns an empty data table without data.
+   */
+  public static DataTable getEmptyDataTable() {
+    switch (_version) {
+      case DataTableFactory.VERSION_2:
+        return new DataTableImplV2();
+      case DataTableFactory.VERSION_3:
+        return new DataTableImplV3();
+      case DataTableFactory.VERSION_4:
+        return new DataTableImplV4();
+      default:
+        throw new IllegalStateException("Unsupported data table version: " + _version);
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
deleted file mode 100644
index bc631af2fe..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.common.datatable;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.common.datatable.DataTableFactory;
-import org.apache.pinot.common.datatable.DataTableImplV2;
-import org.apache.pinot.common.datatable.DataTableImplV3;
-import org.apache.pinot.common.datatable.DataTableImplV4;
-import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
-import org.apache.pinot.core.query.distinct.DistinctTable;
-import org.apache.pinot.core.query.request.context.QueryContext;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
-
-
-/**
- * The <code>DataTableUtils</code> class provides utility methods for data table.
- */
-@SuppressWarnings("rawtypes")
-public class DataTableBuilderUtils {
-  private DataTableBuilderUtils() {
-  }
-
-  /**
-   * Returns an empty data table without data.
-   */
-  public static DataTable getEmptyDataTable() {
-    int version = DataTableBuilderFactory.getDataTableVersion();
-    switch (version) {
-      case DataTableFactory.VERSION_2:
-        return new DataTableImplV2();
-      case DataTableFactory.VERSION_3:
-        return new DataTableImplV3();
-      case DataTableFactory.VERSION_4:
-        return new DataTableImplV4();
-      default:
-        throw new IllegalStateException("Unsupported data table version: " + version);
-    }
-  }
-
-  /**
-   * Builds an empty data table based on the broker request.
-   */
-  public static DataTable buildEmptyDataTable(QueryContext queryContext)
-      throws IOException {
-    if (QueryContextUtils.isSelectionQuery(queryContext)) {
-      return buildEmptyDataTableForSelectionQuery(queryContext);
-    } else if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      return buildEmptyDataTableForAggregationQuery(queryContext);
-    } else {
-      assert QueryContextUtils.isDistinctQuery(queryContext);
-      return buildEmptyDataTableForDistinctQuery(queryContext);
-    }
-  }
-
-  /**
-   * Helper method to build an empty data table for selection query.
-   */
-  private static DataTable buildEmptyDataTableForSelectionQuery(QueryContext queryContext) {
-    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
-    int numSelectExpressions = selectExpressions.size();
-    String[] columnNames = new String[numSelectExpressions];
-    for (int i = 0; i < numSelectExpressions; i++) {
-      columnNames[i] = selectExpressions.get(i).toString();
-    }
-    ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions];
-    // NOTE: Use STRING column data type as default for selection query
-    Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
-    return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
-  }
-
-  /**
-   * Helper method to build an empty data table for aggregation query.
-   */
-  private static DataTable buildEmptyDataTableForAggregationQuery(QueryContext queryContext)
-      throws IOException {
-    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null;
-    int numAggregations = aggregationFunctions.length;
-    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
-    if (groupByExpressions != null) {
-      // Aggregation group-by query
-
-      int numColumns = groupByExpressions.size() + numAggregations;
-      String[] columnNames = new String[numColumns];
-      ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-      int index = 0;
-      for (ExpressionContext groupByExpression : groupByExpressions) {
-        columnNames[index] = groupByExpression.toString();
-        // Use STRING column data type as default for group-by expressions
-        columnDataTypes[index] = ColumnDataType.STRING;
-        index++;
-      }
-      for (AggregationFunction aggregationFunction : aggregationFunctions) {
-        // NOTE: Use AggregationFunction.getResultColumnName() for SQL format response
-        columnNames[index] = aggregationFunction.getResultColumnName();
-        columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType();
-        index++;
-      }
-      return DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build();
-    } else {
-      // Aggregation only query
-
-      String[] aggregationColumnNames = new String[numAggregations];
-      ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregations];
-      Object[] aggregationResults = new Object[numAggregations];
-      for (int i = 0; i < numAggregations; i++) {
-        AggregationFunction aggregationFunction = aggregationFunctions[i];
-        // NOTE: For backward-compatibility, use AggregationFunction.getColumnName() for aggregation only query
-        aggregationColumnNames[i] = aggregationFunction.getColumnName();
-        columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
-        aggregationResults[i] =
-            aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder());
-      }
-
-      // Build the data table
-      DataTableBuilder dataTableBuilder =
-          DataTableBuilderFactory.getDataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes));
-      dataTableBuilder.startRow();
-      for (int i = 0; i < numAggregations; i++) {
-        switch (columnDataTypes[i]) {
-          case LONG:
-            dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue());
-            break;
-          case DOUBLE:
-            dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue());
-            break;
-          case OBJECT:
-            dataTableBuilder.setColumn(i, aggregationResults[i]);
-            break;
-          default:
-            throw new UnsupportedOperationException(
-                "Unsupported aggregation column data type: " + 
columnDataTypes[i] + " for column: "
-                    + aggregationColumnNames[i]);
-        }
-      }
-      dataTableBuilder.finishRow();
-      return dataTableBuilder.build();
-    }
-  }
-
-  /**
-   * Helper method to build an empty data table for distinct query.
-   */
-  private static DataTable buildEmptyDataTableForDistinctQuery(QueryContext queryContext)
-      throws IOException {
-    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
-    assert aggregationFunctions != null && aggregationFunctions.length == 1
-        && aggregationFunctions[0] instanceof DistinctAggregationFunction;
-    DistinctAggregationFunction distinctAggregationFunction = (DistinctAggregationFunction) aggregationFunctions[0];
-
-    // Create the distinct table
-    String[] columnNames = distinctAggregationFunction.getColumns();
-    ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
-    // NOTE: Use STRING column data type as default for distinct query
-    Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-    DistinctTable distinctTable =
-        new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet(),
-            queryContext.isNullHandlingEnabled());
-
-    // Build the data table
-    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(
-        new DataSchema(new String[]{distinctAggregationFunction.getColumnName()},
-            new ColumnDataType[]{ColumnDataType.OBJECT}));
-    dataTableBuilder.startRow();
-    dataTableBuilder.setColumn(0, distinctTable);
-    dataTableBuilder.finishRow();
-    return dataTableBuilder.build();
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 6bfdb3717e..a1b317d66c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -20,8 +20,6 @@ package org.apache.pinot.core.operator;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.request.context.ThreadTimer;
 import org.apache.pinot.core.common.Operator;
@@ -83,7 +81,7 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
 
       ThreadTimer mainThreadTimer = new ThreadTimer();
       BaseResultsBlock resultsBlock = getCombinedResults();
-      InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(getDataTable(resultsBlock));
+      InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(resultsBlock, _queryContext);
       long mainThreadCpuTimeNs = mainThreadTimer.getThreadTimeNs();
 
       long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
@@ -99,14 +97,13 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
               numServerThreads);
 
       long threadCpuTimeNs = mainThreadCpuTimeNs + multipleThreadCpuTimeNs;
-      Map<String, String> responseMetaData = instanceResponseBlock.getInstanceResponseDataTable().getMetadata();
-      responseMetaData.put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs));
-      responseMetaData.put(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
+      instanceResponseBlock.addMetadata(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs));
+      instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
           String.valueOf(systemActivitiesCpuTimeNs));
 
       return instanceResponseBlock;
     } else {
-      return new InstanceResponseBlock(getDataTable(getCombinedResults()));
+      return new InstanceResponseBlock(getCombinedResults(), _queryContext);
     }
   }
 
@@ -119,14 +116,6 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
     }
   }
 
-  private DataTable getDataTable(BaseResultsBlock resultsBlock) {
-    try {
-      return resultsBlock.getDataTable(_queryContext);
-    } catch (Exception e) {
-      throw new RuntimeException("Caught exception while building data table", 
e);
-    }
-  }
-
   private void prefetchAll() {
     for (int i = 0; i < _fetchContextSize; i++) {
       _indexSegments.get(i).prefetch(_fetchContexts.get(i));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
index 45170ff0d2..4708d5cf83 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java
@@ -18,26 +18,132 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 /**
- * InstanceResponseBlock is just a holder to get InstanceResponse from InstanceResponseBlock.
+ * The {@code InstanceResponseBlock} is the holder of the server side results.
  */
 public class InstanceResponseBlock implements Block {
-  private final DataTable _instanceResponseDataTable;
+  private final BaseResultsBlock _resultsBlock;
+  private final QueryContext _queryContext;
+  private final Map<Integer, String> _exceptions;
+  private final Map<String, String> _metadata;
 
-  public InstanceResponseBlock(DataTable dataTable) {
-    _instanceResponseDataTable = dataTable;
+  public InstanceResponseBlock(BaseResultsBlock resultsBlock, QueryContext queryContext) {
+    _resultsBlock = resultsBlock;
+    _queryContext = queryContext;
+    _exceptions = new HashMap<>();
+    List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions();
+    if (processingExceptions != null) {
+      for (ProcessingException processingException : processingExceptions) {
+        _exceptions.put(processingException.getErrorCode(), processingException.getMessage());
+      }
+    }
+    _metadata = resultsBlock.getResultsMetadata();
   }
 
-  public DataTable getInstanceResponseDataTable() {
-    return _instanceResponseDataTable;
+  /**
+   * Metadata only instance response.
+   */
+  public InstanceResponseBlock() {
+    _resultsBlock = null;
+    _queryContext = null;
+    _exceptions = new HashMap<>();
+    _metadata = new HashMap<>();
+  }
+
+  private InstanceResponseBlock(Map<Integer, String> exceptions, Map<String, String> metadata) {
+    _resultsBlock = null;
+    _queryContext = null;
+    _exceptions = exceptions;
+    _metadata = metadata;
+  }
+
+  public InstanceResponseBlock toMetadataOnlyResponseBlock() {
+    return new InstanceResponseBlock(_exceptions, _metadata);
+  }
+
+  public void addException(ProcessingException processingException) {
+    _exceptions.put(processingException.getErrorCode(), processingException.getMessage());
+  }
+
+  public void addException(int errorCode, String exceptionMessage) {
+    _exceptions.put(errorCode, exceptionMessage);
+  }
+
+  public void addMetadata(String key, String value) {
+    _metadata.put(key, value);
+  }
+
+  @Nullable
+  public BaseResultsBlock getResultsBlock() {
+    return _resultsBlock;
+  }
+
+  @Nullable
+  public QueryContext getQueryContext() {
+    return _queryContext;
+  }
+
+  public Map<Integer, String> getExceptions() {
+    return _exceptions;
+  }
+
+  public Map<String, String> getResponseMetadata() {
+    return _metadata;
+  }
+
+  @Nullable
+  public DataSchema getDataSchema() {
+    return _resultsBlock != null ? _resultsBlock.getDataSchema(_queryContext) : null;
+  }
+
+  @Nullable
+  public Collection<Object[]> getRows() {
+    return _resultsBlock != null ? _resultsBlock.getRows(_queryContext) : null;
+  }
+
+  public DataTable toDataTable()
+      throws IOException {
+    DataTable dataTable = toDataOnlyDataTable();
+    attachMetadata(dataTable);
+    return dataTable;
+  }
+
+  public DataTable toDataOnlyDataTable()
+      throws IOException {
+    return _resultsBlock != null ? _resultsBlock.getDataTable(_queryContext)
+        : DataTableBuilderFactory.getEmptyDataTable();
+  }
+
+  public DataTable toMetadataOnlyDataTable() {
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
+    attachMetadata(dataTable);
+    return dataTable;
+  }
+
+  private void attachMetadata(DataTable dataTable) {
+    for (Map.Entry<Integer, String> entry : _exceptions.entrySet()) {
+      dataTable.addException(entry.getKey(), entry.getValue());
+    }
+    dataTable.getMetadata().putAll(_metadata);
   }
 
   @Override
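
[Illustrative sketch, not part of the commit: the InstanceResponseBlock above exposes
three conversion paths to the wire format; names are from this diff, while the
variables and IOException handling are assumed.]

    // Full response: data rows plus metadata and exceptions (non-streaming path).
    DataTable full = instanceResponseBlock.toDataTable();
    // Data only, as streamed block-by-block by the streaming response operator.
    DataTable dataOnly = instanceResponseBlock.toDataOnlyDataTable();
    // Metadata and exceptions only, e.g. for the trailing block of a streaming response.
    DataTable metadataOnly = instanceResponseBlock.toMetadataOnlyDataTable();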
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 10c641713e..3724b645e3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -21,6 +21,8 @@ package org.apache.pinot.core.operator.blocks.results;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -55,11 +57,8 @@ public class AggregationResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
+  public DataSchema getDataSchema(QueryContext queryContext) {
     boolean returnFinalResult = queryContext.isServerReturnFinalResult();
-
-    // Extract result column name and type from each aggregation function
     int numColumns = _aggregationFunctions.length;
     String[] columnNames = new String[numColumns];
     ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
@@ -69,10 +68,23 @@ public class AggregationResultsBlock extends BaseResultsBlock {
       columnDataTypes[i] = returnFinalResult ? aggregationFunction.getFinalResultColumnType()
           : aggregationFunction.getIntermediateResultColumnType();
     }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    return Collections.singletonList(_results.toArray());
+  }
 
-    // Build the data table.
-    DataTableBuilder dataTableBuilder =
-        DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes));
+  @Override
+  public DataTable getDataTable(QueryContext queryContext)
+      throws IOException {
+    boolean returnFinalResult = queryContext.isServerReturnFinalResult();
+    DataSchema dataSchema = getDataSchema(queryContext);
+    assert dataSchema != null;
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
     if (queryContext.isNullHandlingEnabled()) {
       RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
       for (int i = 0; i < numColumns; i++) {
@@ -108,10 +120,7 @@ public class AggregationResultsBlock extends BaseResultsBlock {
       }
       dataTableBuilder.finishRow();
     }
-
-    DataTable dataTable = dataTableBuilder.build();
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+    return dataTableBuilder.build();
   }
 
   private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
index 24573e04c1..e48e50b89d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java
@@ -19,14 +19,17 @@
 package org.apache.pinot.core.operator.blocks.results;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
 import org.apache.pinot.core.common.BlockDocIdValueSet;
@@ -155,16 +158,29 @@ public abstract class BaseResultsBlock implements Block {
     _numServerThreads = numServerThreads;
   }
 
+  /**
+   * Returns the data schema for the results. Return {@code null} when the block only contains metadata.
+   */
+  @Nullable
+  public abstract DataSchema getDataSchema(QueryContext queryContext);
+
+  /**
+   * Returns the rows for the results. Return {@code null} when the block only contains metadata.
+   */
+  @Nullable
+  public abstract Collection<Object[]> getRows(QueryContext queryContext);
+
+  /**
+   * Returns a data table without metadata or exception attached.
+   */
   public abstract DataTable getDataTable(QueryContext queryContext)
-      throws Exception;
+      throws IOException;
 
-  protected void attachMetadataToDataTable(DataTable dataTable) {
-    if (CollectionUtils.isNotEmpty(_processingExceptions)) {
-      for (ProcessingException exception : _processingExceptions) {
-        dataTable.addException(exception);
-      }
-    }
-    Map<String, String> metadata = dataTable.getMetadata();
+  /**
+   * Returns the metadata for the results.
+   */
+  public Map<String, String> getResultsMetadata() {
+    Map<String, String> metadata = new HashMap<>();
     metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(_numTotalDocs));
     metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), Long.toString(_numDocsScanned));
     metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), Long.toString(_numEntriesScannedInFilter));
@@ -174,6 +190,7 @@ public abstract class BaseResultsBlock implements Block {
     metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
         Integer.toString(_numConsumingSegmentsProcessed));
     metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), Integer.toString(_numConsumingSegmentsMatched));
+    return metadata;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
index b721671726..8db1dfc474 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java
@@ -18,11 +18,16 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -48,9 +53,23 @@ public class DistinctResultsBlock extends BaseResultsBlock {
     _distinctTable = distinctTable;
   }
 
+  @Override
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return _distinctTable.getDataSchema();
+  }
+
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    List<Object[]> rows = new ArrayList<>(_distinctTable.size());
+    for (Record record : _distinctTable.getRecords()) {
+      rows.add(record.getValues());
+    }
+    return rows;
+  }
+
   @Override
   public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
+      throws IOException {
     String[] columnNames = new String[]{_distinctFunction.getColumnName()};
     ColumnDataType[] columnDataTypes = new ColumnDataType[]{ColumnDataType.OBJECT};
     DataTableBuilder dataTableBuilder =
@@ -58,8 +77,6 @@ public class DistinctResultsBlock extends BaseResultsBlock {
     dataTableBuilder.startRow();
     dataTableBuilder.setColumn(0, _distinctTable);
     dataTableBuilder.finishRow();
-    DataTable dataTable = dataTableBuilder.build();
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+    return dataTableBuilder.build();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
index 23a6a2120d..d8df97ee7c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java
@@ -18,10 +18,13 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import java.util.Collection;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
@@ -35,11 +38,20 @@ public class ExceptionResultsBlock extends BaseResultsBlock {
     this(QueryException.QUERY_EXECUTION_ERROR, e);
   }
 
+  @Nullable
   @Override
-  public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    return null;
+  }
+
+  @Override
+  public DataTable getDataTable(QueryContext queryContext) {
+    return DataTableBuilderFactory.getEmptyDataTable();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
new file mode 100644
index 0000000000..e229848591
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Results block for EXPLAIN queries.
+ */
+public class ExplainResultsBlock extends BaseResultsBlock {
+  private final List<ExplainEntry> _entries = new ArrayList<>();
+
+  public void addOperator(String operatorName, int operatorId, int parentId) {
+    _entries.add(new ExplainEntry(operatorName, operatorId, parentId));
+  }
+
+  @Override
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return DataSchema.EXPLAIN_RESULT_SCHEMA;
+  }
+
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    List<Object[]> rows = new ArrayList<>(_entries.size());
+    for (ExplainEntry entry : _entries) {
+      rows.add(entry.toRow());
+    }
+    return rows;
+  }
+
+  @Override
+  public DataTable getDataTable(QueryContext queryContext)
+      throws IOException {
+    DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+    for (ExplainEntry entry : _entries) {
+      dataTableBuilder.startRow();
+      dataTableBuilder.setColumn(0, entry._operatorName);
+      dataTableBuilder.setColumn(1, entry._operatorId);
+      dataTableBuilder.setColumn(2, entry._parentId);
+      dataTableBuilder.finishRow();
+    }
+    return dataTableBuilder.build();
+  }
+
+  @Override
+  public Map<String, String> getResultsMetadata() {
+    // Do not add metadata for EXPLAIN results
+    return new HashMap<>();
+  }
+
+  private static class ExplainEntry {
+    final String _operatorName;
+    final int _operatorId;
+    final int _parentId;
+
+    ExplainEntry(String operatorName, int operatorId, int parentId) {
+      _operatorName = operatorName;
+      _operatorId = operatorId;
+      _parentId = parentId;
+    }
+
+    Object[] toRow() {
+      return new Object[]{_operatorName, _operatorId, _parentId};
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 3cd88b95cc..ffc5109f14 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -18,13 +18,16 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.utils.DataSchema;
@@ -83,6 +86,16 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     _table = table;
   }
 
+  /**
+   * For instance level empty group-by results.
+   */
+  public GroupByResultsBlock(DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _aggregationGroupByResult = null;
+    _intermediateRecords = null;
+    _table = null;
+  }
+
   public DataSchema getDataSchema() {
     return _dataSchema;
   }
@@ -123,11 +136,33 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     _resizeTimeMs = resizeTimeMs;
   }
 
+  @Nullable
+  @Override
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return _dataSchema;
+  }
+
+  @Nullable
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    if (_table == null) {
+      return Collections.emptyList();
+    }
+    List<Object[]> rows = new ArrayList<>(_table.size());
+    Iterator<Record> iterator = _table.iterator();
+    while (iterator.hasNext()) {
+      rows.add(iterator.next().getValues());
+    }
+    return rows;
+  }
+
   @Override
   public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
-    Preconditions.checkState(_table != null, "Cannot get DataTable from segment level results");
+      throws IOException {
     DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(_dataSchema);
+    if (_table == null) {
+      return dataTableBuilder.build();
+    }
     ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes();
     int numColumns = _dataSchema.size();
     Iterator<Record> iterator = _table.iterator();
@@ -166,9 +201,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
         dataTableBuilder.finishRow();
       }
     }
-    DataTable dataTable = dataTableBuilder.build();
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+    return dataTableBuilder.build();
   }
 
   private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBuilder dataTableBuilder,
@@ -224,13 +257,13 @@ public class GroupByResultsBlock extends BaseResultsBlock {
   }
 
   @Override
-  protected void attachMetadataToDataTable(DataTable dataTable) {
-    super.attachMetadataToDataTable(dataTable);
-    Map<String, String> metadata = dataTable.getMetadata();
+  public Map<String, String> getResultsMetadata() {
+    Map<String, String> metadata = super.getResultsMetadata();
     if (_numGroupsLimitReached) {
       metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
     }
     metadata.put(MetadataKey.NUM_RESIZES.getName(), Integer.toString(_numResizes));
     metadata.put(MetadataKey.RESIZE_TIME_MS.getName(), Long.toString(_resizeTimeMs));
+    return metadata;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
index 7ffeb0643d..dcfddf17cf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java
@@ -18,18 +18,30 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import java.util.Collection;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.datatable.DataTable;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
 public class MetadataResultsBlock extends BaseResultsBlock {
 
+  @Nullable
   @Override
-  public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    return null;
+  }
+
+  @Override
+  public DataTable getDataTable(QueryContext queryContext) {
+    return DataTableBuilderFactory.getEmptyDataTable();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
new file mode 100644
index 0000000000..5f5e7d0769
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.blocks.results;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+
+
+@SuppressWarnings("rawtypes")
+public class ResultsBlockUtils {
+  private ResultsBlockUtils() {
+  }
+
+  public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) {
+    if (QueryContextUtils.isSelectionQuery(queryContext)) {
+      return buildEmptySelectionQueryResults(queryContext);
+    } else if (QueryContextUtils.isAggregationQuery(queryContext)) {
+      if (queryContext.getGroupByExpressions() == null) {
+        return buildEmptyAggregationQueryResults(queryContext);
+      } else {
+        return buildEmptyGroupByQueryResults(queryContext);
+      }
+    } else {
+      assert QueryContextUtils.isDistinctQuery(queryContext);
+      return buildEmptyDistinctQueryResults(queryContext);
+    }
+  }
+
+  private static SelectionResultsBlock buildEmptySelectionQueryResults(QueryContext queryContext) {
+    List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
+    int numSelectExpressions = selectExpressions.size();
+    String[] columnNames = new String[numSelectExpressions];
+    for (int i = 0; i < numSelectExpressions; i++) {
+      columnNames[i] = selectExpressions.get(i).toString();
+    }
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions];
+    // NOTE: Use STRING column data type as default for selection query
+    Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    return new SelectionResultsBlock(dataSchema, Collections.emptyList());
+  }
+
+  private static AggregationResultsBlock buildEmptyAggregationQueryResults(QueryContext queryContext) {
+    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
+    assert aggregationFunctions != null;
+    int numAggregations = aggregationFunctions.length;
+    List<Object> results = new ArrayList<>(numAggregations);
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      results.add(aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder()));
+    }
+    return new AggregationResultsBlock(aggregationFunctions, results);
+  }
+
+  private static GroupByResultsBlock buildEmptyGroupByQueryResults(QueryContext queryContext) {
+    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
+    assert aggregationFunctions != null;
+    int numAggregations = aggregationFunctions.length;
+    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
+    assert groupByExpressions != null;
+    int numColumns = groupByExpressions.size() + numAggregations;
+    String[] columnNames = new String[numColumns];
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
+    int index = 0;
+    for (ExpressionContext groupByExpression : groupByExpressions) {
+      columnNames[index] = groupByExpression.toString();
+      // Use STRING column data type as default for group-by expressions
+      columnDataTypes[index] = ColumnDataType.STRING;
+      index++;
+    }
+    for (AggregationFunction aggregationFunction : aggregationFunctions) {
+      // NOTE: Use AggregationFunction.getResultColumnName() for SQL format response
+      columnNames[index] = aggregationFunction.getResultColumnName();
+      columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType();
+      index++;
+    }
+    return new GroupByResultsBlock(new DataSchema(columnNames, columnDataTypes));
+  }
+
+  private static DistinctResultsBlock buildEmptyDistinctQueryResults(QueryContext queryContext) {
+    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
+    assert aggregationFunctions != null && aggregationFunctions.length == 1
+        && aggregationFunctions[0] instanceof DistinctAggregationFunction;
+    DistinctAggregationFunction distinctAggregationFunction = (DistinctAggregationFunction) aggregationFunctions[0];
+    String[] columnNames = distinctAggregationFunction.getColumns();
+    ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length];
+    // NOTE: Use STRING column data type as default for distinct query
+    Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+    DistinctTable distinctTable =
+        new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet(),
+            queryContext.isNullHandlingEnabled());
+    return new DistinctResultsBlock(distinctAggregationFunction, distinctTable);
+  }
+}
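
[Illustrative sketch, not part of the commit: ResultsBlockUtils replaces the deleted
DataTableBuilderUtils.buildEmptyDataTable(), producing a typed empty results block
instead of an eagerly serialized DataTable. The call site below is an assumption.]

    // Hypothetical call site, e.g. when a query matches no segment:
    BaseResultsBlock emptyResults = ResultsBlockUtils.buildEmptyQueryResults(queryContext);
    InstanceResponseBlock response = new InstanceResponseBlock(emptyResults, queryContext);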
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
index 4f714ae7be..2a7fd5847c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import java.io.IOException;
 import java.util.Collection;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
@@ -45,12 +46,19 @@ public class SelectionResultsBlock extends BaseResultsBlock {
     return _rows;
   }
 
+  @Override
+  public DataSchema getDataSchema(QueryContext queryContext) {
+    return _dataSchema;
+  }
+
+  @Override
+  public Collection<Object[]> getRows(QueryContext queryContext) {
+    return _rows;
+  }
+
   @Override
   public DataTable getDataTable(QueryContext queryContext)
-      throws Exception {
-    DataTable dataTable =
-        SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, queryContext.isNullHandlingEnabled());
-    attachMetadataToDataTable(dataTable);
-    return dataTable;
+      throws IOException {
+    return SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, queryContext.isNullHandlingEnabled());
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
similarity index 69%
rename from pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
index 3243dcd406..4bbd21ba3c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
@@ -16,18 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.operator;
+package org.apache.pinot.core.operator.streaming;
 
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.List;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.operator.InstanceResponseOperator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.combine.BaseCombineOperator;
-import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -46,18 +44,15 @@ public class StreamingInstanceResponseOperator extends InstanceResponseOperator
 
   @Override
   protected InstanceResponseBlock getNextBlock() {
-    InstanceResponseBlock nextBlock = super.getNextBlock();
-    DataTable instanceResponseDataTable = nextBlock.getInstanceResponseDataTable();
-    DataTable metadataOnlyDataTable;
+    InstanceResponseBlock responseBlock = super.getNextBlock();
+    InstanceResponseBlock metadataOnlyResponseBlock = responseBlock.toMetadataOnlyResponseBlock();
     try {
-      metadataOnlyDataTable = instanceResponseDataTable.toMetadataOnlyDataTable();
-      _streamObserver.onNext(StreamingResponseUtils.getDataResponse(instanceResponseDataTable.toDataOnlyDataTable()));
+      _streamObserver.onNext(StreamingResponseUtils.getDataResponse(responseBlock.toDataOnlyDataTable()));
     } catch (IOException e) {
-      // when exception occurs in streaming, we return an error-only metadata block.
-      metadataOnlyDataTable = DataTableBuilderUtils.getEmptyDataTable();
-      metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+      metadataOnlyResponseBlock.addException(
+          QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR, e));
     }
     // return a metadata-only block.
-    return new InstanceResponseBlock(metadataOnlyDataTable);
+    return metadataOnlyResponseBlock;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
index d2bff6ab65..94cccc45ae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.plan;
 
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.core.operator.InstanceResponseOperator;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.slf4j.Logger;
@@ -46,7 +45,7 @@ public class GlobalPlanImplV0 implements Plan {
   }
 
   @Override
-  public DataTable execute()
+  public InstanceResponseBlock execute()
       throws TimeoutException {
     long startTime = System.currentTimeMillis();
     InstanceResponseOperator instanceResponseOperator = _instanceResponsePlanNode.run();
@@ -58,6 +57,6 @@ public class GlobalPlanImplV0 implements Plan {
     InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock();
     long endTime2 = System.currentTimeMillis();
     LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1);
-    return instanceResponseBlock.getInstanceResponseDataTable();
+    return instanceResponseBlock;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
index 567dce0f6a..06432da734 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.plan;
 
 import java.util.concurrent.TimeoutException;
-import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 
 
@@ -33,6 +33,6 @@ public interface Plan {
   PlanNode getPlanNode();
 
   /** Execute the query plan and get the instance response. */
-  DataTable execute()
+  InstanceResponseBlock execute()
       throws TimeoutException;
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
index 6cd2b09846..25b20ec5b9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java
@@ -22,7 +22,7 @@ import io.grpc.stub.StreamObserver;
 import java.util.List;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.operator.InstanceResponseOperator;
-import org.apache.pinot.core.operator.StreamingInstanceResponseOperator;
+import 
org.apache.pinot.core.operator.streaming.StreamingInstanceResponseOperator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.FetchContext;
 import org.apache.pinot.segment.spi.IndexSegment;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
index 4712d7f911..b7ac472743 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java
@@ -24,9 +24,11 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
@@ -55,7 +57,10 @@ public interface QueryExecutor {
 
   /**
    * Processes the non-streaming query with the given executor service.
+   *
+   * @deprecated Use {@code execute()} instead.
    */
+  @Deprecated
   default DataTable processQuery(ServerQueryRequest queryRequest, 
ExecutorService executorService) {
     return processQuery(queryRequest, executorService, null);
   }
@@ -64,14 +69,48 @@ public interface QueryExecutor {
    * Processes the query (streaming or non-streaming) with the given executor 
service.
    * <ul>
    *   <li>
-   *     For streaming request, the returned DataTable contains only the 
metadata. The response is streamed back via the
-   *     observer.
+   *     For a streaming request, the returned {@link DataTable} contains only the metadata. The response is streamed back
+   *     via the observer.
    *   </li>
    *   <li>
-   *     For non-streaming request, the returned DataTable contains both data 
and metadata.
+   *     For a non-streaming request, the returned {@link DataTable} contains both data and metadata.
    *   </li>
    * </ul>
+   *
+   * @deprecated Use {@code execute()} instead.
    */
-  DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService 
executorService,
+  @Deprecated
+  default DataTable processQuery(ServerQueryRequest queryRequest, 
ExecutorService executorService,
+      @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
+    InstanceResponseBlock instanceResponse = execute(queryRequest, 
executorService, responseObserver);
+    try {
+      return instanceResponse.toDataTable();
+    } catch (Exception e) {
+      DataTable metadataOnlyDataTable = 
instanceResponse.toMetadataOnlyDataTable();
+      
metadataOnlyDataTable.addException(QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR,
 e));
+      return metadataOnlyDataTable;
+    }
+  }
+
+  /**
+   * Executes the non-streaming query with the given executor service.
+   */
+  default InstanceResponseBlock execute(ServerQueryRequest queryRequest, 
ExecutorService executorService) {
+    return execute(queryRequest, executorService, null);
+  }
+
+  /**
+   * Executes the query (streaming or non-streaming) with the given executor 
service.
+   * <ul>
+   *   <li>
+   *     For a streaming request, the returned {@link InstanceResponseBlock} contains only the metadata. The response is
+   *     streamed back via the observer.
+   *   </li>
+   *   <li>
+   *     For a non-streaming request, the returned {@link InstanceResponseBlock} contains both data and metadata.
+   *   </li>
+   * </ul>
+   */
+  InstanceResponseBlock execute(ServerQueryRequest queryRequest, 
ExecutorService executorService,
       @Nullable StreamObserver<Server.ServerResponse> responseObserver);
 }
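
For callers migrating off the deprecated processQuery(), the new entry point looks like the
sketch below (hedged: the executor, request, and thread pool are assumed to be wired up as usual;
metadata remains a plain string map):

    import java.util.concurrent.ExecutorService;
    import org.apache.pinot.common.datatable.DataTable.MetadataKey;
    import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
    import org.apache.pinot.core.query.executor.QueryExecutor;
    import org.apache.pinot.core.query.request.ServerQueryRequest;

    public class ExecuteCaller {
      // Runs a non-streaming query and reads one metadata entry off the block.
      public static long timeUsedMs(QueryExecutor executor, ServerQueryRequest request,
          ExecutorService pool) {
        InstanceResponseBlock response = executor.execute(request, pool);
        return Long.parseLong(
            response.getResponseMetadata().getOrDefault(MetadataKey.TIME_USED_MS.getName(), "-1"));
      }
    }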
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index 075882448e..bf0838d4d1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -20,10 +20,7 @@ package org.apache.pinot.core.query.executor;
 
 import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Base64;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -34,7 +31,6 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.lang.StringUtils;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.function.TransformFunctionType;
@@ -45,16 +41,16 @@ import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.FilterContext;
 import org.apache.pinot.common.request.context.FunctionContext;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ExplainPlanRowData;
 import org.apache.pinot.core.common.ExplainPlanRows;
-import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ExplainResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils;
 import org.apache.pinot.core.operator.filter.EmptyFilterOperator;
 import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
 import org.apache.pinot.core.plan.Plan;
@@ -68,6 +64,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.util.QueryOptionsUtils;
 import org.apache.pinot.core.util.trace.TraceContext;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
@@ -132,20 +129,20 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
   }
 
   @Override
-  public DataTable processQuery(ServerQueryRequest queryRequest, 
ExecutorService executorService,
+  public InstanceResponseBlock execute(ServerQueryRequest queryRequest, 
ExecutorService executorService,
       @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
     if (!queryRequest.isEnableTrace()) {
-      return processQueryInternal(queryRequest, executorService, 
responseObserver);
+      return executeInternal(queryRequest, executorService, responseObserver);
     }
     try {
       Tracing.getTracer().register(queryRequest.getRequestId());
-      return processQueryInternal(queryRequest, executorService, 
responseObserver);
+      return executeInternal(queryRequest, executorService, responseObserver);
     } finally {
       Tracing.getTracer().unregister();
     }
   }
 
-  private DataTable processQueryInternal(ServerQueryRequest queryRequest, 
ExecutorService executorService,
+  private InstanceResponseBlock executeInternal(ServerQueryRequest 
queryRequest, ExecutorService executorService,
       @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
     TimerContext timerContext = queryRequest.getTimerContext();
     TimerContext.Timer schedulerWaitTimer = 
timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT);
@@ -178,20 +175,22 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       String errorMessage =
           String.format("Query scheduling took %dms (longer than query timeout 
of %dms) on server: %s",
               querySchedulingTimeMs, queryTimeoutMs, 
_instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-      
dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR,
 errorMessage));
+      InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+      instanceResponse.addException(
+          
QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, 
errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, 
requestId);
-      return dataTable;
+      return instanceResponse;
     }
 
     TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(tableNameWithType);
     if (tableDataManager == null) {
       String errorMessage = String.format("Failed to find table: %s on server: 
%s", tableNameWithType,
           _instanceDataManager.getInstanceId());
-      DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
-      
dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR,
 errorMessage));
+      InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+      instanceResponse.addException(
+          
QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, 
errorMessage));
       LOGGER.error("{} while processing requestId: {}", errorMessage, 
requestId);
-      return dataTable;
+      return instanceResponse;
     }
 
     List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
@@ -245,17 +244,17 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       }
     }
 
-    DataTable dataTable = null;
+    InstanceResponseBlock instanceResponse = null;
     try {
-      dataTable = processQuery(indexSegments, queryContext, timerContext, 
executorService, responseObserver,
+      instanceResponse = executeInternal(indexSegments, queryContext, 
timerContext, executorService, responseObserver,
           queryRequest.isEnableStreaming());
     } catch (Exception e) {
       _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
-      dataTable = DataTableBuilderUtils.getEmptyDataTable();
+      instanceResponse = new InstanceResponseBlock();
       // Do not log verbose error for BadQueryRequestException and 
QueryCancelledException.
       if (e instanceof BadQueryRequestException) {
         LOGGER.info("Caught BadQueryRequestException while processing 
requestId: {}, {}", requestId, e.getMessage());
-        
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
+        
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
       } else if (e instanceof QueryCancelledException) {
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Cancelled while processing requestId: {}", requestId, 
e);
@@ -265,28 +264,27 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
         // NOTE most likely the onFailure() callback registered on query 
future in InstanceRequestHandler would
         // return the error table to broker sooner than here. But in case of 
race condition, we construct the error
         // table here too.
-        
dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
+        
instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
             "Query cancelled on: " + _instanceDataManager.getInstanceId()));
       } else {
         LOGGER.error("Exception processing requestId {}", requestId, e);
-        
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
+        
instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
       }
     } finally {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         tableDataManager.releaseSegment(segmentDataManager);
       }
       if (queryRequest.isEnableTrace()) {
-        if (TraceContext.traceEnabled() && dataTable != null) {
-          dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(), 
TraceContext.getTraceInfo());
+        if (TraceContext.traceEnabled() && instanceResponse != null) {
+          instanceResponse.addMetadata(MetadataKey.TRACE_INFO.getName(), 
TraceContext.getTraceInfo());
         }
       }
     }
 
     queryProcessingTimer.stopAndRecord();
     long queryProcessingTime = queryProcessingTimer.getDurationMs();
-    Map<String, String> metadata = dataTable.getMetadata();
-    metadata.put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), 
Integer.toString(numSegmentsAcquired));
-    metadata.put(MetadataKey.TIME_USED_MS.getName(), 
Long.toString(queryProcessingTime));
+    instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), 
Integer.toString(numSegmentsAcquired));
+    instanceResponse.addMetadata(MetadataKey.TIME_USED_MS.getName(), 
Long.toString(queryProcessingTime));
 
     // When segment is removed from the IdealState:
     // 1. Controller schedules a state transition to server to turn segment 
OFFLINE
@@ -302,7 +300,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
               .collect(Collectors.toList());
       int numMissingSegments = missingSegments.size();
       if (numMissingSegments > 0) {
-        
dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
+        
instanceResponse.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR,
             String.format("%d segments %s missing on server: %s", 
numMissingSegments, missingSegments,
                 _instanceDataManager.getInstanceId())));
         _serverMetrics.addMeteredTableValue(tableNameWithType, 
ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments);
@@ -310,31 +308,32 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
 
     if (tableDataManager instanceof RealtimeTableDataManager) {
-      long minConsumingFreshnessTimeMs = Long.MAX_VALUE;
+      long minConsumingFreshnessTimeMs;
       if (numConsumingSegmentsQueried > 0) {
         minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ? 
minIngestionTimeMs : minIndexTimeMs;
-        metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
+        
instanceResponse.addMetadata(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
             Integer.toString(numConsumingSegmentsQueried));
-        metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(minConsumingFreshnessTimeMs));
+        
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
+            Long.toString(minConsumingFreshnessTimeMs));
         LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
             numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
       } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs != 
Long.MIN_VALUE) {
         minConsumingFreshnessTimeMs = maxEndTimeMs;
-        metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
Long.toString(maxEndTimeMs));
+        
instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
+            Long.toString(maxEndTimeMs));
         LOGGER.debug("Request {} queried {} consuming segments with 
minConsumingFreshnessTimeMs: {}", requestId,
             numConsumingSegmentsQueried, minConsumingFreshnessTimeMs);
       }
     }
 
     LOGGER.debug("Query processing time for request Id - {}: {}", requestId, 
queryProcessingTime);
-    LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, 
dataTable);
-    return dataTable;
+    return instanceResponse;
   }
 
   // NOTE: This method might change indexSegments. Do not use it after calling 
this method.
-  private DataTable processQuery(List<IndexSegment> indexSegments, 
QueryContext queryContext, TimerContext timerContext,
-      ExecutorService executorService, @Nullable 
StreamObserver<Server.ServerResponse> responseObserver,
-      boolean enableStreaming)
+  private InstanceResponseBlock executeInternal(List<IndexSegment> 
indexSegments, QueryContext queryContext,
+      TimerContext timerContext, ExecutorService executorService,
+      @Nullable StreamObserver<Server.ServerResponse> responseObserver, 
boolean enableStreaming)
       throws Exception {
     handleSubquery(queryContext, indexSegments, timerContext, executorService);
 
@@ -345,33 +344,20 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
 
     TimerContext.Timer segmentPruneTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
-    int totalSegments = indexSegments.size();
+    int numTotalSegments = indexSegments.size();
     SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics();
     List<IndexSegment> selectedSegments = 
_segmentPrunerService.prune(indexSegments, queryContext, prunerStats);
     segmentPruneTimer.stopAndRecord();
     int numSelectedSegments = selectedSegments.size();
     LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+    InstanceResponseBlock instanceResponse;
     if (numSelectedSegments == 0) {
-      // Only return metadata for streaming query
-      DataTable dataTable;
       if (queryContext.isExplain()) {
-        dataTable = getExplainPlanResultsForNoMatchingSegment(totalSegments);
+        instanceResponse = 
getExplainResponseForNoMatchingSegment(numTotalSegments, queryContext);
       } else {
-        dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+        instanceResponse =
+            new 
InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext), 
queryContext);
       }
-
-      Map<String, String> metadata = dataTable.getMetadata();
-      metadata.put(MetadataKey.TOTAL_DOCS.getName(), 
String.valueOf(numTotalDocs));
-      metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), "0");
-      metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0");
-      metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0");
-      metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0");
-      metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0");
-      metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), 
"0");
-      metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0");
-      metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), 
String.valueOf(totalSegments));
-      addPrunerStats(metadata, prunerStats);
-      return dataTable;
     } else {
       TimerContext.Timer planBuildTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
       Plan queryPlan =
@@ -381,40 +367,28 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       planBuildTimer.stopAndRecord();
 
       TimerContext.Timer planExecTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
-      DataTable dataTable = queryContext.isExplain() ? 
processExplainPlanQueries(queryPlan) : queryPlan.execute();
+      instanceResponse = queryContext.isExplain() ? 
executeExplainQuery(queryPlan, queryContext) : queryPlan.execute();
       planExecTimer.stopAndRecord();
+    }
 
-      Map<String, String> metadata = dataTable.getMetadata();
-      // Update the total docs in the metadata based on the un-pruned segments
-      metadata.put(MetadataKey.TOTAL_DOCS.getName(), 
Long.toString(numTotalDocs));
+    // Update the total docs in the metadata based on the un-pruned segments
+    instanceResponse.addMetadata(MetadataKey.TOTAL_DOCS.getName(), 
Long.toString(numTotalDocs));
 
-      // Set the number of pruned segments. This count does not include the 
segments which returned empty filters
-      int prunedSegments = totalSegments - numSelectedSegments;
-      metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), 
String.valueOf(prunedSegments));
-      addPrunerStats(metadata, prunerStats);
+    // Set the number of pruned segments. This count does not include the 
segments which returned empty filters
+    int prunedSegments = numTotalSegments - numSelectedSegments;
+    
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(),
 String.valueOf(prunedSegments));
+    addPrunerStats(instanceResponse, prunerStats);
 
-      return dataTable;
-    }
+    return instanceResponse;
   }
 
-  /** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get 
selected for query execution.*/
-  private static DataTable getExplainPlanResultsForNoMatchingSegment(int 
totalNumSegments) {
-    DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
-    try {
-      dataTableBuilder.startRow();
-      dataTableBuilder.setColumn(0, 
String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments));
-      dataTableBuilder.setColumn(1, ExplainPlanRows.PLAN_START_IDS);
-      dataTableBuilder.setColumn(2, ExplainPlanRows.PLAN_START_IDS);
-      dataTableBuilder.finishRow();
-      dataTableBuilder.startRow();
-      dataTableBuilder.setColumn(0, 
ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER);
-      dataTableBuilder.setColumn(1, 2);
-      dataTableBuilder.setColumn(2, 1);
-      dataTableBuilder.finishRow();
-    } catch (IOException ioe) {
-      LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe);
-    }
-    return dataTableBuilder.build();
+  private static InstanceResponseBlock 
getExplainResponseForNoMatchingSegment(int numTotalSegments,
+      QueryContext queryContext) {
+    ExplainResultsBlock explainResults = new ExplainResultsBlock();
+    
explainResults.addOperator(String.format(ExplainPlanRows.PLAN_START_FORMAT, 
numTotalSegments),
+        ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
+    explainResults.addOperator(ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER, 
2, 1);
+    return new InstanceResponseBlock(explainResults, queryContext);
   }
 
   /**
@@ -502,9 +476,8 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
   }
 
-  /** @return EXPLAIN PLAN query result {@link DataTable}. */
-  public static DataTable processExplainPlanQueries(Plan queryPlan) {
-    DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA);
+  public static InstanceResponseBlock executeExplainQuery(Plan queryPlan, 
QueryContext queryContext) {
+    ExplainResultsBlock explainResults = new ExplainResultsBlock();
     List<Operator> childOperators = 
queryPlan.getPlanNode().run().getChildOperators();
     assert childOperators.size() == 1;
     Operator root = childOperators.get(0);
@@ -512,55 +485,35 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     int numEmptyFilterSegments = 0;
     int numMatchAllFilterSegments = 0;
 
-    try {
-      // Get the list of unique explain plans
-      operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root);
-      List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>();
-      operatorDepthToRowDataMap.forEach((key, value) -> 
listOfExplainPlans.addAll(value));
-
-      // Setup the combine root's explain string
-      setValueInDataTableBuilder(dataTableBuilder, root.toExplainString(), 2, 
1);
-
-      // Walk through all the explain plans and create the entries in the 
explain plan output for each plan
-      for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
-        numEmptyFilterSegments +=
-            explainPlanRows.isHasEmptyFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
-        numMatchAllFilterSegments +=
-            explainPlanRows.isHasMatchAllFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
-        setValueInDataTableBuilder(dataTableBuilder,
-            String.format(ExplainPlanRows.PLAN_START_FORMAT, 
explainPlanRows.getNumSegmentsMatchingThisPlan()),
-            ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
-        for (ExplainPlanRowData explainPlanRowData : 
explainPlanRows.getExplainPlanRowData()) {
-          setValueInDataTableBuilder(dataTableBuilder, 
explainPlanRowData.getExplainPlanString(),
-              explainPlanRowData.getOperatorId(), 
explainPlanRowData.getParentId());
-        }
+    // Get the list of unique explain plans
+    operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root);
+    List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>();
+    operatorDepthToRowDataMap.forEach((key, value) -> 
listOfExplainPlans.addAll(value));
+
+    // Setup the combine root's explain string
+    explainResults.addOperator(root.toExplainString(), 2, 1);
+
+    // Walk through all the explain plans and create the entries in the 
explain plan output for each plan
+    for (ExplainPlanRows explainPlanRows : listOfExplainPlans) {
+      numEmptyFilterSegments +=
+          explainPlanRows.isHasEmptyFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+      numMatchAllFilterSegments +=
+          explainPlanRows.isHasMatchAllFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0;
+      explainResults.addOperator(
+          String.format(ExplainPlanRows.PLAN_START_FORMAT, 
explainPlanRows.getNumSegmentsMatchingThisPlan()),
+          ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS);
+      for (ExplainPlanRowData explainPlanRowData : 
explainPlanRows.getExplainPlanRowData()) {
+        explainResults.addOperator(explainPlanRowData.getExplainPlanString(), 
explainPlanRowData.getOperatorId(),
+            explainPlanRowData.getParentId());
       }
-    } catch (IOException ioe) {
-      LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe);
     }
 
-    DataTable dataTable = dataTableBuilder.build();
-    dataTable.getMetadata()
-        .put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(), 
String.valueOf(numEmptyFilterSegments));
-    
dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
+    InstanceResponseBlock instanceResponse = new 
InstanceResponseBlock(explainResults, queryContext);
+    
instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(),
+        String.valueOf(numEmptyFilterSegments));
+    
instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(),
         String.valueOf(numMatchAllFilterSegments));
-    return dataTable;
-  }
-
-  /**
-   * Set the value for the explain plan fields in the DataTableBuilder
-   */
-  private static void setValueInDataTableBuilder(DataTableBuilder 
dataTableBuilder, String explainPlanString,
-      int operatorId, int parentId)
-      throws IOException {
-    if (explainPlanString != null) {
-      // Only those operators that return a non-null description will be added 
to the EXPLAIN PLAN output.
-      dataTableBuilder.startRow();
-      dataTableBuilder.setColumn(0, explainPlanString);
-      dataTableBuilder.setColumn(1, operatorId);
-      dataTableBuilder.setColumn(2, parentId);
-      dataTableBuilder.finishRow();
-    }
+    return instanceResponse;
   }
 
   /**
@@ -626,16 +579,19 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       // Execute the subquery
       subquery.setEndTimeMs(endTimeMs);
       // Make a clone of indexSegments because the method might modify the list
-      DataTable dataTable =
-          processQuery(new ArrayList<>(indexSegments), subquery, timerContext, 
executorService, null, false);
-      DataTable.CustomObject idSet = dataTable.getCustomObject(0, 0);
-      Preconditions.checkState(idSet != null && idSet.getType() == 
ObjectSerDeUtils.ObjectType.IdSet.getValue(),
-          "Result is not an IdSet");
-      String serializedIdSet =
-          new String(Base64.getEncoder().encode(idSet.getBuffer()).array(), 
StandardCharsets.ISO_8859_1);
+      InstanceResponseBlock instanceResponse =
+          executeInternal(new ArrayList<>(indexSegments), subquery, 
timerContext, executorService, null, false);
+      BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock();
+      Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock,
+          "Got unexpected results block type: %s, expecting aggregation 
results",
+          resultsBlock != null ? resultsBlock.getClass().getSimpleName() : 
null);
+      Object result = ((AggregationResultsBlock) 
resultsBlock).getResults().get(0);
+      Preconditions.checkState(result instanceof IdSet, "Got unexpected result 
type: %s, expecting IdSet",
+          result != null ? result.getClass().getSimpleName() : null);
       // Rewrite the expression
       function.setFunctionName(TransformFunctionType.INIDSET.name());
-      arguments.set(1, 
ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING, 
serializedIdSet));
+      arguments.set(1,
+          ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING, 
((IdSet) result).toBase64String()));
     } else {
       for (ExpressionContext argument : arguments) {
         handleSubquery(argument, indexSegments, timerContext, executorService, 
endTimeMs);
@@ -643,9 +599,12 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
     }
   }
 
-  private void addPrunerStats(Map<String, String> metadata, 
SegmentPrunerStatistics prunerStats) {
-    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), 
String.valueOf(prunerStats.getInvalidSegments()));
-    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), 
String.valueOf(prunerStats.getLimitPruned()));
-    metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), 
String.valueOf(prunerStats.getValuePruned()));
+  private void addPrunerStats(InstanceResponseBlock instanceResponse, 
SegmentPrunerStatistics prunerStats) {
+    
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
+        String.valueOf(prunerStats.getInvalidSegments()));
+    
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
+        String.valueOf(prunerStats.getLimitPruned()));
+    
instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
+        String.valueOf(prunerStats.getValuePruned()));
   }
 }
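
One detail worth highlighting in the subquery path above: the IdSet is now read straight out of
the in-memory results block instead of Base64-decoding a custom object from a DataTable.
Condensed from the diff, with instanceResponse assumed to come from executeInternal():

    import com.google.common.base.Preconditions;
    import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
    import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
    import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
    import org.apache.pinot.core.query.utils.idset.IdSet;

    public class SubqueryResult {
      // Extracts the serialized IdSet produced by an ID_SET aggregation subquery.
      public static String extractIdSet(InstanceResponseBlock instanceResponse) {
        BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock();
        Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock,
            "Expecting aggregation results");
        Object result = ((AggregationResultsBlock) resultsBlock).getResults().get(0);
        Preconditions.checkState(result instanceof IdSet, "Expecting IdSet");
        return ((IdSet) result).toBase64String();
      }
    }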
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 0ce0046a99..d1d2b4ec23 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.response.ProcessingException;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.TimerContext;
@@ -145,59 +144,58 @@ public abstract class QueryScheduler {
   @Nullable
   protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, 
ExecutorService executorService) {
     _latestQueryTime.accumulate(System.currentTimeMillis());
-    DataTable dataTable;
+    InstanceResponseBlock instanceResponse;
     try {
-      dataTable = _queryExecutor.processQuery(queryRequest, executorService);
+      instanceResponse = _queryExecutor.execute(queryRequest, executorService);
     } catch (Exception e) {
       LOGGER.error("Encountered exception while processing requestId {} from 
broker {}", queryRequest.getRequestId(),
           queryRequest.getBrokerId(), e);
       // For not handled exceptions
       _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
-      dataTable = DataTableBuilderUtils.getEmptyDataTable();
-      
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
 e));
+      instanceResponse = new InstanceResponseBlock();
+      
instanceResponse.addException(QueryException.getException(QueryException.INTERNAL_ERROR,
 e));
     }
     long requestId = queryRequest.getRequestId();
-    Map<String, String> dataTableMetadata = dataTable.getMetadata();
-    dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
+    Map<String, String> responseMetadata = 
instanceResponse.getResponseMetadata();
+    responseMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
 
-    byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
+    byte[] responseBytes = serializeResponse(queryRequest, instanceResponse);
 
     // Log the statistics
     String tableNameWithType = queryRequest.getTableNameWithType();
     long numDocsScanned =
-        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
 INVALID_NUM_SCANNED));
+        
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(),
 INVALID_NUM_SCANNED));
     long numEntriesScannedInFilter = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
 INVALID_NUM_SCANNED));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(),
 INVALID_NUM_SCANNED));
     long numEntriesScannedPostFilter = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
 INVALID_NUM_SCANNED));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(),
 INVALID_NUM_SCANNED));
     long numSegmentsProcessed = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), 
INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), 
INVALID_SEGMENTS_COUNT));
     long numSegmentsMatched = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), 
INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), 
INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedInvalid = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(),
 INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedByLimit = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(),
 INVALID_SEGMENTS_COUNT));
     long numSegmentsPrunedByValue = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(),
 INVALID_SEGMENTS_COUNT));
     long numSegmentsConsuming = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(),
 INVALID_SEGMENTS_COUNT));
     long numConsumingSegmentsProcessed = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(),
 INVALID_SEGMENTS_COUNT));
     long numConsumingSegmentsMatched = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
 INVALID_SEGMENTS_COUNT));
+        
responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(),
 INVALID_SEGMENTS_COUNT));
     long minConsumingFreshnessMs = Long.parseLong(
-        
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
 INVALID_FRESHNESS_MS));
+        
responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
 INVALID_FRESHNESS_MS));
     int numResizes =
-        
Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
 INVALID_NUM_RESIZES));
+        
Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(),
 INVALID_NUM_RESIZES));
     long resizeTimeMs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
 INVALID_RESIZE_TIME_MS));
-    long threadCpuTimeNs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
 "0"));
+        
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(),
 INVALID_RESIZE_TIME_MS));
+    long threadCpuTimeNs = 
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(),
 "0"));
     long systemActivitiesCpuTimeNs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
 "0"));
+        
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
 "0"));
     long responseSerializationCpuTimeNs =
-        
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
 "0"));
+        
Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(),
 "0"));
     long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + 
responseSerializationCpuTimeNs;
 
     if (numDocsScanned > 0) {
@@ -311,20 +309,20 @@ public abstract class QueryScheduler {
   }
 
   /**
-   * Serialize the DataTable response for query request
+   * Serialize the instance response for the query request
    * @param queryRequest Server query request for which response is serialized
-   * @param dataTable DataTable to serialize
+   * @param instanceResponse Instance response to serialize
    * @return serialized response bytes
    */
   @Nullable
-  private byte[] serializeDataTable(ServerQueryRequest queryRequest, DataTable 
dataTable) {
+  private byte[] serializeResponse(ServerQueryRequest queryRequest, 
InstanceResponseBlock instanceResponse) {
     TimerContext timerContext = queryRequest.getTimerContext();
     TimerContext.Timer responseSerializationTimer =
         
timerContext.startNewPhaseTimer(ServerQueryPhase.RESPONSE_SERIALIZATION);
 
     byte[] responseByte = null;
     try {
-      responseByte = dataTable.toBytes();
+      responseByte = instanceResponse.toDataTable().toBytes();
     } catch (Exception e) {
       
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS,
 1);
       LOGGER.error("Caught exception while serializing response for requestId: 
{}, brokerId: {}",
@@ -344,12 +342,9 @@ public abstract class QueryScheduler {
    */
   protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest 
queryRequest,
       ProcessingException error) {
-    DataTable result = DataTableBuilderUtils.getEmptyDataTable();
-
-    Map<String, String> dataTableMetadata = result.getMetadata();
-    dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(queryRequest.getRequestId()));
-
-    result.addException(error);
-    return Futures.immediateFuture(serializeDataTable(queryRequest, result));
+    InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+    instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), 
Long.toString(queryRequest.getRequestId()));
+    instanceResponse.addException(error);
+    return Futures.immediateFuture(serializeResponse(queryRequest, 
instanceResponse));
   }
 }
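
The same construction replaces DataTableBuilderUtils.getEmptyDataTable() wherever an error-only
response is needed. A sketch, assuming toDataTable() can throw (which is why serializeResponse()
above wraps it in a try/catch):

    import org.apache.pinot.common.datatable.DataTable.MetadataKey;
    import org.apache.pinot.common.exception.QueryException;
    import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;

    public class ErrorResponses {
      // Builds an error-only response block and serializes it for the wire.
      public static byte[] errorResponseBytes(long requestId, Exception e)
          throws Exception {
        InstanceResponseBlock response = new InstanceResponseBlock();
        response.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
        response.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e));
        return response.toDataTable().toBytes();
      }
    }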
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
index 71dbe1127f..649e377942 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.selection;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -231,11 +232,11 @@ public class SelectionOperatorUtils {
    * @param dataSchema data schema.
    * @param nullHandlingEnabled whether null handling is enabled.
    * @return data table.
-   * @throws Exception
+   * @throws IOException
    */
   public static DataTable getDataTableFromRows(Collection<Object[]> rows, 
DataSchema dataSchema,
       boolean nullHandlingEnabled)
-      throws Exception {
+      throws IOException {
     ColumnDataType[] storedColumnDataTypes = 
dataSchema.getStoredColumnDataTypes();
     int numColumns = storedColumnDataTypes.length;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index 886dd68446..9b5031c46b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -46,7 +46,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.common.request.InstanceRequest;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.server.access.AccessControl;
@@ -155,7 +155,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       long requestId = instanceRequest != null ? 
instanceRequest.getRequestId() : 0;
       LOGGER.error("Exception while processing instance request: {}", 
hexString, e);
       sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs,
-          DataTableBuilderUtils.getEmptyDataTable(), e);
+          DataTableBuilderFactory.getEmptyDataTable(), e);
     }
   }
 
@@ -175,9 +175,9 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       }
       _queryFuturesById.put(queryId, future);
     }
-    Futures
-        .addCallback(future, createCallback(ctx, tableNameWithType, 
queryArrivalTimeMs, instanceRequest, queryRequest),
-            MoreExecutors.directExecutor());
+    Futures.addCallback(future,
+        createCallback(ctx, tableNameWithType, queryArrivalTimeMs, 
instanceRequest, queryRequest),
+        MoreExecutors.directExecutor());
   }
 
   private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, 
String tableNameWithType,
@@ -198,7 +198,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
         } else {
           // Send exception response.
           sendErrorResponse(ctx, queryRequest.getRequestId(), 
tableNameWithType, queryArrivalTimeMs,
-              DataTableBuilderUtils.getEmptyDataTable(), new Exception("Null 
query response."));
+              DataTableBuilderFactory.getEmptyDataTable(), new Exception("Null 
query response."));
         }
       }
 
@@ -225,7 +225,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
           e = new Exception(t);
         }
         sendErrorResponse(ctx, instanceRequest.getRequestId(), 
tableNameWithType, queryArrivalTimeMs,
-            DataTableBuilderUtils.getEmptyDataTable(), e);
+            DataTableBuilderFactory.getEmptyDataTable(), e);
       }
     };
   }
@@ -236,7 +236,7 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
     // will only be called if for some remote reason we are unable to handle 
exceptions in channelRead0.
     String message = "Unhandled Exception in " + getClass().getCanonicalName();
     LOGGER.error(message, cause);
-    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), 
DataTableBuilderUtils.getEmptyDataTable(),
+    sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), 
DataTableBuilderFactory.getEmptyDataTable(),
         new Exception(message, cause));
   }
 
@@ -282,8 +282,8 @@ public class InstanceRequestHandler extends 
SimpleChannelInboundHandler<ByteBuf>
       Map<String, String> dataTableMetadata = dataTable.getMetadata();
       dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), 
Long.toString(requestId));
       if (cancelled) {
-        dataTable.addException(QueryException
-            .getException(QueryException.QUERY_CANCELLATION_ERROR, "Query 
cancelled on: " + _instanceName));
+        
dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR,
+            "Query cancelled on: " + _instanceName));
       } else {
         
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));
       }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 579f0691a5..f92a5e23f0 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -40,6 +40,7 @@ import org.apache.pinot.common.proto.PinotQueryServerGrpc;
 import org.apache.pinot.common.proto.Server.ServerRequest;
 import org.apache.pinot.common.proto.Server.ServerResponse;
 import org.apache.pinot.common.utils.TlsUtils;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -68,8 +69,7 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
     if (tlsConfig != null) {
       try {
         _server = 
NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig))
-            .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
-            .addService(this).build();
+            
.maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this).build();
       } catch (Exception e) {
         throw new RuntimeException("Failed to start secure grpcQueryServer", 
e);
       }
@@ -144,9 +144,9 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
     }
 
     // Process the query
-    DataTable dataTable;
+    InstanceResponseBlock instanceResponse;
     try {
-      dataTable = _queryExecutor.processQuery(queryRequest, _executorService, 
responseObserver);
+      instanceResponse = _queryExecutor.execute(queryRequest, 
_executorService, responseObserver);
     } catch (Exception e) {
       LOGGER.error("Caught exception while processing request {}: {} from 
broker: {}", queryRequest.getRequestId(),
           queryRequest.getQueryContext(), queryRequest.getBrokerId(), e);
@@ -155,18 +155,19 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
       return;
     }
 
-    ServerResponse response;
+    ServerResponse serverResponse;
     try {
-      response = queryRequest.isEnableStreaming() ? 
StreamingResponseUtils.getMetadataResponse(dataTable)
+      DataTable dataTable = instanceResponse.toDataTable();
+      serverResponse = queryRequest.isEnableStreaming() ? 
StreamingResponseUtils.getMetadataResponse(dataTable)
           : StreamingResponseUtils.getNonStreamingResponse(dataTable);
     } catch (Exception e) {
-      LOGGER.error("Caught exception while constructing response from data 
table for request {}: {} from broker: {}",
+      LOGGER.error("Caught exception while serializing response for request 
{}: {} from broker: {}",
           queryRequest.getRequestId(), queryRequest.getQueryContext(), 
queryRequest.getBrokerId(), e);
       
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS,
 1);
       responseObserver.onError(Status.INTERNAL.withCause(e).asException());
       return;
     }
-    responseObserver.onNext(response);
+    responseObserver.onNext(serverResponse);
     responseObserver.onCompleted();
   }
 }
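
On the gRPC side the block is serialized once, then wrapped as a metadata-only response for
streaming requests (the data already went out incrementally) or as a full response otherwise. A
sketch of that branch with names matching the diff; the throws clause is an assumption since
toDataTable() is caught as a generic Exception above:

    import org.apache.pinot.common.datatable.DataTable;
    import org.apache.pinot.common.proto.Server.ServerResponse;
    import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
    import org.apache.pinot.core.operator.streaming.StreamingResponseUtils;

    public class GrpcResponseChoice {
      // Serializes the block and picks the wrapper based on the request mode.
      public static ServerResponse toServerResponse(InstanceResponseBlock instanceResponse,
          boolean streaming)
          throws Exception {
        DataTable dataTable = instanceResponse.toDataTable();
        return streaming
            ? StreamingResponseUtils.getMetadataResponse(dataTable)
            : StreamingResponseUtils.getNonStreamingResponse(dataTable);
      }
    }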
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 86f28d1f6b..0040e842a8 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -108,7 +108,7 @@ public class DataTableSerDeTest {
         QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, 
exception);
     String expected = processingException.getMessage();
 
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.addException(processingException);
     DataTable newDataTable = 
DataTableFactory.getDataTable(dataTable.toBytes());
     Assert.assertNull(newDataTable.getDataSchema());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
similarity index 70%
rename from 
pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
index 7ef912bd6d..75fb625a94 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.common.datatable;
+package org.apache.pinot.core.operator.blocks.results;
 
 import java.io.IOException;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -32,27 +31,28 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
 
-public class DataTableBuilderUtilsTest {
+public class ResultsBlockUtilsTest {
 
   @Test
-  public void testBuildEmptyDataTable()
+  public void testBuildEmptyQueryResults()
       throws IOException {
     // Selection
     QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable WHERE foo = 
'bar'");
-    DataTable dataTable = 
DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+    DataTable dataTable = 
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
     DataSchema dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"*"});
-    assertEquals(dataSchema.getColumnDataTypes(), new 
ColumnDataType[]{ColumnDataType.STRING});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
     assertEquals(dataTable.getNumberOfRows(), 0);
 
     // Aggregation
     queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT COUNT(*), SUM(a), 
MAX(b) FROM testTable WHERE foo = 'bar'");
-    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+    dataTable = 
ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"count_star", 
"sum_a", "max_b"});
-    assertEquals(dataSchema.getColumnDataTypes(),
-        new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE});
+    assertEquals(dataSchema.getColumnDataTypes(), new 
DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, 
DataSchema.ColumnDataType.DOUBLE
+    });
     assertEquals(dataTable.getNumberOfRows(), 1);
     assertEquals(dataTable.getLong(0, 0), 0L);
     assertEquals(dataTable.getDouble(0, 1), 0.0);
@@ -61,20 +61,21 @@ public class DataTableBuilderUtilsTest {
     // Group-by
     queryContext = QueryContextConverterUtils.getQueryContext(
         "SELECT c, d, COUNT(*), SUM(a), MAX(b) FROM testTable WHERE foo = 'bar' GROUP BY c, d");
-    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+    dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"c", "d", "count(*)", "sum(a)", "max(b)"});
-    assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{
-        ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE
+    assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{
+        DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.LONG,
+        DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE
     });
     assertEquals(dataTable.getNumberOfRows(), 0);
 
     // Distinct
     queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT a, b FROM testTable WHERE foo = 'bar'");
-    dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext);
+    dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);
     dataSchema = dataTable.getDataSchema();
     assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"});
-    assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.OBJECT});
+    assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.OBJECT});
     assertEquals(dataTable.getNumberOfRows(), 1);
     DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0);
     assertNotNull(customObject);
@@ -82,6 +83,6 @@ public class DataTableBuilderUtilsTest {
     assertEquals(distinctTable.size(), 0);
     assertEquals(distinctTable.getDataSchema().getColumnNames(), new String[]{"a", "b"});
     assertEquals(distinctTable.getDataSchema().getColumnDataTypes(),
-        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
   }
 }
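
Note: for callers migrating off DataTableBuilderUtils, a minimal sketch of the
replacement pattern this test exercises, assuming a QueryContext built via
QueryContextConverterUtils as above; the results block is typed per query and
only materializes a DataTable on request:

    QueryContext queryContext =
        QueryContextConverterUtils.getQueryContext("SELECT COUNT(*) FROM testTable");
    // An empty aggregation result still carries one default row (count = 0).
    DataTable dataTable =
        ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext);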
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index d9cda7ad6d..1b3f0fa7ad 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -31,12 +31,12 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -168,7 +168,7 @@ public class QueryExecutorExceptionsTest {
     String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
     Map<Integer, String> exceptions = instanceResponse.getExceptions();
     assertTrue(exceptions.containsKey(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE));
 
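
Note: the new entry point surfaces server-side errors directly on the returned
block. A minimal sketch, assuming a ServerQueryRequest and executor wired up as
in this test:

    InstanceResponseBlock response = _queryExecutor.execute(queryRequest, executorService);
    // Keyed by error code, e.g. QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE.
    Map<Integer, String> exceptions = response.getExceptions();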
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index a62ec50dbb..4ccef3a050 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -28,11 +28,12 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
@@ -61,6 +62,8 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 public class QueryExecutorTest {
@@ -84,7 +87,7 @@ public class QueryExecutorTest {
       throws Exception {
     // Set up the segments
     FileUtils.deleteQuietly(INDEX_DIR);
-    Assert.assertTrue(INDEX_DIR.mkdirs());
+    assertTrue(INDEX_DIR.mkdirs());
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
     Assert.assertNotNull(resourceUrl);
     File avroFile = new File(resourceUrl.getFile());
@@ -160,8 +163,9 @@ public class QueryExecutorTest {
     String query = "SELECT COUNT(*) FROM " + TABLE_NAME;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 400002L);
   }
 
   @Test
@@ -169,8 +173,9 @@ public class QueryExecutorTest {
     String query = "SELECT SUM(met) FROM " + TABLE_NAME;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getDouble(0, 0), 40000200000.0);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 40000200000.0);
   }
 
   @Test
@@ -178,8 +183,9 @@ public class QueryExecutorTest {
     String query = "SELECT MAX(met) FROM " + TABLE_NAME;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getDouble(0, 0), 200000.0);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 200000.0);
   }
 
   @Test
@@ -187,8 +193,9 @@ public class QueryExecutorTest {
     String query = "SELECT MIN(met) FROM " + TABLE_NAME;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 0.0);
   }
 
   @AfterClass
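
Note: aggregation values are now read from the typed results block instead of
DataTable cells. A minimal sketch of the extraction these tests repeat:

    InstanceResponseBlock instanceResponse = _queryExecutor.execute(queryRequest, executorService);
    if (instanceResponse.getResultsBlock() instanceof AggregationResultsBlock) {
      // One entry per aggregation function, in SELECT order.
      Object count = ((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0);
    }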
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 527744b795..01e5920aa8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -44,8 +44,8 @@ import org.apache.pinot.common.datatable.DataTableFactory;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
@@ -297,7 +297,7 @@ public class PrioritySchedulerTest {
     }
 
     @Override
-    public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService,
+    public InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService,
         @Nullable StreamObserver<Server.ServerResponse> responseObserver) {
       if (_useBarrier) {
         try {
@@ -306,8 +306,8 @@ public class PrioritySchedulerTest {
           throw new RuntimeException(e);
         }
       }
-      DataTable result = DataTableBuilderUtils.getEmptyDataTable();
-      result.getMetadata().put(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType());
+      InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
+      instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType());
       if (_useBarrier) {
         try {
           _validationBarrier.await();
@@ -316,7 +316,7 @@ public class PrioritySchedulerTest {
         }
       }
       _numQueries.countDown();
-      return result;
+      return instanceResponse;
     }
   }
 }
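
Note: a stub executor no longer needs an empty DataTable; a bare
InstanceResponseBlock carries metadata directly. A minimal sketch (the table
name value is illustrative):

    InstanceResponseBlock instanceResponse = new InstanceResponseBlock();
    instanceResponse.addMetadata(MetadataKey.TABLE.getName(), "myTable_OFFLINE");
    return instanceResponse;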
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index a6deca642e..9fdaf7fb7d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -27,7 +27,7 @@ import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.scheduler.QueryScheduler;
 import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.server.access.AccessControl;
@@ -85,7 +85,7 @@ public class QueryRoutingTest {
   public void testValidResponse()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -163,7 +163,7 @@ public class QueryRoutingTest {
   public void testNonMatchingRequestId()
       throws Exception {
     long requestId = 123;
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
@@ -196,7 +196,7 @@ public class QueryRoutingTest {
     // To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it can take up to
     // 1400 msec to mark request as failed.
     long timeoutMs = 2000L;
-    DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable();
+    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
     dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
     byte[] responseBytes = dataTable.toBytes();
 
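
Note: DataTableBuilderFactory is a drop-in replacement for the removed utility
here; the empty table is mutated and serialized as before. A minimal sketch
(error handling elided):

    DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
    dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
    byte[] responseBytes = dataTable.toBytes();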
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index d449ec5f8b..e44d40d70f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -32,6 +32,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
@@ -197,10 +198,10 @@ public abstract class BaseQueriesTest {
     // Server side
     serverQueryContext.setEndTimeMs(System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
     Plan plan = planMaker.makeInstancePlan(getIndexSegments(), serverQueryContext, EXECUTOR_SERVICE, null);
-    DataTable instanceResponse;
+    InstanceResponseBlock instanceResponse;
     try {
       instanceResponse =
-          queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute();
+          queryContext.isExplain() ? ServerQueryExecutorV1Impl.executeExplainQuery(plan, queryContext) : plan.execute();
     } catch (TimeoutException e) {
       throw new RuntimeException(e);
     }
@@ -212,7 +213,7 @@ public abstract class BaseQueriesTest {
     Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
     try {
       // For multi-threaded BrokerReduceService, we cannot reuse the same data-table
-      byte[] serializedResponse = instanceResponse.toBytes();
+      byte[] serializedResponse = instanceResponse.toDataTable().toBytes();
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE),
           DataTableFactory.getDataTable(serializedResponse));
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME),
@@ -270,17 +271,17 @@ public abstract class BaseQueriesTest {
     Plan plan1 = planMaker.makeInstancePlan(instances.get(0), serverQueryContext, EXECUTOR_SERVICE, null);
     Plan plan2 = planMaker.makeInstancePlan(instances.get(1), serverQueryContext, EXECUTOR_SERVICE, null);
 
-    DataTable instanceResponse1;
+    InstanceResponseBlock instanceResponse1;
     try {
-      instanceResponse1 =
-          queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute();
+      instanceResponse1 = queryContext.isExplain() ? ServerQueryExecutorV1Impl.executeExplainQuery(plan1, queryContext)
+          : plan1.execute();
     } catch (TimeoutException e) {
       throw new RuntimeException(e);
     }
-    DataTable instanceResponse2;
+    InstanceResponseBlock instanceResponse2;
     try {
-      instanceResponse2 =
-          queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute();
+      instanceResponse2 = queryContext.isExplain() ? ServerQueryExecutorV1Impl.executeExplainQuery(plan2, queryContext)
+          : plan2.execute();
     } catch (TimeoutException e) {
       throw new RuntimeException(e);
     }
@@ -292,8 +293,8 @@ public abstract class BaseQueriesTest {
     Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
     try {
       // For multi-threaded BrokerReduceService, we cannot reuse the same data-table
-      byte[] serializedResponse1 = instanceResponse1.toBytes();
-      byte[] serializedResponse2 = instanceResponse2.toBytes();
+      byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes();
+      byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes();
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE),
           DataTableFactory.getDataTable(serializedResponse1));
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME),
TableType.REALTIME),
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index ecd0745ec9..b4f8aafa1e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -44,6 +44,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
@@ -316,11 +317,11 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest {
 
     InstanceRequest instanceRequest1 = new InstanceRequest(0L, brokerRequest);
     instanceRequest1.setSearchSegments(indexSegmentsForServer1);
-    DataTable instanceResponse1 = _queryExecutor.processQuery(getQueryRequest(instanceRequest1), QUERY_RUNNERS);
+    InstanceResponseBlock instanceResponse1 = _queryExecutor.execute(getQueryRequest(instanceRequest1), QUERY_RUNNERS);
 
     InstanceRequest instanceRequest2 = new InstanceRequest(0L, brokerRequest);
     instanceRequest2.setSearchSegments(indexSegmentsForServer2);
-    DataTable instanceResponse2 = _queryExecutor.processQuery(getQueryRequest(instanceRequest2), QUERY_RUNNERS);
+    InstanceResponseBlock instanceResponse2 = _queryExecutor.execute(getQueryRequest(instanceRequest2), QUERY_RUNNERS);
 
     // Broker side
     // Use 2 Threads for 2 data-tables
@@ -329,10 +330,10 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest {
     Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
     try {
       // For multi-threaded BrokerReduceService, we cannot reuse the same data-table
-      byte[] serializedResponse1 = instanceResponse1.toBytes();
+      byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes();
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE),
           DataTableFactory.getDataTable(serializedResponse1));
-      byte[] serializedResponse2 = instanceResponse2.toBytes();
+      byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes();
       dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME),
           DataTableFactory.getDataTable(serializedResponse2));
           DataTableFactory.getDataTable(serializedResponse2));
     } catch (Exception e) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 682cfa5716..807d27a1ad 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -35,11 +35,12 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
-import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
@@ -74,6 +75,8 @@ import org.testng.annotations.Test;
 import static org.apache.pinot.segment.local.segment.index.creator.RawIndexCreatorTest.getRandomValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 
 /**
@@ -107,8 +110,8 @@ public class SegmentWithNullValueVectorTest {
   private static final String TABLE_NAME = "testTable";
   private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
   private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
-  private int _nullIntKeyCount = 0;
-  private int _longKeyCount = 0;
+  private long _nullIntKeyCount = 0;
+  private long _longKeyCount = 0;
 
   /**
    * Setup to build a segment with raw indexes (no-dictionary) of various data types.
@@ -251,7 +254,7 @@ public class SegmentWithNullValueVectorTest {
     for (int i = 0; i < NUM_ROWS; i++) {
       for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
         String colName = fieldSpec.getName();
-        Assert.assertEquals(_actualNullVectorMap.get(colName)[i], nullValueVectorReaderMap.get(colName).isNull(i));
+        assertEquals(_actualNullVectorMap.get(colName)[i], nullValueVectorReaderMap.get(colName).isNull(i));
       }
     }
   }
@@ -261,8 +264,10 @@ public class SegmentWithNullValueVectorTest {
     String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + INT_COLUMN + " IS NOT NULL";
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), NUM_ROWS - _nullIntKeyCount);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0),
+        NUM_ROWS - _nullIntKeyCount);
   }
 
   @Test
@@ -270,8 +275,9 @@ public class SegmentWithNullValueVectorTest {
     String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + INT_COLUMN + " IS NULL";
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), _nullIntKeyCount);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), _nullIntKeyCount);
   }
 
   @Test
@@ -281,8 +287,9 @@ public class SegmentWithNullValueVectorTest {
             + LONG_VALUE_THRESHOLD;
     InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query));
     instanceRequest.setSearchSegments(_segmentNames);
-    DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS);
-    Assert.assertEquals(instanceResponse.getLong(0, 0), _longKeyCount);
+    InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS);
+    assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock);
+    assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), _longKeyCount);
   }
 
   private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index c26ea3bffe..a19658c586 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.transport.ServerInstance;
@@ -139,18 +140,17 @@ public class QueryRunner {
     }
   }
 
-  private BaseDataBlock processServerQuery(ServerQueryRequest serverQueryRequest,
-      ExecutorService executorService) {
+  private BaseDataBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) {
     BaseDataBlock dataBlock;
     try {
-      DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null);
-      if (!dataTable.getExceptions().isEmpty()) {
+      InstanceResponseBlock instanceResponse = _serverExecutor.execute(serverQueryRequest, executorService);
+      if (!instanceResponse.getExceptions().isEmpty()) {
         // if contains exception, directly return a metadata block with the exceptions.
-        dataBlock = DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+        dataBlock = DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions());
       } else {
         // this works because default DataTableImplV3 will have a version number at beginning:
         // the new DataBlock encodes lower 16 bits as version and upper 16 bits as type (ROW, COLUMNAR, METADATA)
-        dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+        dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes()));
       }
     } catch (Exception e) {
       dataBlock = DataBlockUtils.getErrorDataBlock(e);

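Note: a minimal sketch of the dispatch above, branching on the block's
exceptions before serializing (error handling elided):

    InstanceResponseBlock instanceResponse = _serverExecutor.execute(serverQueryRequest, executorService);
    BaseDataBlock dataBlock = instanceResponse.getExceptions().isEmpty()
        // No errors: serialize the response and re-decode it as a DataBlock.
        ? DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes()))
        // Errors: return a metadata-only error block keyed by error code.
        : DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions());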
