This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b91fa406bc [Refactor] Return InstanceResponseBlock in QueryExecutor (#9561) b91fa406bc is described below commit b91fa406bcf7db3a88012614e549cd3f2ddaef3d Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Oct 10 16:55:23 2022 -0700 [Refactor] Return InstanceResponseBlock in QueryExecutor (#9561) --- .../pinot/common/exception/QueryException.java | 5 + .../common/datatable/DataTableBuilderFactory.java | 20 ++ .../common/datatable/DataTableBuilderUtils.java | 195 ---------------- .../core/operator/InstanceResponseOperator.java | 19 +- .../operator/blocks/InstanceResponseBlock.java | 118 +++++++++- .../blocks/results/AggregationResultsBlock.java | 31 ++- .../operator/blocks/results/BaseResultsBlock.java | 35 ++- .../blocks/results/DistinctResultsBlock.java | 25 +- .../blocks/results/ExceptionResultsBlock.java | 24 +- .../blocks/results/ExplainResultsBlock.java | 93 ++++++++ .../blocks/results/GroupByResultsBlock.java | 51 ++++- .../blocks/results/MetadataResultsBlock.java | 24 +- .../operator/blocks/results/ResultsBlockUtils.java | 119 ++++++++++ .../blocks/results/SelectionResultsBlock.java | 18 +- .../StreamingInstanceResponseOperator.java | 21 +- .../apache/pinot/core/plan/GlobalPlanImplV0.java | 5 +- .../main/java/org/apache/pinot/core/plan/Plan.java | 4 +- .../plan/StreamingInstanceResponsePlanNode.java | 2 +- .../pinot/core/query/executor/QueryExecutor.java | 47 +++- .../query/executor/ServerQueryExecutorV1Impl.java | 251 +++++++++------------ .../pinot/core/query/scheduler/QueryScheduler.java | 71 +++--- .../query/selection/SelectionOperatorUtils.java | 5 +- .../core/transport/InstanceRequestHandler.java | 20 +- .../pinot/core/transport/grpc/GrpcQueryServer.java | 17 +- .../core/common/datatable/DataTableSerDeTest.java | 2 +- .../blocks/results/ResultsBlockUtilsTest.java} | 31 +-- .../executor/QueryExecutorExceptionsTest.java | 4 +- 
.../core/query/executor/QueryExecutorTest.java | 27 ++- .../query/scheduler/PrioritySchedulerTest.java | 10 +- .../pinot/core/transport/QueryRoutingTest.java | 8 +- .../org/apache/pinot/queries/BaseQueriesTest.java | 23 +- .../pinot/queries/ExplainPlanQueriesTest.java | 9 +- .../queries/SegmentWithNullValueVectorTest.java | 27 ++- .../apache/pinot/query/runtime/QueryRunner.java | 12 +- 34 files changed, 812 insertions(+), 561 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java index e09a2ff238..0009cee884 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/exception/QueryException.java @@ -25,6 +25,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.response.ProcessingException; +// TODO: Clean up ProcessingException (thrift) because we don't send it through the wire public class QueryException { private QueryException() { } @@ -61,6 +62,7 @@ public class QueryException { public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235; public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240; public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250; + public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260; public static final int BROKER_GATHER_ERROR_CODE = 300; public static final int BROKER_SEGMENT_UNAVAILABLE_ERROR_CODE = 305; public static final int DATA_TABLE_DESERIALIZATION_ERROR_CODE = 310; @@ -105,6 +107,8 @@ public class QueryException { new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE); public static final ProcessingException EXECUTION_TIMEOUT_ERROR = new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE); + public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR = + new ProcessingException(DATA_TABLE_SERIALIZATION_ERROR_CODE); public static final 
ProcessingException BROKER_GATHER_ERROR = new ProcessingException(BROKER_GATHER_ERROR_CODE); public static final ProcessingException DATA_TABLE_DESERIALIZATION_ERROR = new ProcessingException(DATA_TABLE_DESERIALIZATION_ERROR_CODE); @@ -142,6 +146,7 @@ public class QueryException { SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing"); QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError"); EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError"); + DATA_TABLE_SERIALIZATION_ERROR.setMessage("DataTableSerializationError"); BROKER_GATHER_ERROR.setMessage("BrokerGatherError"); DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableDeserializationError"); FUTURE_CALL_ERROR.setMessage("FutureCallError"); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java index 5d9e616974..2bf0426a78 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderFactory.java @@ -18,7 +18,11 @@ */ package org.apache.pinot.core.common.datatable; +import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTableFactory; +import org.apache.pinot.common.datatable.DataTableImplV2; +import org.apache.pinot.common.datatable.DataTableImplV3; +import org.apache.pinot.common.datatable.DataTableImplV4; import org.apache.pinot.common.utils.DataSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,4 +62,20 @@ public class DataTableBuilderFactory { throw new IllegalStateException("Unsupported data table version: " + _version); } } + + /** + * Returns an empty data table without data.
+ */ + public static DataTable getEmptyDataTable() { + switch (_version) { + case DataTableFactory.VERSION_2: + return new DataTableImplV2(); + case DataTableFactory.VERSION_3: + return new DataTableImplV3(); + case DataTableFactory.VERSION_4: + return new DataTableImplV4(); + default: + throw new IllegalStateException("Unsupported data table version: " + _version); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java deleted file mode 100644 index bc631af2fe..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java +++ /dev/null @@ -1,195 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.core.common.datatable; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import org.apache.pinot.common.datatable.DataTable; -import org.apache.pinot.common.datatable.DataTableFactory; -import org.apache.pinot.common.datatable.DataTableImplV2; -import org.apache.pinot.common.datatable.DataTableImplV3; -import org.apache.pinot.common.datatable.DataTableImplV4; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.DataSchema.ColumnDataType; -import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; -import org.apache.pinot.core.query.distinct.DistinctTable; -import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; - - -/** - * The <code>DataTableUtils</code> class provides utility methods for data table. - */ -@SuppressWarnings("rawtypes") -public class DataTableBuilderUtils { - private DataTableBuilderUtils() { - } - - /** - * Returns an empty data table without data. - */ - public static DataTable getEmptyDataTable() { - int version = DataTableBuilderFactory.getDataTableVersion(); - switch (version) { - case DataTableFactory.VERSION_2: - return new DataTableImplV2(); - case DataTableFactory.VERSION_3: - return new DataTableImplV3(); - case DataTableFactory.VERSION_4: - return new DataTableImplV4(); - default: - throw new IllegalStateException("Unsupported data table version: " + version); - } - } - - /** - * Builds an empty data table based on the broker request. 
- */ - public static DataTable buildEmptyDataTable(QueryContext queryContext) - throws IOException { - if (QueryContextUtils.isSelectionQuery(queryContext)) { - return buildEmptyDataTableForSelectionQuery(queryContext); - } else if (QueryContextUtils.isAggregationQuery(queryContext)) { - return buildEmptyDataTableForAggregationQuery(queryContext); - } else { - assert QueryContextUtils.isDistinctQuery(queryContext); - return buildEmptyDataTableForDistinctQuery(queryContext); - } - } - - /** - * Helper method to build an empty data table for selection query. - */ - private static DataTable buildEmptyDataTableForSelectionQuery(QueryContext queryContext) { - List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions(); - int numSelectExpressions = selectExpressions.size(); - String[] columnNames = new String[numSelectExpressions]; - for (int i = 0; i < numSelectExpressions; i++) { - columnNames[i] = selectExpressions.get(i).toString(); - } - ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions]; - // NOTE: Use STRING column data type as default for selection query - Arrays.fill(columnDataTypes, ColumnDataType.STRING); - DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); - return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build(); - } - - /** - * Helper method to build an empty data table for aggregation query. 
- */ - private static DataTable buildEmptyDataTableForAggregationQuery(QueryContext queryContext) - throws IOException { - AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); - assert aggregationFunctions != null; - int numAggregations = aggregationFunctions.length; - List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions(); - if (groupByExpressions != null) { - // Aggregation group-by query - - int numColumns = groupByExpressions.size() + numAggregations; - String[] columnNames = new String[numColumns]; - ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns]; - int index = 0; - for (ExpressionContext groupByExpression : groupByExpressions) { - columnNames[index] = groupByExpression.toString(); - // Use STRING column data type as default for group-by expressions - columnDataTypes[index] = ColumnDataType.STRING; - index++; - } - for (AggregationFunction aggregationFunction : aggregationFunctions) { - // NOTE: Use AggregationFunction.getResultColumnName() for SQL format response - columnNames[index] = aggregationFunction.getResultColumnName(); - columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType(); - index++; - } - return DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes)).build(); - } else { - // Aggregation only query - - String[] aggregationColumnNames = new String[numAggregations]; - ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregations]; - Object[] aggregationResults = new Object[numAggregations]; - for (int i = 0; i < numAggregations; i++) { - AggregationFunction aggregationFunction = aggregationFunctions[i]; - // NOTE: For backward-compatibility, use AggregationFunction.getColumnName() for aggregation only query - aggregationColumnNames[i] = aggregationFunction.getColumnName(); - columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType(); - aggregationResults[i] = - 
aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder()); - } - - // Build the data table - DataTableBuilder dataTableBuilder = - DataTableBuilderFactory.getDataTableBuilder(new DataSchema(aggregationColumnNames, columnDataTypes)); - dataTableBuilder.startRow(); - for (int i = 0; i < numAggregations; i++) { - switch (columnDataTypes[i]) { - case LONG: - dataTableBuilder.setColumn(i, ((Number) aggregationResults[i]).longValue()); - break; - case DOUBLE: - dataTableBuilder.setColumn(i, ((Double) aggregationResults[i]).doubleValue()); - break; - case OBJECT: - dataTableBuilder.setColumn(i, aggregationResults[i]); - break; - default: - throw new UnsupportedOperationException( - "Unsupported aggregation column data type: " + columnDataTypes[i] + " for column: " - + aggregationColumnNames[i]); - } - } - dataTableBuilder.finishRow(); - return dataTableBuilder.build(); - } - } - - /** - * Helper method to build an empty data table for distinct query. - */ - private static DataTable buildEmptyDataTableForDistinctQuery(QueryContext queryContext) - throws IOException { - AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); - assert aggregationFunctions != null && aggregationFunctions.length == 1 - && aggregationFunctions[0] instanceof DistinctAggregationFunction; - DistinctAggregationFunction distinctAggregationFunction = (DistinctAggregationFunction) aggregationFunctions[0]; - - // Create the distinct table - String[] columnNames = distinctAggregationFunction.getColumns(); - ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length]; - // NOTE: Use STRING column data type as default for distinct query - Arrays.fill(columnDataTypes, ColumnDataType.STRING); - DistinctTable distinctTable = - new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet(), - queryContext.isNullHandlingEnabled()); - - // Build the data table - DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder( - new DataSchema(new String[]{distinctAggregationFunction.getColumnName()}, - new ColumnDataType[]{ColumnDataType.OBJECT})); - dataTableBuilder.startRow(); - dataTableBuilder.setColumn(0, distinctTable); - dataTableBuilder.finishRow(); - return dataTableBuilder.build(); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index 6bfdb3717e..a1b317d66c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -20,8 +20,6 @@ package org.apache.pinot.core.operator; import java.util.Collections; import java.util.List; -import java.util.Map; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.request.context.ThreadTimer; import org.apache.pinot.core.common.Operator; @@ -83,7 +81,7 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock ThreadTimer mainThreadTimer = new ThreadTimer(); BaseResultsBlock resultsBlock = getCombinedResults(); - InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(getDataTable(resultsBlock)); + InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(resultsBlock, _queryContext); long mainThreadCpuTimeNs = mainThreadTimer.getThreadTimeNs(); long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs; @@ -99,14 +97,13 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock numServerThreads); long threadCpuTimeNs = mainThreadCpuTimeNs + multipleThreadCpuTimeNs; - Map<String, String> responseMetaData = instanceResponseBlock.getInstanceResponseDataTable().getMetadata(); - responseMetaData.put(MetadataKey.THREAD_CPU_TIME_NS.getName(), 
String.valueOf(threadCpuTimeNs)); - responseMetaData.put(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), + instanceResponseBlock.addMetadata(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs)); + instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), String.valueOf(systemActivitiesCpuTimeNs)); return instanceResponseBlock; } else { - return new InstanceResponseBlock(getDataTable(getCombinedResults())); + return new InstanceResponseBlock(getCombinedResults(), _queryContext); } } @@ -119,14 +116,6 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock } } - private DataTable getDataTable(BaseResultsBlock resultsBlock) { - try { - return resultsBlock.getDataTable(_queryContext); - } catch (Exception e) { - throw new RuntimeException("Caught exception while building data table", e); - } - } - private void prefetchAll() { for (int i = 0; i < _fetchContextSize; i++) { _indexSegments.get(i).prefetch(_fetchContexts.get(i)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java index 45170ff0d2..4708d5cf83 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/InstanceResponseBlock.java @@ -18,26 +18,132 @@ */ package org.apache.pinot.core.operator.blocks; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.common.BlockDocIdValueSet; 
import org.apache.pinot.core.common.BlockMetadata; import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.query.request.context.QueryContext; /** - * InstanceResponseBlock is just a holder to get InstanceResponse from InstanceResponseBlock. + * The {@code InstanceResponseBlock} is the holder of the server side results. */ public class InstanceResponseBlock implements Block { - private final DataTable _instanceResponseDataTable; + private final BaseResultsBlock _resultsBlock; + private final QueryContext _queryContext; + private final Map<Integer, String> _exceptions; + private final Map<String, String> _metadata; - public InstanceResponseBlock(DataTable dataTable) { - _instanceResponseDataTable = dataTable; + public InstanceResponseBlock(BaseResultsBlock resultsBlock, QueryContext queryContext) { + _resultsBlock = resultsBlock; + _queryContext = queryContext; + _exceptions = new HashMap<>(); + List<ProcessingException> processingExceptions = resultsBlock.getProcessingExceptions(); + if (processingExceptions != null) { + for (ProcessingException processingException : processingExceptions) { + _exceptions.put(processingException.getErrorCode(), processingException.getMessage()); + } + } + _metadata = resultsBlock.getResultsMetadata(); } - public DataTable getInstanceResponseDataTable() { - return _instanceResponseDataTable; + /** + * Metadata only instance response. 
+ */ + public InstanceResponseBlock() { + _resultsBlock = null; + _queryContext = null; + _exceptions = new HashMap<>(); + _metadata = new HashMap<>(); + } + + private InstanceResponseBlock(Map<Integer, String> exceptions, Map<String, String> metadata) { + _resultsBlock = null; + _queryContext = null; + _exceptions = exceptions; + _metadata = metadata; + } + + public InstanceResponseBlock toMetadataOnlyResponseBlock() { + return new InstanceResponseBlock(_exceptions, _metadata); + } + + public void addException(ProcessingException processingException) { + _exceptions.put(processingException.getErrorCode(), processingException.getMessage()); + } + + public void addException(int errorCode, String exceptionMessage) { + _exceptions.put(errorCode, exceptionMessage); + } + + public void addMetadata(String key, String value) { + _metadata.put(key, value); + } + + @Nullable + public BaseResultsBlock getResultsBlock() { + return _resultsBlock; + } + + @Nullable + public QueryContext getQueryContext() { + return _queryContext; + } + + public Map<Integer, String> getExceptions() { + return _exceptions; + } + + public Map<String, String> getResponseMetadata() { + return _metadata; + } + + @Nullable + public DataSchema getDataSchema() { + return _resultsBlock != null ? _resultsBlock.getDataSchema(_queryContext) : null; + } + + @Nullable + public Collection<Object[]> getRows() { + return _resultsBlock != null ? _resultsBlock.getRows(_queryContext) : null; + } + + public DataTable toDataTable() + throws IOException { + DataTable dataTable = toDataOnlyDataTable(); + attachMetadata(dataTable); + return dataTable; + } + + public DataTable toDataOnlyDataTable() + throws IOException { + return _resultsBlock != null ? 
_resultsBlock.getDataTable(_queryContext) + : DataTableBuilderFactory.getEmptyDataTable(); + } + + public DataTable toMetadataOnlyDataTable() { + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + attachMetadata(dataTable); + return dataTable; + } + + private void attachMetadata(DataTable dataTable) { + for (Map.Entry<Integer, String> entry : _exceptions.entrySet()) { + dataTable.addException(entry.getKey(), entry.getValue()); + } + dataTable.getMetadata().putAll(_metadata); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java index 10c641713e..3724b645e3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java @@ -21,6 +21,8 @@ package org.apache.pinot.core.operator.blocks.results; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.math.BigDecimal; +import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; @@ -55,11 +57,8 @@ public class AggregationResultsBlock extends BaseResultsBlock { } @Override - public DataTable getDataTable(QueryContext queryContext) - throws Exception { + public DataSchema getDataSchema(QueryContext queryContext) { boolean returnFinalResult = queryContext.isServerReturnFinalResult(); - - // Extract result column name and type from each aggregation function int numColumns = _aggregationFunctions.length; String[] columnNames = new String[numColumns]; ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns]; @@ -69,10 +68,23 @@ public class AggregationResultsBlock extends BaseResultsBlock { columnDataTypes[i] = 
returnFinalResult ? aggregationFunction.getFinalResultColumnType() : aggregationFunction.getIntermediateResultColumnType(); } + return new DataSchema(columnNames, columnDataTypes); + } + + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + return Collections.singletonList(_results.toArray()); + } - // Build the data table. - DataTableBuilder dataTableBuilder = - DataTableBuilderFactory.getDataTableBuilder(new DataSchema(columnNames, columnDataTypes)); + @Override + public DataTable getDataTable(QueryContext queryContext) + throws IOException { + boolean returnFinalResult = queryContext.isServerReturnFinalResult(); + DataSchema dataSchema = getDataSchema(queryContext); + assert dataSchema != null; + ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes(); + int numColumns = columnDataTypes.length; + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema); if (queryContext.isNullHandlingEnabled()) { RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns]; for (int i = 0; i < numColumns; i++) { @@ -108,10 +120,7 @@ public class AggregationResultsBlock extends BaseResultsBlock { } dataTableBuilder.finishRow(); } - - DataTable dataTable = dataTableBuilder.build(); - attachMetadataToDataTable(dataTable); - return dataTable; + return dataTableBuilder.build(); } private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java index 24573e04c1..e48e50b89d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/BaseResultsBlock.java @@ -19,14 +19,17 @@ package org.apache.pinot.core.operator.blocks.results; import 
com.google.common.annotations.VisibleForTesting; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.common.BlockDocIdValueSet; @@ -155,16 +158,29 @@ public abstract class BaseResultsBlock implements Block { _numServerThreads = numServerThreads; } + /** + * Returns the data schema for the results. Return {@code null} when the block only contains metadata. + */ + @Nullable + public abstract DataSchema getDataSchema(QueryContext queryContext); + + /** + * Returns the rows for the results. Return {@code null} when the block only contains metadata. + */ + @Nullable + public abstract Collection<Object[]> getRows(QueryContext queryContext); + + /** + * Returns a data table without metadata or exception attached. + */ public abstract DataTable getDataTable(QueryContext queryContext) - throws Exception; + throws IOException; - protected void attachMetadataToDataTable(DataTable dataTable) { - if (CollectionUtils.isNotEmpty(_processingExceptions)) { - for (ProcessingException exception : _processingExceptions) { - dataTable.addException(exception); - } - } - Map<String, String> metadata = dataTable.getMetadata(); + /** + * Returns the metadata for the results. 
+ */ + public Map<String, String> getResultsMetadata() { + Map<String, String> metadata = new HashMap<>(); metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(_numTotalDocs)); metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), Long.toString(_numDocsScanned)); metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), Long.toString(_numEntriesScannedInFilter)); @@ -174,6 +190,7 @@ public abstract class BaseResultsBlock implements Block { metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), Integer.toString(_numConsumingSegmentsProcessed)); metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), Integer.toString(_numConsumingSegmentsMatched)); + return metadata; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java index b721671726..8db1dfc474 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/DistinctResultsBlock.java @@ -18,11 +18,16 @@ */ package org.apache.pinot.core.operator.blocks.results; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; import org.apache.pinot.core.query.distinct.DistinctTable; import org.apache.pinot.core.query.request.context.QueryContext; @@ -48,9 +53,23 @@ public class DistinctResultsBlock extends 
BaseResultsBlock { _distinctTable = distinctTable; } + @Override + public DataSchema getDataSchema(QueryContext queryContext) { + return _distinctTable.getDataSchema(); + } + + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + List<Object[]> rows = new ArrayList<>(_distinctTable.size()); + for (Record record : _distinctTable.getRecords()) { + rows.add(record.getValues()); + } + return rows; + } + @Override public DataTable getDataTable(QueryContext queryContext) - throws Exception { + throws IOException { String[] columnNames = new String[]{_distinctFunction.getColumnName()}; ColumnDataType[] columnDataTypes = new ColumnDataType[]{ColumnDataType.OBJECT}; DataTableBuilder dataTableBuilder = @@ -58,8 +77,6 @@ public class DistinctResultsBlock extends BaseResultsBlock { dataTableBuilder.startRow(); dataTableBuilder.setColumn(0, _distinctTable); dataTableBuilder.finishRow(); - DataTable dataTable = dataTableBuilder.build(); - attachMetadataToDataTable(dataTable); - return dataTable; + return dataTableBuilder.build(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java index 23a6a2120d..d8df97ee7c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExceptionResultsBlock.java @@ -18,10 +18,13 @@ */ package org.apache.pinot.core.operator.blocks.results; +import java.util.Collection; +import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.response.ProcessingException; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.common.utils.DataSchema; +import 
org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.request.context.QueryContext; @@ -35,11 +38,20 @@ public class ExceptionResultsBlock extends BaseResultsBlock { this(QueryException.QUERY_EXECUTION_ERROR, e); } + @Nullable @Override - public DataTable getDataTable(QueryContext queryContext) - throws Exception { - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); - attachMetadataToDataTable(dataTable); - return dataTable; + public DataSchema getDataSchema(QueryContext queryContext) { + return null; + } + + @Nullable + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + return null; + } + + @Override + public DataTable getDataTable(QueryContext queryContext) { + return DataTableBuilderFactory.getEmptyDataTable(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java new file mode 100644 index 0000000000..e229848591 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ExplainResultsBlock.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.blocks.results; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; +import org.apache.pinot.core.query.request.context.QueryContext; + + +/** + * Results block for EXPLAIN queries. + */ +public class ExplainResultsBlock extends BaseResultsBlock { + private final List<ExplainEntry> _entries = new ArrayList<>(); + + public void addOperator(String operatorName, int operatorId, int parentId) { + _entries.add(new ExplainEntry(operatorName, operatorId, parentId)); + } + + @Override + public DataSchema getDataSchema(QueryContext queryContext) { + return DataSchema.EXPLAIN_RESULT_SCHEMA; + } + + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + List<Object[]> rows = new ArrayList<>(_entries.size()); + for (ExplainEntry entry : _entries) { + rows.add(entry.toRow()); + } + return rows; + } + + @Override + public DataTable getDataTable(QueryContext queryContext) + throws IOException { + DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA); + for (ExplainEntry entry : _entries) { + dataTableBuilder.startRow(); + dataTableBuilder.setColumn(0, entry._operatorName); + dataTableBuilder.setColumn(1, entry._operatorId); + dataTableBuilder.setColumn(2, entry._parentId); + dataTableBuilder.finishRow(); + } + return dataTableBuilder.build(); + } + + @Override + public Map<String, String> getResultsMetadata() { + // Do not add metadata for EXPLAIN results + return new HashMap<>(); + } + + private static 
class ExplainEntry { + final String _operatorName; + final int _operatorId; + final int _parentId; + + ExplainEntry(String operatorName, int operatorId, int parentId) { + _operatorName = operatorName; + _operatorId = operatorId; + _parentId = parentId; + } + + Object[] toRow() { + return new Object[]{_operatorName, _operatorId, _parentId}; + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java index 3cd88b95cc..ffc5109f14 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java @@ -18,13 +18,16 @@ */ package org.apache.pinot.core.operator.blocks.results; -import com.google.common.base.Preconditions; import it.unimi.dsi.fastutil.doubles.DoubleArrayList; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.utils.DataSchema; @@ -83,6 +86,16 @@ public class GroupByResultsBlock extends BaseResultsBlock { _table = table; } + /** + * For instance level empty group-by results. 
+ */ + public GroupByResultsBlock(DataSchema dataSchema) { + _dataSchema = dataSchema; + _aggregationGroupByResult = null; + _intermediateRecords = null; + _table = null; + } + public DataSchema getDataSchema() { return _dataSchema; } @@ -123,11 +136,33 @@ public class GroupByResultsBlock extends BaseResultsBlock { _resizeTimeMs = resizeTimeMs; } + @Nullable + @Override + public DataSchema getDataSchema(QueryContext queryContext) { + return _dataSchema; + } + + @Nullable + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + if (_table == null) { + return Collections.emptyList(); + } + List<Object[]> rows = new ArrayList<>(_table.size()); + Iterator<Record> iterator = _table.iterator(); + while (iterator.hasNext()) { + rows.add(iterator.next().getValues()); + } + return rows; + } + @Override public DataTable getDataTable(QueryContext queryContext) - throws Exception { - Preconditions.checkState(_table != null, "Cannot get DataTable from segment level results"); + throws IOException { DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(_dataSchema); + if (_table == null) { + return dataTableBuilder.build(); + } ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes(); int numColumns = _dataSchema.size(); Iterator<Record> iterator = _table.iterator(); @@ -166,9 +201,7 @@ public class GroupByResultsBlock extends BaseResultsBlock { dataTableBuilder.finishRow(); } } - DataTable dataTable = dataTableBuilder.build(); - attachMetadataToDataTable(dataTable); - return dataTable; + return dataTableBuilder.build(); } private void setDataTableColumn(ColumnDataType storedColumnDataType, DataTableBuilder dataTableBuilder, @@ -224,13 +257,13 @@ public class GroupByResultsBlock extends BaseResultsBlock { } @Override - protected void attachMetadataToDataTable(DataTable dataTable) { - super.attachMetadataToDataTable(dataTable); - Map<String, String> metadata = dataTable.getMetadata(); + public Map<String, 
String> getResultsMetadata() { + Map<String, String> metadata = super.getResultsMetadata(); if (_numGroupsLimitReached) { metadata.put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true"); } metadata.put(MetadataKey.NUM_RESIZES.getName(), Integer.toString(_numResizes)); metadata.put(MetadataKey.RESIZE_TIME_MS.getName(), Long.toString(_resizeTimeMs)); + return metadata; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java index 7ffeb0643d..dcfddf17cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/MetadataResultsBlock.java @@ -18,18 +18,30 @@ */ package org.apache.pinot.core.operator.blocks.results; +import java.util.Collection; +import javax.annotation.Nullable; import org.apache.pinot.common.datatable.DataTable; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.request.context.QueryContext; public class MetadataResultsBlock extends BaseResultsBlock { + @Nullable @Override - public DataTable getDataTable(QueryContext queryContext) - throws Exception { - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); - attachMetadataToDataTable(dataTable); - return dataTable; + public DataSchema getDataSchema(QueryContext queryContext) { + return null; + } + + @Nullable + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + return null; + } + + @Override + public DataTable getDataTable(QueryContext queryContext) { + return DataTableBuilderFactory.getEmptyDataTable(); } } diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java new file mode 100644 index 0000000000..5f5e7d0769 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtils.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.operator.blocks.results; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; +import org.apache.pinot.core.query.distinct.DistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; + + +@SuppressWarnings("rawtypes") +public class ResultsBlockUtils { + private ResultsBlockUtils() { + } + + public static BaseResultsBlock buildEmptyQueryResults(QueryContext queryContext) { + if (QueryContextUtils.isSelectionQuery(queryContext)) { + return buildEmptySelectionQueryResults(queryContext); + } else if (QueryContextUtils.isAggregationQuery(queryContext)) { + if (queryContext.getGroupByExpressions() == null) { + return buildEmptyAggregationQueryResults(queryContext); + } else { + return buildEmptyGroupByQueryResults(queryContext); + } + } else { + assert QueryContextUtils.isDistinctQuery(queryContext); + return buildEmptyDistinctQueryResults(queryContext); + } + } + + private static SelectionResultsBlock buildEmptySelectionQueryResults(QueryContext queryContext) { + List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions(); + int numSelectExpressions = selectExpressions.size(); + String[] columnNames = new String[numSelectExpressions]; + for (int i = 0; i < numSelectExpressions; i++) { + columnNames[i] = selectExpressions.get(i).toString(); + } + ColumnDataType[] columnDataTypes = new ColumnDataType[numSelectExpressions]; + // NOTE: Use STRING column data type as default for selection query + Arrays.fill(columnDataTypes, 
ColumnDataType.STRING); + DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); + return new SelectionResultsBlock(dataSchema, Collections.emptyList()); + } + + private static AggregationResultsBlock buildEmptyAggregationQueryResults(QueryContext queryContext) { + AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); + assert aggregationFunctions != null; + int numAggregations = aggregationFunctions.length; + List<Object> results = new ArrayList<>(numAggregations); + for (AggregationFunction aggregationFunction : aggregationFunctions) { + results.add(aggregationFunction.extractAggregationResult(aggregationFunction.createAggregationResultHolder())); + } + return new AggregationResultsBlock(aggregationFunctions, results); + } + + private static GroupByResultsBlock buildEmptyGroupByQueryResults(QueryContext queryContext) { + AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); + assert aggregationFunctions != null; + int numAggregations = aggregationFunctions.length; + List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions(); + assert groupByExpressions != null; + int numColumns = groupByExpressions.size() + numAggregations; + String[] columnNames = new String[numColumns]; + ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns]; + int index = 0; + for (ExpressionContext groupByExpression : groupByExpressions) { + columnNames[index] = groupByExpression.toString(); + // Use STRING column data type as default for group-by expressions + columnDataTypes[index] = ColumnDataType.STRING; + index++; + } + for (AggregationFunction aggregationFunction : aggregationFunctions) { + // NOTE: Use AggregationFunction.getResultColumnName() for SQL format response + columnNames[index] = aggregationFunction.getResultColumnName(); + columnDataTypes[index] = aggregationFunction.getIntermediateResultColumnType(); + index++; + } + return new GroupByResultsBlock(new 
DataSchema(columnNames, columnDataTypes)); + } + + private static DistinctResultsBlock buildEmptyDistinctQueryResults(QueryContext queryContext) { + AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); + assert aggregationFunctions != null && aggregationFunctions.length == 1 + && aggregationFunctions[0] instanceof DistinctAggregationFunction; + DistinctAggregationFunction distinctAggregationFunction = (DistinctAggregationFunction) aggregationFunctions[0]; + String[] columnNames = distinctAggregationFunction.getColumns(); + ColumnDataType[] columnDataTypes = new ColumnDataType[columnNames.length]; + // NOTE: Use STRING column data type as default for distinct query + Arrays.fill(columnDataTypes, ColumnDataType.STRING); + DistinctTable distinctTable = + new DistinctTable(new DataSchema(columnNames, columnDataTypes), Collections.emptySet(), + queryContext.isNullHandlingEnabled()); + return new DistinctResultsBlock(distinctAggregationFunction, distinctTable); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java index 4f714ae7be..2a7fd5847c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/SelectionResultsBlock.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.operator.blocks.results; +import java.io.IOException; import java.util.Collection; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; @@ -45,12 +46,19 @@ public class SelectionResultsBlock extends BaseResultsBlock { return _rows; } + @Override + public DataSchema getDataSchema(QueryContext queryContext) { + return _dataSchema; + } + + @Override + public Collection<Object[]> getRows(QueryContext queryContext) { + return _rows; + } 
+ @Override public DataTable getDataTable(QueryContext queryContext) - throws Exception { - DataTable dataTable = - SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, queryContext.isNullHandlingEnabled()); - attachMetadataToDataTable(dataTable); - return dataTable; + throws IOException { + return SelectionOperatorUtils.getDataTableFromRows(_rows, _dataSchema, queryContext.isNullHandlingEnabled()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java similarity index 69% rename from pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java rename to pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java index 3243dcd406..4bbd21ba3c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/StreamingInstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java @@ -16,18 +16,16 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.core.operator; +package org.apache.pinot.core.operator.streaming; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.List; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.proto.Server; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.core.operator.InstanceResponseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.combine.BaseCombineOperator; -import org.apache.pinot.core.operator.streaming.StreamingResponseUtils; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.IndexSegment; @@ -46,18 +44,15 @@ public class StreamingInstanceResponseOperator extends InstanceResponseOperator @Override protected InstanceResponseBlock getNextBlock() { - InstanceResponseBlock nextBlock = super.getNextBlock(); - DataTable instanceResponseDataTable = nextBlock.getInstanceResponseDataTable(); - DataTable metadataOnlyDataTable; + InstanceResponseBlock responseBlock = super.getNextBlock(); + InstanceResponseBlock metadataOnlyResponseBlock = responseBlock.toMetadataOnlyResponseBlock(); try { - metadataOnlyDataTable = instanceResponseDataTable.toMetadataOnlyDataTable(); - _streamObserver.onNext(StreamingResponseUtils.getDataResponse(instanceResponseDataTable.toDataOnlyDataTable())); + _streamObserver.onNext(StreamingResponseUtils.getDataResponse(responseBlock.toDataOnlyDataTable())); } catch (IOException e) { - // when exception occurs in streaming, we return an error-only metadata block. 
- metadataOnlyDataTable = DataTableBuilderUtils.getEmptyDataTable(); - metadataOnlyDataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + metadataOnlyResponseBlock.addException( + QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR, e)); } // return a metadata-only block. - return new InstanceResponseBlock(metadataOnlyDataTable); + return metadataOnlyResponseBlock; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java index d2bff6ab65..94cccc45ae 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/GlobalPlanImplV0.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.plan; import java.util.concurrent.TimeoutException; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.core.operator.InstanceResponseOperator; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.slf4j.Logger; @@ -46,7 +45,7 @@ public class GlobalPlanImplV0 implements Plan { } @Override - public DataTable execute() + public InstanceResponseBlock execute() throws TimeoutException { long startTime = System.currentTimeMillis(); InstanceResponseOperator instanceResponseOperator = _instanceResponsePlanNode.run(); @@ -58,6 +57,6 @@ public class GlobalPlanImplV0 implements Plan { InstanceResponseBlock instanceResponseBlock = instanceResponseOperator.nextBlock(); long endTime2 = System.currentTimeMillis(); LOGGER.debug("InstanceResponseOperator.nextBlock() took: {}ms", endTime2 - endTime1); - return instanceResponseBlock.getInstanceResponseDataTable(); + return instanceResponseBlock; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java index 567dce0f6a..06432da734 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/Plan.java @@ -19,7 +19,7 @@ package org.apache.pinot.core.plan; import java.util.concurrent.TimeoutException; -import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.spi.annotations.InterfaceAudience; @@ -33,6 +33,6 @@ public interface Plan { PlanNode getPlanNode(); /** Execute the query plan and get the instance response. */ - DataTable execute() + InstanceResponseBlock execute() throws TimeoutException; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java index 6cd2b09846..25b20ec5b9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/StreamingInstanceResponsePlanNode.java @@ -22,7 +22,7 @@ import io.grpc.stub.StreamObserver; import java.util.List; import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.operator.InstanceResponseOperator; -import org.apache.pinot.core.operator.StreamingInstanceResponseOperator; +import org.apache.pinot.core.operator.streaming.StreamingInstanceResponseOperator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.IndexSegment; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java index 4712d7f911..b7ac472743 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/QueryExecutor.java @@ -24,9 +24,11 @@ import javax.annotation.Nullable; 
import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.proto.Server; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.spi.env.PinotConfiguration; @@ -55,7 +57,10 @@ public interface QueryExecutor { /** * Processes the non-streaming query with the given executor service. + * + * Deprecated: use execute() instead. */ + @Deprecated default DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService) { return processQuery(queryRequest, executorService, null); } @@ -64,14 +69,48 @@ public interface QueryExecutor { * Processes the query (streaming or non-streaming) with the given executor service. * <ul> * <li> - * For streaming request, the returned DataTable contains only the metadata. The response is streamed back via the - * observer. + * For streaming request, the returned {@link DataTable} contains only the metadata. The response is streamed back + * via the observer. * </li> * <li> - * For non-streaming request, the returned DataTable contains both data and metadata. + * For non-streaming request, the returned {@link DataTable} contains both data and metadata. * </li> * </ul> + * + * Deprecated: use execute() instead. 
*/ - DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, + @Deprecated + default DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, + @Nullable StreamObserver<Server.ServerResponse> responseObserver) { + InstanceResponseBlock instanceResponse = execute(queryRequest, executorService, responseObserver); + try { + return instanceResponse.toDataTable(); + } catch (Exception e) { + DataTable metadataOnlyDataTable = instanceResponse.toMetadataOnlyDataTable(); + metadataOnlyDataTable.addException(QueryException.getException(QueryException.DATA_TABLE_SERIALIZATION_ERROR, e)); + return metadataOnlyDataTable; + } + } + + /** + * Executes the non-streaming query with the given executor service. + */ + default InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService) { + return execute(queryRequest, executorService, null); + } + + /** + * Executes the query (streaming or non-streaming) with the given executor service. + * <ul> + * <li> + * For streaming request, the returned {@link InstanceResponseBlock} contains only the metadata. The response is + * streamed back via the observer. + * </li> + * <li> + * For non-streaming request, the returned {@link InstanceResponseBlock} contains both data and metadata. 
+ * </li> + * </ul> + */ + InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index 075882448e..bf0838d4d1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -20,10 +20,7 @@ package org.apache.pinot.core.query.executor; import com.google.common.base.Preconditions; import io.grpc.stub.StreamObserver; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Base64; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -34,7 +31,6 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.lang.StringUtils; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.function.TransformFunctionType; @@ -45,16 +41,16 @@ import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.FunctionContext; -import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.ExplainPlanRowData; import org.apache.pinot.core.common.ExplainPlanRows; -import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.common.datatable.DataTableBuilder; 
-import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; +import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ExplainResultsBlock; +import org.apache.pinot.core.operator.blocks.results.ResultsBlockUtils; import org.apache.pinot.core.operator.filter.EmptyFilterOperator; import org.apache.pinot.core.operator.filter.MatchAllFilterOperator; import org.apache.pinot.core.plan.Plan; @@ -68,6 +64,7 @@ import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.util.QueryOptionsUtils; import org.apache.pinot.core.util.trace.TraceContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; @@ -132,20 +129,20 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } @Override - public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, + public InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver) { if (!queryRequest.isEnableTrace()) { - return processQueryInternal(queryRequest, executorService, responseObserver); + return executeInternal(queryRequest, executorService, responseObserver); } try { Tracing.getTracer().register(queryRequest.getRequestId()); - return 
processQueryInternal(queryRequest, executorService, responseObserver); + return executeInternal(queryRequest, executorService, responseObserver); } finally { Tracing.getTracer().unregister(); } } - private DataTable processQueryInternal(ServerQueryRequest queryRequest, ExecutorService executorService, + private InstanceResponseBlock executeInternal(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver) { TimerContext timerContext = queryRequest.getTimerContext(); TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT); @@ -178,20 +175,22 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { String errorMessage = String.format("Query scheduling took %dms (longer than query timeout of %dms) on server: %s", querySchedulingTimeMs, queryTimeoutMs, _instanceDataManager.getInstanceId()); - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); - dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage)); + InstanceResponseBlock instanceResponse = new InstanceResponseBlock(); + instanceResponse.addException( + QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage)); LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); - return dataTable; + return instanceResponse; } TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager == null) { String errorMessage = String.format("Failed to find table: %s on server: %s", tableNameWithType, _instanceDataManager.getInstanceId()); - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); - dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage)); + InstanceResponseBlock instanceResponse = new InstanceResponseBlock(); + instanceResponse.addException( + 
QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage)); LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); - return dataTable; + return instanceResponse; } List<String> segmentsToQuery = queryRequest.getSegmentsToQuery(); @@ -245,17 +244,17 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - DataTable dataTable = null; + InstanceResponseBlock instanceResponse = null; try { - dataTable = processQuery(indexSegments, queryContext, timerContext, executorService, responseObserver, + instanceResponse = executeInternal(indexSegments, queryContext, timerContext, executorService, responseObserver, queryRequest.isEnableStreaming()); } catch (Exception e) { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1); - dataTable = DataTableBuilderUtils.getEmptyDataTable(); + instanceResponse = new InstanceResponseBlock(); // Do not log verbose error for BadQueryRequestException and QueryCancelledException. if (e instanceof BadQueryRequestException) { LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage()); - dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } else if (e instanceof QueryCancelledException) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Cancelled while processing requestId: {}", requestId, e); @@ -265,28 +264,27 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { // NOTE most likely the onFailure() callback registered on query future in InstanceRequestHandler would // return the error table to broker sooner than here. But in case of race condition, we construct the error // table here too. 
- dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, + instanceResponse.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, "Query cancelled on: " + _instanceDataManager.getInstanceId())); } else { LOGGER.error("Exception processing requestId {}", requestId, e); - dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); + instanceResponse.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } } finally { for (SegmentDataManager segmentDataManager : segmentDataManagers) { tableDataManager.releaseSegment(segmentDataManager); } if (queryRequest.isEnableTrace()) { - if (TraceContext.traceEnabled() && dataTable != null) { - dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo()); + if (TraceContext.traceEnabled() && instanceResponse != null) { + instanceResponse.addMetadata(MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo()); } } } queryProcessingTimer.stopAndRecord(); long queryProcessingTime = queryProcessingTimer.getDurationMs(); - Map<String, String> metadata = dataTable.getMetadata(); - metadata.put(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), Integer.toString(numSegmentsAcquired)); - metadata.put(MetadataKey.TIME_USED_MS.getName(), Long.toString(queryProcessingTime)); + instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_QUERIED.getName(), Integer.toString(numSegmentsAcquired)); + instanceResponse.addMetadata(MetadataKey.TIME_USED_MS.getName(), Long.toString(queryProcessingTime)); // When segment is removed from the IdealState: // 1. 
Controller schedules a state transition to server to turn segment OFFLINE @@ -302,7 +300,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { .collect(Collectors.toList()); int numMissingSegments = missingSegments.size(); if (numMissingSegments > 0) { - dataTable.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR, + instanceResponse.addException(QueryException.getException(QueryException.SERVER_SEGMENT_MISSING_ERROR, String.format("%d segments %s missing on server: %s", numMissingSegments, missingSegments, _instanceDataManager.getInstanceId()))); _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS, numMissingSegments); @@ -310,31 +308,32 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } if (tableDataManager instanceof RealtimeTableDataManager) { - long minConsumingFreshnessTimeMs = Long.MAX_VALUE; + long minConsumingFreshnessTimeMs; if (numConsumingSegmentsQueried > 0) { minConsumingFreshnessTimeMs = minIngestionTimeMs != Long.MAX_VALUE ? 
minIngestionTimeMs : minIndexTimeMs; - metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), + instanceResponse.addMetadata(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), Integer.toString(numConsumingSegmentsQueried)); - metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), Long.toString(minConsumingFreshnessTimeMs)); + instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), + Long.toString(minConsumingFreshnessTimeMs)); LOGGER.debug("Request {} queried {} consuming segments with minConsumingFreshnessTimeMs: {}", requestId, numConsumingSegmentsQueried, minConsumingFreshnessTimeMs); } else if (numConsumingSegmentsQueried == 0 && maxEndTimeMs != Long.MIN_VALUE) { minConsumingFreshnessTimeMs = maxEndTimeMs; - metadata.put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), Long.toString(maxEndTimeMs)); + instanceResponse.addMetadata(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), + Long.toString(maxEndTimeMs)); LOGGER.debug("Request {} queried {} consuming segments with minConsumingFreshnessTimeMs: {}", requestId, numConsumingSegmentsQueried, minConsumingFreshnessTimeMs); } } LOGGER.debug("Query processing time for request Id - {}: {}", requestId, queryProcessingTime); - LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable); - return dataTable; + return instanceResponse; } // NOTE: This method might change indexSegments. Do not use it after calling this method. 
- private DataTable processQuery(List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext, - ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver, - boolean enableStreaming) + private InstanceResponseBlock executeInternal(List<IndexSegment> indexSegments, QueryContext queryContext, + TimerContext timerContext, ExecutorService executorService, + @Nullable StreamObserver<Server.ServerResponse> responseObserver, boolean enableStreaming) throws Exception { handleSubquery(queryContext, indexSegments, timerContext, executorService); @@ -345,33 +344,20 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING); - int totalSegments = indexSegments.size(); + int numTotalSegments = indexSegments.size(); SegmentPrunerStatistics prunerStats = new SegmentPrunerStatistics(); List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext, prunerStats); segmentPruneTimer.stopAndRecord(); int numSelectedSegments = selectedSegments.size(); LOGGER.debug("Matched {} segments after pruning", numSelectedSegments); + InstanceResponseBlock instanceResponse; if (numSelectedSegments == 0) { - // Only return metadata for streaming query - DataTable dataTable; if (queryContext.isExplain()) { - dataTable = getExplainPlanResultsForNoMatchingSegment(totalSegments); + instanceResponse = getExplainResponseForNoMatchingSegment(numTotalSegments, queryContext); } else { - dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext); + instanceResponse = + new InstanceResponseBlock(ResultsBlockUtils.buildEmptyQueryResults(queryContext), queryContext); } - - Map<String, String> metadata = dataTable.getMetadata(); - metadata.put(MetadataKey.TOTAL_DOCS.getName(), String.valueOf(numTotalDocs)); - metadata.put(MetadataKey.NUM_DOCS_SCANNED.getName(), "0"); - 
metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), "0"); - metadata.put(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), "0"); - metadata.put(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), "0"); - metadata.put(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), "0"); - metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), "0"); - metadata.put(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), "0"); - metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(totalSegments)); - addPrunerStats(metadata, prunerStats); - return dataTable; } else { TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN); Plan queryPlan = @@ -381,40 +367,28 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { planBuildTimer.stopAndRecord(); TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); - DataTable dataTable = queryContext.isExplain() ? processExplainPlanQueries(queryPlan) : queryPlan.execute(); + instanceResponse = queryContext.isExplain() ? executeExplainQuery(queryPlan, queryContext) : queryPlan.execute(); planExecTimer.stopAndRecord(); + } - Map<String, String> metadata = dataTable.getMetadata(); - // Update the total docs in the metadata based on the un-pruned segments - metadata.put(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs)); + // Update the total docs in the metadata based on the un-pruned segments + instanceResponse.addMetadata(MetadataKey.TOTAL_DOCS.getName(), Long.toString(numTotalDocs)); - // Set the number of pruned segments. This count does not include the segments which returned empty filters - int prunedSegments = totalSegments - numSelectedSegments; - metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments)); - addPrunerStats(metadata, prunerStats); + // Set the number of pruned segments. 
This count does not include the segments which returned empty filters + int prunedSegments = numTotalSegments - numSelectedSegments; + instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER.getName(), String.valueOf(prunedSegments)); + addPrunerStats(instanceResponse, prunerStats); - return dataTable; - } + return instanceResponse; } - /** @return EXPLAIN_PLAN query result {@link DataTable} when no segments get selected for query execution.*/ - private static DataTable getExplainPlanResultsForNoMatchingSegment(int totalNumSegments) { - DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA); - try { - dataTableBuilder.startRow(); - dataTableBuilder.setColumn(0, String.format(ExplainPlanRows.PLAN_START_FORMAT, totalNumSegments)); - dataTableBuilder.setColumn(1, ExplainPlanRows.PLAN_START_IDS); - dataTableBuilder.setColumn(2, ExplainPlanRows.PLAN_START_IDS); - dataTableBuilder.finishRow(); - dataTableBuilder.startRow(); - dataTableBuilder.setColumn(0, ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER); - dataTableBuilder.setColumn(1, 2); - dataTableBuilder.setColumn(2, 1); - dataTableBuilder.finishRow(); - } catch (IOException ioe) { - LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe); - } - return dataTableBuilder.build(); + private static InstanceResponseBlock getExplainResponseForNoMatchingSegment(int numTotalSegments, + QueryContext queryContext) { + ExplainResultsBlock explainResults = new ExplainResultsBlock(); + explainResults.addOperator(String.format(ExplainPlanRows.PLAN_START_FORMAT, numTotalSegments), + ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS); + explainResults.addOperator(ExplainPlanRows.ALL_SEGMENTS_PRUNED_ON_SERVER, 2, 1); + return new InstanceResponseBlock(explainResults, queryContext); } /** @@ -502,9 +476,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - /** @return EXPLAIN PLAN query result {@link DataTable}. 
*/ - public static DataTable processExplainPlanQueries(Plan queryPlan) { - DataTableBuilder dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(DataSchema.EXPLAIN_RESULT_SCHEMA); + public static InstanceResponseBlock executeExplainQuery(Plan queryPlan, QueryContext queryContext) { + ExplainResultsBlock explainResults = new ExplainResultsBlock(); List<Operator> childOperators = queryPlan.getPlanNode().run().getChildOperators(); assert childOperators.size() == 1; Operator root = childOperators.get(0); @@ -512,55 +485,35 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { int numEmptyFilterSegments = 0; int numMatchAllFilterSegments = 0; - try { - // Get the list of unique explain plans - operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root); - List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>(); - operatorDepthToRowDataMap.forEach((key, value) -> listOfExplainPlans.addAll(value)); - - // Setup the combine root's explain string - setValueInDataTableBuilder(dataTableBuilder, root.toExplainString(), 2, 1); - - // Walk through all the explain plans and create the entries in the explain plan output for each plan - for (ExplainPlanRows explainPlanRows : listOfExplainPlans) { - numEmptyFilterSegments += - explainPlanRows.isHasEmptyFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0; - numMatchAllFilterSegments += - explainPlanRows.isHasMatchAllFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0; - setValueInDataTableBuilder(dataTableBuilder, - String.format(ExplainPlanRows.PLAN_START_FORMAT, explainPlanRows.getNumSegmentsMatchingThisPlan()), - ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS); - for (ExplainPlanRowData explainPlanRowData : explainPlanRows.getExplainPlanRowData()) { - setValueInDataTableBuilder(dataTableBuilder, explainPlanRowData.getExplainPlanString(), - explainPlanRowData.getOperatorId(), explainPlanRowData.getParentId()); - } + // Get the list of unique explain plans + operatorDepthToRowDataMap = getAllSegmentsUniqueExplainPlanRowData(root); + List<ExplainPlanRows> listOfExplainPlans = new ArrayList<>(); + operatorDepthToRowDataMap.forEach((key, value) -> listOfExplainPlans.addAll(value)); + + // Setup the combine root's explain string + explainResults.addOperator(root.toExplainString(), 2, 1); + + // Walk through all the explain plans and create the entries in the explain plan output for each plan + for (ExplainPlanRows explainPlanRows : listOfExplainPlans) { + numEmptyFilterSegments += + explainPlanRows.isHasEmptyFilter() ? explainPlanRows.getNumSegmentsMatchingThisPlan() : 0; + numMatchAllFilterSegments += + explainPlanRows.isHasMatchAllFilter() ? 
explainPlanRows.getNumSegmentsMatchingThisPlan() : 0; + explainResults.addOperator( + String.format(ExplainPlanRows.PLAN_START_FORMAT, explainPlanRows.getNumSegmentsMatchingThisPlan()), + ExplainPlanRows.PLAN_START_IDS, ExplainPlanRows.PLAN_START_IDS); + for (ExplainPlanRowData explainPlanRowData : explainPlanRows.getExplainPlanRowData()) { + explainResults.addOperator(explainPlanRowData.getExplainPlanString(), explainPlanRowData.getOperatorId(), + explainPlanRowData.getParentId()); } - } catch (IOException ioe) { - LOGGER.error("Unable to create EXPLAIN PLAN result table.", ioe); } - DataTable dataTable = dataTableBuilder.build(); - dataTable.getMetadata() - .put(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(), String.valueOf(numEmptyFilterSegments)); - dataTable.getMetadata().put(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(), + InstanceResponseBlock instanceResponse = new InstanceResponseBlock(explainResults, queryContext); + instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS.getName(), + String.valueOf(numEmptyFilterSegments)); + instanceResponse.addMetadata(MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS.getName(), String.valueOf(numMatchAllFilterSegments)); - return dataTable; - } - - /** - * Set the value for the explain plan fields in the DataTableBuilder - */ - private static void setValueInDataTableBuilder(DataTableBuilder dataTableBuilder, String explainPlanString, - int operatorId, int parentId) - throws IOException { - if (explainPlanString != null) { - // Only those operators that return a non-null description will be added to the EXPLAIN PLAN output. 
- dataTableBuilder.startRow(); - dataTableBuilder.setColumn(0, explainPlanString); - dataTableBuilder.setColumn(1, operatorId); - dataTableBuilder.setColumn(2, parentId); - dataTableBuilder.finishRow(); - } + return instanceResponse; } /** @@ -626,16 +579,19 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { // Execute the subquery subquery.setEndTimeMs(endTimeMs); // Make a clone of indexSegments because the method might modify the list - DataTable dataTable = - processQuery(new ArrayList<>(indexSegments), subquery, timerContext, executorService, null, false); - DataTable.CustomObject idSet = dataTable.getCustomObject(0, 0); - Preconditions.checkState(idSet != null && idSet.getType() == ObjectSerDeUtils.ObjectType.IdSet.getValue(), - "Result is not an IdSet"); - String serializedIdSet = - new String(Base64.getEncoder().encode(idSet.getBuffer()).array(), StandardCharsets.ISO_8859_1); + InstanceResponseBlock instanceResponse = + executeInternal(new ArrayList<>(indexSegments), subquery, timerContext, executorService, null, false); + BaseResultsBlock resultsBlock = instanceResponse.getResultsBlock(); + Preconditions.checkState(resultsBlock instanceof AggregationResultsBlock, + "Got unexpected results block type: %s, expecting aggregation results", + resultsBlock != null ? resultsBlock.getClass().getSimpleName() : null); + Object result = ((AggregationResultsBlock) resultsBlock).getResults().get(0); + Preconditions.checkState(result instanceof IdSet, "Got unexpected result type: %s, expecting IdSet", + result != null ? 
result.getClass().getSimpleName() : null); // Rewrite the expression function.setFunctionName(TransformFunctionType.INIDSET.name()); - arguments.set(1, ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING, serializedIdSet)); + arguments.set(1, + ExpressionContext.forLiteralContext(FieldSpec.DataType.STRING, ((IdSet) result).toBase64String())); } else { for (ExpressionContext argument : arguments) { handleSubquery(argument, indexSegments, timerContext, executorService, endTimeMs); @@ -643,9 +599,12 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - private void addPrunerStats(Map<String, String> metadata, SegmentPrunerStatistics prunerStats) { - metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), String.valueOf(prunerStats.getInvalidSegments())); - metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), String.valueOf(prunerStats.getLimitPruned())); - metadata.put(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), String.valueOf(prunerStats.getValuePruned())); + private void addPrunerStats(InstanceResponseBlock instanceResponse, SegmentPrunerStatistics prunerStats) { + instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), + String.valueOf(prunerStats.getInvalidSegments())); + instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), + String.valueOf(prunerStats.getLimitPruned())); + instanceResponse.addMetadata(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), + String.valueOf(prunerStats.getValuePruned())); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 0ce0046a99..d1d2b4ec23 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAccumulator; import javax.annotation.Nullable; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerGauge; @@ -38,7 +37,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerQueryPhase; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.response.ProcessingException; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.request.context.TimerContext; @@ -145,59 +144,58 @@ public abstract class QueryScheduler { @Nullable protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, ExecutorService executorService) { _latestQueryTime.accumulate(System.currentTimeMillis()); - DataTable dataTable; + InstanceResponseBlock instanceResponse; try { - dataTable = _queryExecutor.processQuery(queryRequest, executorService); + instanceResponse = _queryExecutor.execute(queryRequest, executorService); } catch (Exception e) { LOGGER.error("Encountered exception while processing requestId {} from broker {}", queryRequest.getRequestId(), queryRequest.getBrokerId(), e); // For not handled exceptions _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1); - dataTable = DataTableBuilderUtils.getEmptyDataTable(); - dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e)); + instanceResponse = new InstanceResponseBlock(); + instanceResponse.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e)); } long requestId = queryRequest.getRequestId(); - 
Map<String, String> dataTableMetadata = dataTable.getMetadata(); - dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Map<String, String> responseMetadata = instanceResponse.getResponseMetadata(); + responseMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); - byte[] responseBytes = serializeDataTable(queryRequest, dataTable); + byte[] responseBytes = serializeResponse(queryRequest, instanceResponse); // Log the statistics String tableNameWithType = queryRequest.getTableNameWithType(); long numDocsScanned = - Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(), INVALID_NUM_SCANNED)); + Long.parseLong(responseMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(), INVALID_NUM_SCANNED)); long numEntriesScannedInFilter = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), INVALID_NUM_SCANNED)); + responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), INVALID_NUM_SCANNED)); long numEntriesScannedPostFilter = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), INVALID_NUM_SCANNED)); + responseMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), INVALID_NUM_SCANNED)); long numSegmentsProcessed = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsMatched = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsPrunedInvalid = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT)); + 
responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsPrunedByLimit = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsPrunedByValue = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT)); long numSegmentsConsuming = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT)); long numConsumingSegmentsProcessed = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT)); long numConsumingSegmentsMatched = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT)); + responseMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT)); long minConsumingFreshnessMs = Long.parseLong( - dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS)); + responseMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS)); int numResizes = - Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(), INVALID_NUM_RESIZES)); + Integer.parseInt(responseMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(), INVALID_NUM_RESIZES)); long resizeTimeMs = - 
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS)); - long threadCpuTimeNs = - Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0")); + Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS)); + long threadCpuTimeNs = Long.parseLong(responseMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0")); long systemActivitiesCpuTimeNs = - Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0")); + Long.parseLong(responseMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0")); long responseSerializationCpuTimeNs = - Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0")); + Long.parseLong(responseMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0")); long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs; if (numDocsScanned > 0) { @@ -311,20 +309,20 @@ public abstract class QueryScheduler { } /** - * Serialize the DataTable response for query request + * Serialize the instance response for query request * @param queryRequest Server query request for which response is serialized - * @param dataTable DataTable to serialize + * @param instanceResponse instance response to serialize * @return serialized response bytes */ @Nullable - private byte[] serializeDataTable(ServerQueryRequest queryRequest, DataTable dataTable) { + private byte[] serializeResponse(ServerQueryRequest queryRequest, InstanceResponseBlock instanceResponse) { TimerContext timerContext = queryRequest.getTimerContext(); TimerContext.Timer responseSerializationTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.RESPONSE_SERIALIZATION); byte[] responseByte = null; try { - responseByte = dataTable.toBytes(); + responseByte = instanceResponse.toDataTable().toBytes(); } 
catch (Exception e) { _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1); LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}", @@ -344,12 +342,9 @@ public abstract class QueryScheduler { */ protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest, ProcessingException error) { - DataTable result = DataTableBuilderUtils.getEmptyDataTable(); - - Map<String, String> dataTableMetadata = result.getMetadata(); - dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId())); - - result.addException(error); - return Futures.immediateFuture(serializeDataTable(queryRequest, result)); + InstanceResponseBlock instanceResponse = new InstanceResponseBlock(); + instanceResponse.addMetadata(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId())); + instanceResponse.addException(error); + return Futures.immediateFuture(serializeResponse(queryRequest, instanceResponse)); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java index 71dbe1127f..649e377942 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/selection/SelectionOperatorUtils.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.selection; +import java.io.IOException; import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; @@ -231,11 +232,11 @@ public class SelectionOperatorUtils { * @param dataSchema data schema. * @param nullHandlingEnabled whether null handling is enabled. * @return data table. 
- * @throws Exception + * @throws IOException */ public static DataTable getDataTableFromRows(Collection<Object[]> rows, DataSchema dataSchema, boolean nullHandlingEnabled) - throws Exception { + throws IOException { ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes(); int numColumns = storedColumnDataTypes.length; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index 886dd68446..9b5031c46b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -46,7 +46,7 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerQueryPhase; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.request.InstanceRequest; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.server.access.AccessControl; @@ -155,7 +155,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> long requestId = instanceRequest != null ? 
instanceRequest.getRequestId() : 0; LOGGER.error("Exception while processing instance request: {}", hexString, e); sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs, - DataTableBuilderUtils.getEmptyDataTable(), e); + DataTableBuilderFactory.getEmptyDataTable(), e); } } @@ -175,9 +175,9 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> } _queryFuturesById.put(queryId, future); } - Futures - .addCallback(future, createCallback(ctx, tableNameWithType, queryArrivalTimeMs, instanceRequest, queryRequest), - MoreExecutors.directExecutor()); + Futures.addCallback(future, + createCallback(ctx, tableNameWithType, queryArrivalTimeMs, instanceRequest, queryRequest), + MoreExecutors.directExecutor()); } private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, String tableNameWithType, @@ -198,7 +198,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> } else { // Send exception response. sendErrorResponse(ctx, queryRequest.getRequestId(), tableNameWithType, queryArrivalTimeMs, - DataTableBuilderUtils.getEmptyDataTable(), new Exception("Null query response.")); + DataTableBuilderFactory.getEmptyDataTable(), new Exception("Null query response.")); } } @@ -225,7 +225,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> e = new Exception(t); } sendErrorResponse(ctx, instanceRequest.getRequestId(), tableNameWithType, queryArrivalTimeMs, - DataTableBuilderUtils.getEmptyDataTable(), e); + DataTableBuilderFactory.getEmptyDataTable(), e); } }; } @@ -236,7 +236,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> // will only be called if for some remote reason we are unable to handle exceptions in channelRead0. 
String message = "Unhandled Exception in " + getClass().getCanonicalName(); LOGGER.error(message, cause); - sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), DataTableBuilderUtils.getEmptyDataTable(), + sendErrorResponse(ctx, 0, null, System.currentTimeMillis(), DataTableBuilderFactory.getEmptyDataTable(), new Exception(message, cause)); } @@ -282,8 +282,8 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf> Map<String, String> dataTableMetadata = dataTable.getMetadata(); dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); if (cancelled) { - dataTable.addException(QueryException - .getException(QueryException.QUERY_CANCELLATION_ERROR, "Query cancelled on: " + _instanceName)); + dataTable.addException(QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, + "Query cancelled on: " + _instanceName)); } else { dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java index 579f0691a5..f92a5e23f0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java @@ -40,6 +40,7 @@ import org.apache.pinot.common.proto.PinotQueryServerGrpc; import org.apache.pinot.common.proto.Server.ServerRequest; import org.apache.pinot.common.proto.Server.ServerResponse; import org.apache.pinot.common.utils.TlsUtils; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.streaming.StreamingResponseUtils; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -68,8 +69,7 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa 
if (tlsConfig != null) { try { _server = NettyServerBuilder.forPort(port).sslContext(buildGRpcSslContext(tlsConfig)) - .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()) - .addService(this).build(); + .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes()).addService(this).build(); } catch (Exception e) { throw new RuntimeException("Failed to start secure grpcQueryServer", e); } @@ -144,9 +144,9 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa } // Process the query - DataTable dataTable; + InstanceResponseBlock instanceResponse; try { - dataTable = _queryExecutor.processQuery(queryRequest, _executorService, responseObserver); + instanceResponse = _queryExecutor.execute(queryRequest, _executorService, responseObserver); } catch (Exception e) { LOGGER.error("Caught exception while processing request {}: {} from broker: {}", queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e); @@ -155,18 +155,19 @@ public class GrpcQueryServer extends PinotQueryServerGrpc.PinotQueryServerImplBa return; } - ServerResponse response; + ServerResponse serverResponse; try { - response = queryRequest.isEnableStreaming() ? StreamingResponseUtils.getMetadataResponse(dataTable) + DataTable dataTable = instanceResponse.toDataTable(); + serverResponse = queryRequest.isEnableStreaming() ? 
StreamingResponseUtils.getMetadataResponse(dataTable) : StreamingResponseUtils.getNonStreamingResponse(dataTable); } catch (Exception e) { - LOGGER.error("Caught exception while constructing response from data table for request {}: {} from broker: {}", + LOGGER.error("Caught exception while serializing response for request {}: {} from broker: {}", queryRequest.getRequestId(), queryRequest.getQueryContext(), queryRequest.getBrokerId(), e); _serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1); responseObserver.onError(Status.INTERNAL.withCause(e).asException()); return; } - responseObserver.onNext(response); + responseObserver.onNext(serverResponse); responseObserver.onCompleted(); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java index 86f28d1f6b..0040e842a8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java @@ -108,7 +108,7 @@ public class DataTableSerDeTest { QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, exception); String expected = processingException.getMessage(); - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.addException(processingException); DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes()); Assert.assertNull(newDataTable.getDataSchema()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java similarity index 70% rename from pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java rename to 
pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java index 7ef912bd6d..75fb625a94 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtilsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/blocks/results/ResultsBlockUtilsTest.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.common.datatable; +package org.apache.pinot.core.operator.blocks.results; import java.io.IOException; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.query.distinct.DistinctTable; import org.apache.pinot.core.query.request.context.QueryContext; @@ -32,27 +31,28 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -public class DataTableBuilderUtilsTest { +public class ResultsBlockUtilsTest { @Test - public void testBuildEmptyDataTable() + public void testBuildEmptyQueryResults() throws IOException { // Selection QueryContext queryContext = QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable WHERE foo = 'bar'"); - DataTable dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext); + DataTable dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext); DataSchema dataSchema = dataTable.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"*"}); - assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING}); + assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); assertEquals(dataTable.getNumberOfRows(), 0); // Aggregation queryContext = QueryContextConverterUtils.getQueryContext("SELECT COUNT(*), 
SUM(a), MAX(b) FROM testTable WHERE foo = 'bar'"); - dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext); + dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext); dataSchema = dataTable.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"count_star", "sum_a", "max_b"}); - assertEquals(dataSchema.getColumnDataTypes(), - new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE}); + assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE + }); assertEquals(dataTable.getNumberOfRows(), 1); assertEquals(dataTable.getLong(0, 0), 0L); assertEquals(dataTable.getDouble(0, 1), 0.0); @@ -61,20 +61,21 @@ public class DataTableBuilderUtilsTest { // Group-by queryContext = QueryContextConverterUtils.getQueryContext( "SELECT c, d, COUNT(*), SUM(a), MAX(b) FROM testTable WHERE foo = 'bar' GROUP BY c, d"); - dataTable = DataTableBuilderUtils.buildEmptyDataTable(queryContext); + dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext); dataSchema = dataTable.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"c", "d", "count(*)", "sum(a)", "max(b)"}); - assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ - ColumnDataType.STRING, ColumnDataType.STRING, ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE + assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{ + DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.LONG, + DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE }); assertEquals(dataTable.getNumberOfRows(), 0); // Distinct queryContext = QueryContextConverterUtils.getQueryContext("SELECT DISTINCT a, b FROM testTable WHERE foo = 'bar'"); - dataTable = 
DataTableBuilderUtils.buildEmptyDataTable(queryContext); + dataTable = ResultsBlockUtils.buildEmptyQueryResults(queryContext).getDataTable(queryContext); dataSchema = dataTable.getDataSchema(); assertEquals(dataSchema.getColumnNames(), new String[]{"distinct_a:b"}); - assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.OBJECT}); + assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.OBJECT}); assertEquals(dataTable.getNumberOfRows(), 1); DataTable.CustomObject customObject = dataTable.getCustomObject(0, 0); assertNotNull(customObject); @@ -82,6 +83,6 @@ public class DataTableBuilderUtilsTest { assertEquals(distinctTable.size(), 0); assertEquals(distinctTable.getDataSchema().getColumnNames(), new String[]{"a", "b"}); assertEquals(distinctTable.getDataSchema().getColumnDataTypes(), - new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING}); + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING}); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java index d9cda7ad6d..1b3f0fa7ad 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java @@ -31,12 +31,12 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.data.manager.InstanceDataManager; import 
org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; @@ -168,7 +168,7 @@ public class QueryExecutorExceptionsTest { String query = "SELECT COUNT(*) FROM " + TABLE_NAME; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); Map<Integer, String> exceptions = instanceResponse.getExceptions(); assertTrue(exceptions.containsKey(QueryException.SERVER_SEGMENT_MISSING_ERROR_CODE)); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index a62ec50dbb..4ccef3a050 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -28,11 +28,12 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import 
org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; @@ -61,6 +62,8 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class QueryExecutorTest { @@ -84,7 +87,7 @@ public class QueryExecutorTest { throws Exception { // Set up the segments FileUtils.deleteQuietly(INDEX_DIR); - Assert.assertTrue(INDEX_DIR.mkdirs()); + assertTrue(INDEX_DIR.mkdirs()); URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH); Assert.assertNotNull(resourceUrl); File avroFile = new File(resourceUrl.getFile()); @@ -160,8 +163,9 @@ public class QueryExecutorTest { String query = "SELECT COUNT(*) FROM " + TABLE_NAME; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), 400002L); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 400002L); } @Test @@ -169,8 +173,9 @@ public class QueryExecutorTest { String query = "SELECT SUM(met) FROM " + TABLE_NAME; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = 
_queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getDouble(0, 0), 40000200000.0); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 40000200000.0); } @Test @@ -178,8 +183,9 @@ public class QueryExecutorTest { String query = "SELECT MAX(met) FROM " + TABLE_NAME; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getDouble(0, 0), 200000.0); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 200000.0); } @Test @@ -187,8 +193,9 @@ public class QueryExecutorTest { String query = "SELECT MIN(met) FROM " + TABLE_NAME; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getDouble(0, 0), 0.0); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), 0.0); } @AfterClass diff 
--git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java index 527744b795..01e5920aa8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java @@ -44,8 +44,8 @@ import org.apache.pinot.common.datatable.DataTableFactory; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.proto.Server; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager; @@ -297,7 +297,7 @@ public class PrioritySchedulerTest { } @Override - public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, + public InstanceResponseBlock execute(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver) { if (_useBarrier) { try { @@ -306,8 +306,8 @@ public class PrioritySchedulerTest { throw new RuntimeException(e); } } - DataTable result = DataTableBuilderUtils.getEmptyDataTable(); - result.getMetadata().put(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); + InstanceResponseBlock instanceResponse = new InstanceResponseBlock(); + instanceResponse.addMetadata(MetadataKey.TABLE.getName(), queryRequest.getTableNameWithType()); if (_useBarrier) { try { _validationBarrier.await(); @@ -316,7 +316,7 @@ public class PrioritySchedulerTest { } } _numQueries.countDown(); - return result; 
+ return instanceResponse; } } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index a6deca642e..9fdaf7fb7d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -27,7 +27,7 @@ import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.core.common.datatable.DataTableBuilderUtils; +import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.server.access.AccessControl; @@ -85,7 +85,7 @@ public class QueryRoutingTest { public void testValidResponse() throws Exception { long requestId = 123; - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); @@ -163,7 +163,7 @@ public class QueryRoutingTest { public void testNonMatchingRequestId() throws Exception { long requestId = 123; - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); @@ -196,7 +196,7 @@ public class QueryRoutingTest { // To avoid flakyness, set timeoutMs to 2000 msec. For some test runs, it can take up to // 1400 msec to mark request as failed. 
long timeoutMs = 2000L; - DataTable dataTable = DataTableBuilderUtils.getEmptyDataTable(); + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); byte[] responseBytes = dataTable.toBytes(); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index d449ec5f8b..e44d40d70f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -32,6 +32,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.plan.Plan; import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.core.plan.maker.PlanMaker; @@ -197,10 +198,10 @@ public abstract class BaseQueriesTest { // Server side serverQueryContext.setEndTimeMs(System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); Plan plan = planMaker.makeInstancePlan(getIndexSegments(), serverQueryContext, EXECUTOR_SERVICE, null); - DataTable instanceResponse; + InstanceResponseBlock instanceResponse; try { instanceResponse = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute(); + queryContext.isExplain() ? 
ServerQueryExecutorV1Impl.executeExplainQuery(plan, queryContext) : plan.execute(); } catch (TimeoutException e) { throw new RuntimeException(e); } @@ -212,7 +213,7 @@ public abstract class BaseQueriesTest { Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); try { // For multi-threaded BrokerReduceService, we cannot reuse the same data-table - byte[] serializedResponse = instanceResponse.toBytes(); + byte[] serializedResponse = instanceResponse.toDataTable().toBytes(); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), DataTableFactory.getDataTable(serializedResponse)); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), @@ -270,17 +271,17 @@ public abstract class BaseQueriesTest { Plan plan1 = planMaker.makeInstancePlan(instances.get(0), serverQueryContext, EXECUTOR_SERVICE, null); Plan plan2 = planMaker.makeInstancePlan(instances.get(1), serverQueryContext, EXECUTOR_SERVICE, null); - DataTable instanceResponse1; + InstanceResponseBlock instanceResponse1; try { - instanceResponse1 = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan1) : plan1.execute(); + instanceResponse1 = queryContext.isExplain() ? ServerQueryExecutorV1Impl.executeExplainQuery(plan1, queryContext) + : plan1.execute(); } catch (TimeoutException e) { throw new RuntimeException(e); } - DataTable instanceResponse2; + InstanceResponseBlock instanceResponse2; try { - instanceResponse2 = - queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan2) : plan2.execute(); + instanceResponse2 = queryContext.isExplain() ? 
ServerQueryExecutorV1Impl.executeExplainQuery(plan2, queryContext) + : plan2.execute(); } catch (TimeoutException e) { throw new RuntimeException(e); } @@ -292,8 +293,8 @@ public abstract class BaseQueriesTest { Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); try { // For multi-threaded BrokerReduceService, we cannot reuse the same data-table - byte[] serializedResponse1 = instanceResponse1.toBytes(); - byte[] serializedResponse2 = instanceResponse2.toBytes(); + byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes(); + byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes(); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), DataTableFactory.getDataTable(serializedResponse1)); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index ecd0745ec9..b4f8aafa1e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -44,6 +44,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.ExplainPlanRows; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.reduce.BrokerReduceService; @@ -316,11 +317,11 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { InstanceRequest instanceRequest1 = new InstanceRequest(0L, brokerRequest); instanceRequest1.setSearchSegments(indexSegmentsForServer1); - DataTable instanceResponse1 
= _queryExecutor.processQuery(getQueryRequest(instanceRequest1), QUERY_RUNNERS); + InstanceResponseBlock instanceResponse1 = _queryExecutor.execute(getQueryRequest(instanceRequest1), QUERY_RUNNERS); InstanceRequest instanceRequest2 = new InstanceRequest(0L, brokerRequest); instanceRequest2.setSearchSegments(indexSegmentsForServer2); - DataTable instanceResponse2 = _queryExecutor.processQuery(getQueryRequest(instanceRequest2), QUERY_RUNNERS); + InstanceResponseBlock instanceResponse2 = _queryExecutor.execute(getQueryRequest(instanceRequest2), QUERY_RUNNERS); // Broker side // Use 2 Threads for 2 data-tables @@ -329,10 +330,10 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>(); try { // For multi-threaded BrokerReduceService, we cannot reuse the same data-table - byte[] serializedResponse1 = instanceResponse1.toBytes(); + byte[] serializedResponse1 = instanceResponse1.toDataTable().toBytes(); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), DataTableFactory.getDataTable(serializedResponse1)); - byte[] serializedResponse2 = instanceResponse2.toBytes(); + byte[] serializedResponse2 = instanceResponse2.toDataTable().toBytes(); dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), DataTableFactory.getDataTable(serializedResponse2)); } catch (Exception e) { diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java index 682cfa5716..807d27a1ad 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java @@ -35,11 +35,12 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import 
org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; +import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -74,6 +75,8 @@ import org.testng.annotations.Test; import static org.apache.pinot.segment.local.segment.index.creator.RawIndexCreatorTest.getRandomValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** @@ -107,8 +110,8 @@ public class SegmentWithNullValueVectorTest { private static final String TABLE_NAME = "testTable"; private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties"; private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20); - private int _nullIntKeyCount = 0; - private int _longKeyCount = 0; + private long _nullIntKeyCount = 0; + private long _longKeyCount = 0; /** * Setup to build a segment with raw indexes (no-dictionary) of various data types. 
@@ -251,7 +254,7 @@ public class SegmentWithNullValueVectorTest { for (int i = 0; i < NUM_ROWS; i++) { for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) { String colName = fieldSpec.getName(); - Assert.assertEquals(_actualNullVectorMap.get(colName)[i], nullValueVectorReaderMap.get(colName).isNull(i)); + assertEquals(_actualNullVectorMap.get(colName)[i], nullValueVectorReaderMap.get(colName).isNull(i)); } } } @@ -261,8 +264,10 @@ public class SegmentWithNullValueVectorTest { String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + INT_COLUMN + " IS NOT NULL"; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), NUM_ROWS - _nullIntKeyCount); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), + NUM_ROWS - _nullIntKeyCount); } @Test @@ -270,8 +275,9 @@ public class SegmentWithNullValueVectorTest { String query = "SELECT COUNT(*) FROM " + TABLE_NAME + " where " + INT_COLUMN + " IS NULL"; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), _nullIntKeyCount); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + 
assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), _nullIntKeyCount); } @Test @@ -281,8 +287,9 @@ public class SegmentWithNullValueVectorTest { + LONG_VALUE_THRESHOLD; InstanceRequest instanceRequest = new InstanceRequest(0L, CalciteSqlCompiler.compileToBrokerRequest(query)); instanceRequest.setSearchSegments(_segmentNames); - DataTable instanceResponse = _queryExecutor.processQuery(getQueryRequest(instanceRequest), QUERY_RUNNERS); - Assert.assertEquals(instanceResponse.getLong(0, 0), _longKeyCount); + InstanceResponseBlock instanceResponse = _queryExecutor.execute(getQueryRequest(instanceRequest), QUERY_RUNNERS); + assertTrue(instanceResponse.getResultsBlock() instanceof AggregationResultsBlock); + assertEquals(((AggregationResultsBlock) instanceResponse.getResultsBlock()).getResults().get(0), _longKeyCount); } private ServerQueryRequest getQueryRequest(InstanceRequest instanceRequest) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index c26ea3bffe..a19658c586 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -37,6 +37,7 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.core.transport.ServerInstance; @@ -139,18 +140,17 @@ public class QueryRunner { } } - private BaseDataBlock processServerQuery(ServerQueryRequest serverQueryRequest, - ExecutorService executorService) { + 
private BaseDataBlock processServerQuery(ServerQueryRequest serverQueryRequest, ExecutorService executorService) { BaseDataBlock dataBlock; try { - DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null); - if (!dataTable.getExceptions().isEmpty()) { + InstanceResponseBlock instanceResponse = _serverExecutor.execute(serverQueryRequest, executorService); + if (!instanceResponse.getExceptions().isEmpty()) { // if contains exception, directly return a metadata block with the exceptions. - dataBlock = DataBlockUtils.getErrorDataBlock(dataTable.getExceptions()); + dataBlock = DataBlockUtils.getErrorDataBlock(instanceResponse.getExceptions()); } else { // this works because default DataTableImplV3 will have a version number at beginning: // the new DataBlock encodes lower 16 bits as version and upper 16 bits as type (ROW, COLUMNAR, METADATA) - dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes())); + dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(instanceResponse.toDataTable().toBytes())); } } catch (Exception e) { dataBlock = DataBlockUtils.getErrorDataBlock(e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org