This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 784860e3b4 Remove BrokerResponse from QueryContext (#8643) 784860e3b4 is described below commit 784860e3b4b096c5233d051d9dbff62ec7310665 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed May 4 18:16:23 2022 -0700 Remove BrokerResponse from QueryContext (#8643) --- .../pinot/sql/parsers/CalciteSqlCompiler.java | 5 +- .../controller/recommender/io/InputManager.java | 14 +- .../query/executor/ServerQueryExecutorV1Impl.java | 25 ++-- .../core/query/reduce/BrokerReduceService.java | 8 +- .../core/query/reduce/ResultReducerFactory.java | 4 +- .../core/query/reduce/StreamingReduceService.java | 11 +- .../core/query/request/ServerQueryRequest.java | 16 +-- .../core/query/request/context/QueryContext.java | 98 ++++++------- .../BrokerRequestToQueryContextConverter.java | 157 --------------------- .../context/utils/QueryContextConverterUtils.java | 134 +++++++++++++++++- .../plan/maker/QueryOverrideWithHintsTest.java | 20 ++- .../org/apache/pinot/queries/BaseQueriesTest.java | 38 +++-- .../tests/ClusterIntegrationTestUtils.java | 11 +- .../apache/pinot/spi/utils/CommonConstants.java | 1 - 14 files changed, 252 insertions(+), 290 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java index 2b1a69aa0f..f6e36aebc8 100644 --- a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java +++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlCompiler.java @@ -29,7 +29,10 @@ public class CalciteSqlCompiler { } public static BrokerRequest compileToBrokerRequest(String query) { - PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query); + return convertToBrokerRequest(CalciteSqlParser.compileToPinotQuery(query)); + } + + public static BrokerRequest convertToBrokerRequest(PinotQuery pinotQuery) { BrokerRequest brokerRequest = new BrokerRequest(); brokerRequest.setPinotQuery(pinotQuery); // Set table name in broker request because it is used for access control, query routing etc. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java index ffd7f2d52b..dc5deab407 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/recommender/io/InputManager.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.tuple.Triple; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.controller.recommender.exceptions.InvalidInputException; import org.apache.pinot.controller.recommender.io.metadata.FieldMetadata; import org.apache.pinot.controller.recommender.io.metadata.SchemaWithMetaData; @@ -51,7 +52,7 @@ import org.apache.pinot.controller.recommender.rules.io.params.SegmentSizeRulePa import org.apache.pinot.controller.recommender.rules.utils.FixedLenBitset; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.local.utils.SchemaUtils; import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DimensionFieldSpec; @@ -59,6 +60,7 @@ import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.sql.parsers.SqlCompilationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,10 +168,12 @@ public class InputManager { List<String> invalidQueries = new LinkedList<>(); for (String queryString : _queryWeightMap.keySet()) { try { - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(queryString); - _queryOptimizer.optimize(brokerRequest.getPinotQuery(), _schema); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); - _parsedQueries.put(queryString, Triple.of(_queryWeightMap.get(queryString), brokerRequest, queryContext)); + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(queryString); + _queryOptimizer.optimize(pinotQuery, _schema); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + _parsedQueries.put(queryString, + Triple.of(_queryWeightMap.get(queryString), CalciteSqlCompiler.convertToBrokerRequest(pinotQuery), + queryContext)); } catch (SqlCompilationException e) { invalidQueries.add(queryString); _overWrittenConfigs.getFlaggedQueries().add(queryString, ERROR_INVALID_QUERY); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index bd88fcd736..200692daf9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -160,9 +160,9 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { long querySchedulingTimeMs = System.currentTimeMillis() - queryArrivalTimeMs; if (querySchedulingTimeMs >= queryTimeoutMs) { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.SCHEDULING_TIMEOUT_EXCEPTIONS, 1); - String errorMessage = String - .format("Query scheduling took %dms (longer than query timeout of %dms) on server: %s", querySchedulingTimeMs, - queryTimeoutMs, _instanceDataManager.getInstanceId()); + String errorMessage = + String.format("Query scheduling took %dms (longer than query timeout of %dms) on server: %s", + querySchedulingTimeMs, queryTimeoutMs, _instanceDataManager.getInstanceId()); DataTable dataTable = DataTableBuilder.getEmptyDataTable(); dataTable.addException(QueryException.getException(QueryException.QUERY_SCHEDULING_TIMEOUT_ERROR, errorMessage)); LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); @@ -171,8 +171,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); if (tableDataManager == null) { - String errorMessage = String - .format("Failed to find table: %s on server: %s", tableNameWithType, _instanceDataManager.getInstanceId()); + String errorMessage = String.format("Failed to find table: %s on server: %s", tableNameWithType, + _instanceDataManager.getInstanceId()); DataTable dataTable = DataTableBuilder.getEmptyDataTable(); dataTable.addException(QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, errorMessage)); LOGGER.error("{} while processing requestId: {}", errorMessage, requestId); @@ -210,7 +210,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { DataTable dataTable = null; try { dataTable = processQuery(indexSegments, queryContext, timerContext, executorService, responseObserver, - queryRequest.isEnableStreaming(), queryRequest.isExplain()); + queryRequest.isEnableStreaming()); } catch (Exception e) { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1); @@ -271,7 +271,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { private DataTable processQuery(List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver, - boolean enableStreaming, boolean isExplain) + boolean enableStreaming) throws Exception { handleSubquery(queryContext, indexSegments, timerContext, executorService); @@ -288,7 +288,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { LOGGER.debug("Matched {} segments after pruning", numSelectedSegments); if (numSelectedSegments == 0) { // Only return metadata for streaming query - if (isExplain) { + if (queryContext.isExplain()) { return EXPLAIN_PLAN_RESULTS_NO_MATCHING_SEGMENT; } @@ -309,7 +309,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { planBuildTimer.stopAndRecord(); TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION); - DataTable dataTable = isExplain ? processExplainPlanQueries(queryPlan) : queryPlan.execute(); + DataTable dataTable = queryContext.isExplain() ? processExplainPlanQueries(queryPlan) : queryPlan.execute(); planExecTimer.stopAndRecord(); // Update the total docs in the metadata based on the un-pruned segments @@ -349,8 +349,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } /** Create EXPLAIN query result {@link DataTable} by recursively stepping through the {@link Operator} tree. */ - public static void addOperatorToTable(DataTableBuilder dataTableBuilder, Operator node, int[] globalId, - int parentId) throws IOException { + public static void addOperatorToTable(DataTableBuilder dataTableBuilder, Operator node, int[] globalId, int parentId) + throws IOException { if (node == null) { return; } @@ -434,8 +434,7 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { subqueryExpression.getLiteral()); // Execute the subquery subquery.setEndTimeMs(endTimeMs); - DataTable dataTable = processQuery(indexSegments, subquery, timerContext, executorService, null, - false, false); + DataTable dataTable = processQuery(indexSegments, subquery, timerContext, executorService, null, false); IdSet idSet = dataTable.getObject(0, 0); String serializedIdSet = idSet.toBase64String(); // Rewrite the expression diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index 4a635b613a..82a1f6022c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -28,7 +28,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GapfillUtils; import org.apache.pinot.spi.env.PinotConfiguration; @@ -53,7 +53,7 @@ public class BrokerReduceService extends BaseReduceService { return BrokerResponseNative.empty(); } - Map<String, String> queryOptions = serverBrokerRequest.getPinotQuery().getQueryOptions(); + Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions(); boolean enableTrace = queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); @@ -101,7 +101,7 @@ public class BrokerReduceService extends BaseReduceService { return brokerResponseNative; } - QueryContext serverQueryContext = BrokerRequestToQueryContextConverter.convert(serverBrokerRequest); + QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery()); DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext); dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, dataTableMap, brokerResponseNative, new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs, @@ -110,7 +110,7 @@ public class BrokerReduceService extends BaseReduceService { if (brokerRequest == serverBrokerRequest) { queryContext = serverQueryContext; } else { - queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + queryContext = QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery()); GapfillUtils.GapfillType gapfillType = GapfillUtils.getGapfillType(queryContext); if (gapfillType != null) { GapfillProcessor gapfillProcessor = new GapfillProcessor(queryContext, gapfillType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java index 7d0678f4ba..53f242f7ca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.query.reduce; -import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; @@ -38,8 +37,7 @@ public final class ResultReducerFactory { * Constructs the right result reducer based on the given query context. */ public static DataTableReducer getResultReducer(QueryContext queryContext) { - BrokerRequest brokerRequest = queryContext.getBrokerRequest(); - if (brokerRequest != null && brokerRequest.getPinotQuery() != null && brokerRequest.getPinotQuery().isExplain()) { + if (queryContext.isExplain()) { return new ExplainPlanDataTableReducer(queryContext); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java index ed3e13b402..c1f6ef8d23 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java @@ -30,11 +30,12 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.proto.Server; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.datatable.DataTableFactory; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; @@ -65,13 +66,13 @@ public class StreamingReduceService extends BaseReduceService { } // prepare contextual info for reduce. - Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions(); + PinotQuery pinotQuery = brokerRequest.getPinotQuery(); + Map<String, String> queryOptions = pinotQuery.getQueryOptions(); boolean enableTrace = queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); - - String tableName = brokerRequest.getQuerySource().getTableName(); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + String tableName = queryContext.getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableName); // initialize empty response. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index 2c685d070e..09bfa4ad2f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -26,7 +26,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.TimerContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.spi.utils.CommonConstants.Query.Request; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.apache.thrift.TDeserializer; @@ -46,7 +46,6 @@ public class ServerQueryRequest { private final boolean _enableStreaming; private final List<String> _segmentsToQuery; private final QueryContext _queryContext; - private final boolean _explain; // Timing information for different phases of query execution private final TimerContext _timerContext; @@ -57,11 +56,7 @@ public class ServerQueryRequest { _enableTrace = instanceRequest.isEnableTrace(); _enableStreaming = false; _segmentsToQuery = instanceRequest.getSearchSegments(); - BrokerRequest brokerRequest = instanceRequest.getQuery(); - _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); - _explain = - brokerRequest != null && brokerRequest.getPinotQuery() != null ? brokerRequest.getPinotQuery().isExplain() - : false; + _queryContext = QueryContextConverterUtils.getQueryContext(instanceRequest.getQuery().getPinotQuery()); _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); } @@ -88,9 +83,8 @@ public class ServerQueryRequest { } else { throw new UnsupportedOperationException("Unsupported payloadType: " + payloadType); } - _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + _queryContext = QueryContextConverterUtils.getQueryContext(brokerRequest.getPinotQuery()); _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); - _explain = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.EXPLAIN)); } public long getRequestId() { @@ -124,8 +118,4 @@ public class ServerQueryRequest { public TimerContext getTimerContext() { return _timerContext; } - - public boolean isExplain() { - return _explain; - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index 1076a8504f..fe9311e928 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import javax.annotation.Nullable; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.FunctionContext; @@ -71,6 +70,7 @@ import org.apache.pinot.core.util.MemoizedClassAssociation; @SuppressWarnings({"rawtypes", "unchecked"}) public class QueryContext { private final String _tableName; + private final QueryContext _subquery; private final List<ExpressionContext> _selectExpressions; private final List<String> _aliasList; private final FilterContext _filter; @@ -82,20 +82,15 @@ public class QueryContext { private final Map<String, String> _queryOptions; private final Map<String, String> _debugOptions; private final Map<ExpressionContext, ExpressionContext> _expressionOverrideHints; - - // Keep the BrokerRequest to make incremental changes - // TODO: Remove it once the whole query engine is using the QueryContext - private final BrokerRequest _brokerRequest; - private final QueryContext _subquery; + private final boolean _explain; private final Function<Class<?>, Map<?, ?>> _sharedValues = MemoizedClassAssociation.of(ConcurrentHashMap::new); // Pre-calculate the aggregation functions and columns for the query so that it can be shared across all the segments private AggregationFunction[] _aggregationFunctions; - private List<Pair<AggregationFunction, FilterContext>> _filteredAggregationFunctions; - private Map<FunctionContext, Integer> _aggregationFunctionIndexMap; private boolean _hasFilteredAggregations; + private List<Pair<AggregationFunction, FilterContext>> _filteredAggregationFunctions; private Map<Pair<FunctionContext, FilterContext>, Integer> _filteredAggregationsIndexMap; private Set<String> _columns; @@ -120,13 +115,14 @@ public class QueryContext { // Trim threshold to use for server combine for SQL GROUP BY private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD; - private QueryContext(String tableName, List<ExpressionContext> selectExpressions, List<String> aliasList, - @Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions, - @Nullable FilterContext havingFilter, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit, - int offset, Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions, - BrokerRequest brokerRequest, QueryContext subquery, - @Nullable Map<ExpressionContext, ExpressionContext> expressionOverrideHints) { + private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery, + List<ExpressionContext> selectExpressions, List<String> aliasList, @Nullable FilterContext filter, + @Nullable List<ExpressionContext> groupByExpressions, @Nullable FilterContext havingFilter, + @Nullable List<OrderByExpressionContext> orderByExpressions, int limit, int offset, + Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions, + @Nullable Map<ExpressionContext, ExpressionContext> expressionOverrideHints, boolean explain) { _tableName = tableName; + _subquery = subquery; _selectExpressions = selectExpressions; _aliasList = Collections.unmodifiableList(aliasList); _filter = filter; @@ -137,18 +133,26 @@ public class QueryContext { _offset = offset; _queryOptions = queryOptions; _debugOptions = debugOptions; - _brokerRequest = brokerRequest; - _subquery = subquery; _expressionOverrideHints = expressionOverrideHints; + _explain = explain; } /** * Returns the table name. + * NOTE: on the broker side, table name might be {@code null} when subquery is available. */ public String getTableName() { return _tableName; } + /** + * Returns the subquery. + */ + @Nullable + public QueryContext getSubquery() { + return _subquery; + } + /** * Returns a list of expressions in the SELECT clause. */ @@ -195,10 +199,6 @@ public class QueryContext { return _orderByExpressions; } - public QueryContext getSubquery() { - return _subquery; - } - /** * Returns the limit of the query. */ @@ -236,10 +236,10 @@ public class QueryContext { } /** - * Returns the BrokerRequest where the QueryContext is extracted from. + * Returns {@code true} if the query is an EXPLAIN query, {@code false} otherwise. */ - public BrokerRequest getBrokerRequest() { - return _brokerRequest; + public boolean isExplain() { + return _explain; } /** @@ -381,15 +381,16 @@ public class QueryContext { */ @Override public String toString() { - return "QueryContext{" + "_tableName='" + _tableName + '\'' + ", _selectExpressions=" + _selectExpressions - + ", _aliasList=" + _aliasList + ", _filter=" + _filter + ", _groupByExpressions=" + _groupByExpressions - + ", _havingFilter=" + _havingFilter + ", _orderByExpressions=" + _orderByExpressions + ", _limit=" + _limit - + ", _offset=" + _offset + ", _queryOptions=" + _queryOptions + ", _debugOptions=" + _debugOptions - + ", _brokerRequest=" + _brokerRequest + '}'; + return "QueryContext{" + "_tableName='" + _tableName + '\'' + ", _subquery=" + _subquery + ", _selectExpressions=" + + _selectExpressions + ", _aliasList=" + _aliasList + ", _filter=" + _filter + ", _groupByExpressions=" + + _groupByExpressions + ", _havingFilter=" + _havingFilter + ", _orderByExpressions=" + _orderByExpressions + + ", _limit=" + _limit + ", _offset=" + _offset + ", _queryOptions=" + _queryOptions + ", _debugOptions=" + + _debugOptions + ", _expressionOverrideHints=" + _expressionOverrideHints + ", _explain=" + _explain + '}'; } public static class Builder { private String _tableName; + private QueryContext _subquery; private List<ExpressionContext> _selectExpressions; private List<String> _aliasList; private FilterContext _filter; @@ -400,15 +401,19 @@ public class QueryContext { private int _offset; private Map<String, String> _queryOptions; private Map<String, String> _debugOptions; - private BrokerRequest _brokerRequest; - private QueryContext _subquery; private Map<ExpressionContext, ExpressionContext> _expressionOverrideHints; + private boolean _explain; public Builder setTableName(String tableName) { _tableName = tableName; return this; } + public Builder setSubquery(QueryContext subquery) { + _subquery = subquery; + return this; + } + public Builder setSelectExpressions(List<ExpressionContext> selectExpressions) { _selectExpressions = selectExpressions; return this; @@ -419,22 +424,22 @@ public class QueryContext { return this; } - public Builder setFilter(@Nullable FilterContext filter) { + public Builder setFilter(FilterContext filter) { _filter = filter; return this; } - public Builder setGroupByExpressions(@Nullable List<ExpressionContext> groupByExpressions) { + public Builder setGroupByExpressions(List<ExpressionContext> groupByExpressions) { _groupByExpressions = groupByExpressions; return this; } - public Builder setHavingFilter(@Nullable FilterContext havingFilter) { + public Builder setHavingFilter(FilterContext havingFilter) { _havingFilter = havingFilter; return this; } - public Builder setOrderByExpressions(@Nullable List<OrderByExpressionContext> orderByExpressions) { + public Builder setOrderByExpressions(List<OrderByExpressionContext> orderByExpressions) { _orderByExpressions = orderByExpressions; return this; } @@ -449,28 +454,23 @@ public class QueryContext { return this; } - public Builder setQueryOptions(@Nullable Map<String, String> queryOptions) { + public Builder setQueryOptions(Map<String, String> queryOptions) { _queryOptions = queryOptions; return this; } - public Builder setDebugOptions(@Nullable Map<String, String> debugOptions) { + public Builder setDebugOptions(Map<String, String> debugOptions) { _debugOptions = debugOptions; return this; } - public Builder setBrokerRequest(BrokerRequest brokerRequest) { - _brokerRequest = brokerRequest; - return this; - } - - public Builder setSubquery(QueryContext subquery) { - _subquery = subquery; + public Builder setExpressionOverrideHints(Map<ExpressionContext, ExpressionContext> expressionOverrideHints) { + _expressionOverrideHints = expressionOverrideHints; return this; } - public Builder setExpressionOverrideHints(Map<ExpressionContext, ExpressionContext> expressionOverrideHints) { - _expressionOverrideHints = expressionOverrideHints; + public Builder setExplain(boolean explain) { + _explain = explain; return this; } @@ -481,9 +481,9 @@ public class QueryContext { _queryOptions = Collections.emptyMap(); } QueryContext queryContext = - new QueryContext(_tableName, _selectExpressions, _aliasList, _filter, _groupByExpressions, _havingFilter, - _orderByExpressions, _limit, _offset, _queryOptions, _debugOptions, _brokerRequest, _subquery, - _expressionOverrideHints); + new QueryContext(_tableName, _subquery, _selectExpressions, _aliasList, _filter, _groupByExpressions, + _havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, _debugOptions, + _expressionOverrideHints, _explain); // Pre-calculate the aggregation functions and columns for the query generateAggregationFunctions(queryContext); @@ -555,8 +555,8 @@ public class QueryContext { aggregationFunctionIndexMap.put(entry.getKey().getLeft(), entry.getValue()); } queryContext._aggregationFunctions = aggregationFunctions; - queryContext._filteredAggregationFunctions = filteredAggregationFunctions; queryContext._aggregationFunctionIndexMap = aggregationFunctionIndexMap; + queryContext._filteredAggregationFunctions = filteredAggregationFunctions; queryContext._filteredAggregationsIndexMap = filteredAggregationsIndexMap; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java deleted file mode 100644 index dfdd1f9ab0..0000000000 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.core.query.request.context.utils; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.commons.collections.CollectionUtils; -import org.apache.pinot.common.request.BrokerRequest; -import org.apache.pinot.common.request.Expression; -import org.apache.pinot.common.request.ExpressionType; -import org.apache.pinot.common.request.Function; -import org.apache.pinot.common.request.PinotQuery; -import org.apache.pinot.common.request.context.ExpressionContext; -import org.apache.pinot.common.request.context.FilterContext; -import org.apache.pinot.common.request.context.OrderByExpressionContext; -import org.apache.pinot.common.request.context.RequestContextUtils; -import org.apache.pinot.core.query.request.context.QueryContext; - - -public class BrokerRequestToQueryContextConverter { - private BrokerRequestToQueryContextConverter() { - } - - /** - * Converts the given {@link BrokerRequest} into a {@link QueryContext}. - */ - public static QueryContext convert(BrokerRequest brokerRequest) { - return convert(brokerRequest.getPinotQuery(), brokerRequest); - } - - private static QueryContext convert(PinotQuery pinotQuery, BrokerRequest brokerRequest) { - QueryContext subquery = null; - if (pinotQuery.getDataSource().getSubquery() != null) { - subquery = convert(pinotQuery.getDataSource().getSubquery(), brokerRequest); - } - // SELECT - List<ExpressionContext> selectExpressions; - List<Expression> selectList = pinotQuery.getSelectList(); - List<String> aliasList = new ArrayList<>(selectList.size()); - selectExpressions = new ArrayList<>(selectList.size()); - for (Expression thriftExpression : selectList) { - // Handle alias - Expression expressionWithoutAlias = thriftExpression; - if (thriftExpression.getType() == ExpressionType.FUNCTION) { - Function function = thriftExpression.getFunctionCall(); - List<Expression> operands = function.getOperands(); - switch (function.getOperator().toUpperCase()) { - case "AS": - expressionWithoutAlias = operands.get(0); - aliasList.add(operands.get(1).getIdentifier().getName()); - break; - case "DISTINCT": - int numOperands = operands.size(); - for (int i = 0; i < numOperands; i++) { - Expression operand = operands.get(i); - Function operandFunction = operand.getFunctionCall(); - if (operandFunction != null && operandFunction.getOperator().equalsIgnoreCase("AS")) { - operands.set(i, operandFunction.getOperands().get(0)); - aliasList.add(operandFunction.getOperands().get(1).getIdentifier().getName()); - } else { - aliasList.add(null); - } - } - break; - default: - // Add null as a placeholder for alias. - aliasList.add(null); - break; - } - } else { - // Add null as a placeholder for alias. - aliasList.add(null); - } - selectExpressions.add(RequestContextUtils.getExpression(expressionWithoutAlias)); - } - - // WHERE - FilterContext filter = null; - Expression filterExpression = pinotQuery.getFilterExpression(); - if (filterExpression != null) { - filter = RequestContextUtils.getFilter(filterExpression); - } - - // GROUP BY - List<ExpressionContext> groupByExpressions = null; - List<Expression> groupByList = pinotQuery.getGroupByList(); - if (CollectionUtils.isNotEmpty(groupByList)) { - groupByExpressions = new ArrayList<>(groupByList.size()); - for (Expression thriftExpression : groupByList) { - groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression)); - } - } - - // ORDER BY - List<OrderByExpressionContext> orderByExpressions = null; - List<Expression> orderByList = pinotQuery.getOrderByList(); - if (CollectionUtils.isNotEmpty(orderByList)) { - // Deduplicate the order-by expressions - orderByExpressions = new ArrayList<>(orderByList.size()); - Set<ExpressionContext> expressionSet = new HashSet<>(); - for (Expression orderBy : orderByList) { - // NOTE: Order-by is always a Function with the ordering of the Expression - Function thriftFunction = orderBy.getFunctionCall(); - ExpressionContext expression = RequestContextUtils.getExpression(thriftFunction.getOperands().get(0)); - if (expressionSet.add(expression)) { - boolean isAsc = thriftFunction.getOperator().equalsIgnoreCase("ASC"); - orderByExpressions.add(new OrderByExpressionContext(expression, isAsc)); - } - } - } - - // HAVING - FilterContext havingFilter = null; - Expression havingExpression = pinotQuery.getHavingExpression(); - if (havingExpression != null) { - havingFilter = RequestContextUtils.getFilter(havingExpression); - } - - // EXPRESSION OVERRIDE HINTS - Map<ExpressionContext, ExpressionContext> expressionContextOverrideHints = new HashMap<>(); - Map<Expression, Expression> expressionOverrideHints = pinotQuery.getExpressionOverrideHints(); - if (expressionOverrideHints != null) { - for (Map.Entry<Expression, Expression> entry : expressionOverrideHints.entrySet()) { - expressionContextOverrideHints.put(RequestContextUtils.getExpression(entry.getKey()), - RequestContextUtils.getExpression(entry.getValue())); - } - } - - return new QueryContext.Builder().setTableName(pinotQuery.getDataSource().getTableName()) - .setSelectExpressions(selectExpressions).setAliasList(aliasList).setFilter(filter) - .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions) - .setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset()) - .setQueryOptions(pinotQuery.getQueryOptions()).setDebugOptions(pinotQuery.getDebugOptions()) - .setSubquery(subquery).setExpressionOverrideHints(expressionContextOverrideHints) - .setBrokerRequest(brokerRequest).build(); - } -} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java index d2595e5bb7..d6a8437ec4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextConverterUtils.java @@ -18,8 +18,24 @@ */ package org.apache.pinot.core.query.request.context.utils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.collections.CollectionUtils; +import org.apache.pinot.common.request.DataSource; +import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.ExpressionType; +import org.apache.pinot.common.request.Function; +import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.FilterContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.request.context.RequestContextUtils; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.sql.parsers.CalciteSqlParser; public class QueryContextConverterUtils { @@ -30,6 +46,120 @@ public class QueryContextConverterUtils { * Converts the given query into a {@link QueryContext}. */ public static QueryContext getQueryContext(String query) { - return BrokerRequestToQueryContextConverter.convert(CalciteSqlCompiler.compileToBrokerRequest(query)); + return getQueryContext(CalciteSqlParser.compileToPinotQuery(query)); + } + + /** + * Converts the given {@link PinotQuery} into a {@link QueryContext}. + */ + public static QueryContext getQueryContext(PinotQuery pinotQuery) { + // FROM + String tableName; + DataSource dataSource = pinotQuery.getDataSource(); + tableName = dataSource.getTableName(); + QueryContext subquery = null; + if (dataSource.getSubquery() != null) { + subquery = getQueryContext(dataSource.getSubquery()); + } + + // SELECT + List<ExpressionContext> selectExpressions; + List<Expression> selectList = pinotQuery.getSelectList(); + List<String> aliasList = new ArrayList<>(selectList.size()); + selectExpressions = new ArrayList<>(selectList.size()); + for (Expression thriftExpression : selectList) { + // Handle alias + Expression expressionWithoutAlias = thriftExpression; + if (thriftExpression.getType() == ExpressionType.FUNCTION) { + Function function = thriftExpression.getFunctionCall(); + List<Expression> operands = function.getOperands(); + switch (function.getOperator().toUpperCase()) { + case "AS": + expressionWithoutAlias = operands.get(0); + aliasList.add(operands.get(1).getIdentifier().getName()); + break; + case "DISTINCT": + int numOperands = operands.size(); + for (int i = 0; i < numOperands; i++) { + Expression operand = operands.get(i); + Function operandFunction = operand.getFunctionCall(); + if (operandFunction != null && operandFunction.getOperator().equalsIgnoreCase("AS")) { + operands.set(i, operandFunction.getOperands().get(0)); + aliasList.add(operandFunction.getOperands().get(1).getIdentifier().getName()); + } else { + aliasList.add(null); + } + } + break; + default: + // Add null as a placeholder for alias. + aliasList.add(null); + break; + } + } else { + // Add null as a placeholder for alias. + aliasList.add(null); + } + selectExpressions.add(RequestContextUtils.getExpression(expressionWithoutAlias)); + } + + // WHERE + FilterContext filter = null; + Expression filterExpression = pinotQuery.getFilterExpression(); + if (filterExpression != null) { + filter = RequestContextUtils.getFilter(filterExpression); + } + + // GROUP BY + List<ExpressionContext> groupByExpressions = null; + List<Expression> groupByList = pinotQuery.getGroupByList(); + if (CollectionUtils.isNotEmpty(groupByList)) { + groupByExpressions = new ArrayList<>(groupByList.size()); + for (Expression thriftExpression : groupByList) { + groupByExpressions.add(RequestContextUtils.getExpression(thriftExpression)); + } + } + + // ORDER BY + List<OrderByExpressionContext> orderByExpressions = null; + List<Expression> orderByList = pinotQuery.getOrderByList(); + if (CollectionUtils.isNotEmpty(orderByList)) { + // Deduplicate the order-by expressions + orderByExpressions = new ArrayList<>(orderByList.size()); + Set<ExpressionContext> expressionSet = new HashSet<>(); + for (Expression orderBy : orderByList) { + // NOTE: Order-by is always a Function with the ordering of the Expression + Function thriftFunction = orderBy.getFunctionCall(); + ExpressionContext expression = RequestContextUtils.getExpression(thriftFunction.getOperands().get(0)); + if (expressionSet.add(expression)) { + boolean isAsc = thriftFunction.getOperator().equalsIgnoreCase("ASC"); + orderByExpressions.add(new OrderByExpressionContext(expression, isAsc)); + } + } + } + + // HAVING + FilterContext havingFilter = null; + Expression havingExpression = pinotQuery.getHavingExpression(); + if (havingExpression != null) { + havingFilter = RequestContextUtils.getFilter(havingExpression); + } + + // EXPRESSION OVERRIDE HINTS + Map<ExpressionContext, ExpressionContext> expressionContextOverrideHints = new HashMap<>(); + Map<Expression, Expression> expressionOverrideHints = pinotQuery.getExpressionOverrideHints(); + if (expressionOverrideHints != null) { + for (Map.Entry<Expression, Expression> entry : expressionOverrideHints.entrySet()) { + expressionContextOverrideHints.put(RequestContextUtils.getExpression(entry.getKey()), + RequestContextUtils.getExpression(entry.getValue())); + } + } + + return new QueryContext.Builder().setTableName(tableName).setSubquery(subquery) + .setSelectExpressions(selectExpressions).setAliasList(aliasList).setFilter(filter) + .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions) + .setHavingFilter(havingFilter).setLimit(pinotQuery.getLimit()).setOffset(pinotQuery.getOffset()) + .setQueryOptions(pinotQuery.getQueryOptions()).setDebugOptions(pinotQuery.getDebugOptions()) + .setExpressionOverrideHints(expressionContextOverrideHints).setExplain(pinotQuery.isExplain()).build(); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java index f83979b482..31d65af0da 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; -import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; +import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.FilterContext; import org.apache.pinot.common.request.context.FunctionContext; @@ -35,14 +35,14 @@ import org.apache.pinot.common.request.context.RequestContextUtils; import org.apache.pinot.common.request.context.predicate.EqPredicate; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.spi.data.readers.GenericRow; -import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -198,15 +198,14 @@ public class QueryOverrideWithHintsTest { @Test public void testRewriteExpressionsWithHints() { - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest( + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery( "SELECT datetrunc('MONTH', ts), count(*), sum(abc) from myTable group by datetrunc('MONTH', ts) "); Expression dateTruncFunctionExpr = RequestUtils.getFunctionExpression("datetrunc"); dateTruncFunctionExpr.getFunctionCall().setOperands(new ArrayList<>( ImmutableList.of(RequestUtils.getLiteralExpression("MONTH"), RequestUtils.getIdentifierExpression("ts")))); Expression timestampIndexColumn = RequestUtils.getIdentifierExpression("$ts$MONTH"); - brokerRequest.getPinotQuery() - .setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext, _indexSegment); assertEquals(queryContext.getSelectExpressions().get(0).getIdentifier(), "$ts$MONTH"); assertEquals(queryContext.getGroupByExpressions().get(0).getIdentifier(), "$ts$MONTH"); @@ -214,15 +213,14 @@ public class QueryOverrideWithHintsTest { @Test public void testNotRewriteExpressionsWithHints() { - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest( + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery( "SELECT datetrunc('DAY', ts), count(*), sum(abc) from myTable group by datetrunc('DAY', ts)"); Expression dateTruncFunctionExpr = RequestUtils.getFunctionExpression("datetrunc"); dateTruncFunctionExpr.getFunctionCall().setOperands(new ArrayList<>( ImmutableList.of(RequestUtils.getLiteralExpression("DAY"), RequestUtils.getIdentifierExpression("ts")))); Expression timestampIndexColumn = RequestUtils.getIdentifierExpression("$ts$DAY"); - brokerRequest.getPinotQuery() - .setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); + pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext, _indexSegment); assertEquals(queryContext.getSelectExpressions().get(0).getFunction(), queryContext.getExpressionOverrideHints().keySet().iterator().next().getFunction()); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index fc33e780e7..ee7ab37778 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -38,7 +38,7 @@ import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.query.reduce.BrokerReduceService; import org.apache.pinot.core.query.request.context.QueryContext; -import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GapfillUtils; import org.apache.pinot.segment.spi.IndexSegment; @@ -49,6 +49,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.apache.pinot.sql.parsers.CalciteSqlParser; /** @@ -74,7 +75,7 @@ public abstract class BaseQueriesTest { protected <T extends Operator> T getOperator(String query) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); BrokerRequest serverBrokerRequest = GapfillUtils.stripGapfill(brokerRequest); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(serverBrokerRequest); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery()); return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), queryContext).run(); } @@ -130,8 +131,7 @@ public abstract class BaseQueriesTest { */ private BrokerResponseNative getBrokerResponse(String query, PlanMaker planMaker, @Nullable Map<String, String> extraQueryOptions) { - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); - PinotQuery pinotQuery = brokerRequest.getPinotQuery(); + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query); if (extraQueryOptions != null) { Map<String, String> queryOptions = pinotQuery.getQueryOptions(); if (queryOptions == null) { @@ -140,11 +140,7 @@ public abstract class BaseQueriesTest { } queryOptions.putAll(extraQueryOptions); } - BrokerRequest serverBrokerRequest = GapfillUtils.stripGapfill(brokerRequest); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); - QueryContext serverQueryContext = brokerRequest == serverBrokerRequest ? queryContext - : BrokerRequestToQueryContextConverter.convert(serverBrokerRequest); - return getBrokerResponse(queryContext, serverQueryContext, planMaker); + return getBrokerResponse(pinotQuery, planMaker); } /** @@ -152,14 +148,18 @@ public abstract class BaseQueriesTest { * <p>Use this to test the whole flow from server to broker. * <p>The result should be equivalent to querying 4 identical index segments. */ - private BrokerResponseNative getBrokerResponse(QueryContext queryContext, QueryContext serverQueryContext, - PlanMaker planMaker) { + private BrokerResponseNative getBrokerResponse(PinotQuery pinotQuery, PlanMaker planMaker) { + BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); + BrokerRequest serverBrokerRequest = GapfillUtils.stripGapfill(brokerRequest); + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); + QueryContext serverQueryContext = serverBrokerRequest == brokerRequest ? queryContext + : QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery()); + // Server side serverQueryContext.setEndTimeMs(System.currentTimeMillis() + Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS); Plan plan = planMaker.makeInstancePlan(getIndexSegments(), serverQueryContext, EXECUTOR_SERVICE); - PinotQuery pinotQuery = serverQueryContext.getBrokerRequest().getPinotQuery(); DataTable instanceResponse = - pinotQuery.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute(); + queryContext.isExplain() ? ServerQueryExecutorV1Impl.processExplainPlanQueries(plan) : plan.execute(); // Broker side // Use 2 Threads for 2 data-tables @@ -177,8 +177,8 @@ public abstract class BaseQueriesTest { throw new RuntimeException(e); } BrokerResponseNative brokerResponse = - brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(), serverQueryContext.getBrokerRequest(), - dataTableMap, CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS, null); + brokerReduceService.reduceOnDataTable(brokerRequest, serverBrokerRequest, dataTableMap, + CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS, null); brokerReduceService.shutDown(); return brokerResponse; @@ -191,12 +191,8 @@ public abstract class BaseQueriesTest { */ protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query, @Nullable TableConfig config, @Nullable Schema schema) { - BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); - PinotQuery pinotQuery = brokerRequest.getPinotQuery(); + PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query); OPTIMIZER.optimize(pinotQuery, config, schema); - BrokerRequest serverBrokerRequest = GapfillUtils.stripGapfill(brokerRequest); - QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest); - QueryContext serverQueryContext = BrokerRequestToQueryContextConverter.convert(serverBrokerRequest); - return getBrokerResponse(queryContext, serverQueryContext, PLAN_MAKER); + return getBrokerResponse(pinotQuery, PLAN_MAKER); } } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 3d69f5d4c5..fbea4e6220 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -59,6 +59,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.utils.TarGzCompressionUtils; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; @@ -563,11 +564,11 @@ public class ClusterIntegrationTestUtils { if (!QueryContextUtils.isAggregationQuery(queryContext)) { // selection/distinct - List<String> orderByColumns = new ArrayList<>(); + Set<String> orderByColumns = new HashSet<>(); if (queryContext.getOrderByExpressions() != null) { - orderByColumns.addAll( - CalciteSqlParser.extractIdentifiers(queryContext.getBrokerRequest().getPinotQuery().getOrderByList(), - false)); + for (OrderByExpressionContext orderByExpression : queryContext.getOrderByExpressions()) { + orderByExpression.getColumns(orderByColumns); + } } Set<String> expectedValues = new HashSet<>(); List<String> expectedOrderByValues = new ArrayList<>(); @@ -717,7 +718,7 @@ public class ClusterIntegrationTestUtils { private static void comparePinotResultsWithExpectedValues(Set<String> expectedValues, List<String> expectedOrderByValues, org.apache.pinot.client.ResultSet connectionResultSet, - Collection<String> orderByColumns, String pinotQuery, String h2Query, int h2NumRows, + Set<String> orderByColumns, String pinotQuery, String h2Query, int h2NumRows, long pinotNumRecordsSelected) { int pinotNumRows = connectionResultSet.getRowCount(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 8d6c7646f9..15fdc312a5 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -616,7 +616,6 @@ public class CommonConstants { public static final String ENABLE_TRACE = "enableTrace"; public static final String ENABLE_STREAMING = "enableStreaming"; public static final String PAYLOAD_TYPE = "payloadType"; - public static final String EXPLAIN = "explain"; } public static class PayloadType { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org