This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8bf94e7252 refactor query and option compilation (#9239) 8bf94e7252 is described below commit 8bf94e72526cc02bd1538a05e5faacfb4fd1ca08 Author: Rong Rong <ro...@apache.org> AuthorDate: Wed Aug 24 14:08:12 2022 -0700 refactor query and option compilation (#9239) * refactor sql compiler - move option parsing and query compile to common utils - support option string parsing as well from controller rest api * revert debug option changes * fix linter and compile issue * revert parse request change Co-authored-by: Rong Rong <ro...@startree.ai> --- .../broker/api/resources/PinotClientRequest.java | 4 +- .../requesthandler/BaseBrokerRequestHandler.java | 63 ++---------- .../BrokerRequestHandlerDelegate.java | 37 +++---- .../MultiStageBrokerRequestHandler.java | 13 +-- .../requesthandler/BrokerRequestOptionsTest.java | 108 ++++++++++----------- .../pinot/common/utils/request/RequestUtils.java | 65 +++++++++++++ .../apache/pinot/sql/parsers/CalciteSqlParser.java | 5 +- .../pinot/sql/parsers/SqlNodeAndOptions.java | 4 +- .../pinot/sql/parsers/CalciteSqlCompilerTest.java | 2 +- .../api/resources/PinotQueryResource.java | 13 ++- 10 files changed, 157 insertions(+), 157 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index e1ccd67b2f..739dfe24ec 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -58,12 +58,12 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.utils.request.RequestUtils; import 
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; import org.apache.pinot.spi.trace.RequestScope; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; import org.apache.pinot.spi.utils.JsonUtils; -import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.sql.parsers.PinotSqlType; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.glassfish.jersey.server.ManagedAsync; @@ -209,7 +209,7 @@ public class PinotClientRequest { throws Exception { SqlNodeAndOptions sqlNodeAndOptions; try { - sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText()); + sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson); } catch (Exception e) { return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 1322227797..07d9b321f7 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -21,7 +21,6 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; import com.google.common.util.concurrent.RateLimiter; import java.net.InetAddress; import java.util.ArrayList; @@ -290,16 +289,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { throws Exception { LOGGER.debug("SQL query for request {}: {}", requestId, query); - // Compile the request - long compilationStartTimeNs = 
System.nanoTime(); + long compilationStartTimeNs; PinotQuery pinotQuery; try { - if (sqlNodeAndOptions != null) { - // Include parse time when the query is already parsed - compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs(); - } else { - sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); - } + // Parse the request + sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request); + // Compile the request into PinotQuery + compilationStartTimeNs = System.nanoTime(); pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions); } catch (Exception e) { LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); @@ -307,7 +303,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } - setOptions(pinotQuery, requestId, query, request); if (isLiteralOnlyQuery(pinotQuery)) { LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query); @@ -377,8 +372,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } long compilationEndTimeNs = System.nanoTime(); + // full request compile time = compilationTimeNs + parserTimeNs _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, - compilationEndTimeNs - compilationStartTimeNs); + (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs()); // Second-stage table-level access control // TODO: Modify AccessControl interface to directly take PinotQuery @@ -1543,11 +1539,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { throw new BadQueryRequestException("Unknown columnName '" + columnName + "' found in the query"); } - public static Map<String, String> 
getOptionsFromJson(JsonNode request, String optionsKey) { - return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=') - .split(request.get(optionsKey).asText()); - } - /** * Helper function to decide whether to force the log * @@ -1566,46 +1557,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return totalTimeMs > 1000L; } - /** - * Sets extra options for the given query. - */ - @VisibleForTesting - static void setOptions(PinotQuery pinotQuery, long requestId, String query, JsonNode jsonRequest) { - Map<String, String> queryOptions = new HashMap<>(); - if (jsonRequest.has(Broker.Request.DEBUG_OPTIONS)) { - Map<String, String> debugOptions = getOptionsFromJson(jsonRequest, Broker.Request.DEBUG_OPTIONS); - if (!debugOptions.isEmpty()) { - // TODO: Do not set debug options after releasing 0.11.0. Currently we kept it for backward compatibility. - LOGGER.debug("Debug options are set to: {} for request {}: {}", debugOptions, requestId, query); - pinotQuery.setDebugOptions(debugOptions); - - // NOTE: Debug options are deprecated. Put all debug options into query options for backward compatibility. 
- queryOptions.putAll(debugOptions); - } - } - if (jsonRequest.has(Broker.Request.QUERY_OPTIONS)) { - Map<String, String> queryOptionsFromJson = getOptionsFromJson(jsonRequest, Broker.Request.QUERY_OPTIONS); - queryOptions.putAll(queryOptionsFromJson); - } - Map<String, String> queryOptionsFromQuery = pinotQuery.getQueryOptions(); - if (queryOptionsFromQuery != null) { - queryOptions.putAll(queryOptionsFromQuery); - } - boolean enableTrace = jsonRequest.has(Broker.Request.TRACE) && jsonRequest.get(Broker.Request.TRACE).asBoolean(); - if (enableTrace) { - queryOptions.put(Broker.Request.TRACE, "true"); - } - // NOTE: Always set query options because we will put 'timeoutMs' later - pinotQuery.setQueryOptions(queryOptions); - if (!queryOptions.isEmpty()) { - LOGGER.debug("Query options are set to: {} for request {}: {}", queryOptions, requestId, query); - } - // TODO: Remove the SQL query options after releasing 0.11.0 - // The query engine will break if these 2 options are missing during version upgrade. - queryOptions.put(Broker.Request.QueryOptionKey.GROUP_BY_MODE, Broker.Request.SQL); - queryOptions.put(Broker.Request.QueryOptionKey.RESPONSE_FORMAT, Broker.Request.SQL); - } - /** * Sets the query timeout (remaining time in milliseconds) into the query options, and returns the remaining time in * milliseconds. 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index ce10cb6c2f..c6eee5f04f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -29,11 +29,9 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; -import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.spi.trace.RequestContext; -import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; -import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; -import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,42 +82,29 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler { @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) throws Exception { if (sqlNodeAndOptions == null) { - JsonNode sql = request.get(Request.SQL); - if (sql == null) { - throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); - } try { - sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql.asText()); + sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request); } catch (Exception e) { - LOGGER.info("Caught exception while compiling SQL: {}, {}", sql.asText(), e.getMessage()); + LOGGER.info("Caught exception while 
compiling SQL: {}, {}", request, e.getMessage()); _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); } } + if (request.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) { + sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromJson(request, + CommonConstants.Broker.Request.QUERY_OPTIONS)); + } - if (_multiStageWorkerRequestHandler != null && useMultiStageEngine(request, sqlNodeAndOptions)) { - return _multiStageWorkerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity, - requestContext); + if (_multiStageWorkerRequestHandler != null && Boolean.parseBoolean(sqlNodeAndOptions.getOptions().get( + CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) { + return _multiStageWorkerRequestHandler.handleRequest(request, requesterIdentity, requestContext); } else { return _singleStageBrokerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity, requestContext); } } - private boolean useMultiStageEngine(JsonNode request, SqlNodeAndOptions sqlNodeAndOptions) { - Map<String, String> optionsFromSql = sqlNodeAndOptions.getOptions(); - if (Boolean.parseBoolean(optionsFromSql.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) { - return true; - } - if (request.has(Request.QUERY_OPTIONS)) { - Map<String, String> optionsFromRequest = - BaseBrokerRequestHandler.getOptionsFromJson(request, Request.QUERY_OPTIONS); - return Boolean.parseBoolean(optionsFromRequest.get(QueryOptionKey.USE_MULTISTAGE_ENGINE)); - } - return false; - } - @Override public Map<Long, String> getRunningQueries() { // TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 8e5546a9dd..5d83a6e329 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -41,6 +41,7 @@ import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; @@ -57,7 +58,6 @@ import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.BadQueryRequestException; import org.apache.pinot.spi.trace.RequestContext; import org.apache.pinot.spi.utils.CommonConstants; -import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,16 +133,12 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { throws Exception { LOGGER.debug("SQL query for request {}: {}", requestId, query); + // Parse the request + sqlNodeAndOptions = sqlNodeAndOptions != null ? 
sqlNodeAndOptions : RequestUtils.parseQuery(query, request); // Compile the request long compilationStartTimeNs = System.nanoTime(); QueryPlan queryPlan; try { - if (sqlNodeAndOptions != null) { - // Include parse time when the query is already parsed - compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs(); - } else { - sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); - } queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions); } catch (Exception e) { LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage()); @@ -163,7 +159,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { long executionEndTimeNs = System.nanoTime(); // Set total query processing time - long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs); + long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs() + + (executionEndTimeNs - compilationStartTimeNs)); brokerResponse.setTimeUsedMs(totalTimeMs); brokerResponse.setResultTable(toResultTable(queryResults)); requestContext.setQueryProcessingTime(totalTimeMs); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java index 527e4585da..14d1d39bdb 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java @@ -21,10 +21,11 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.HashMap; import java.util.Map; -import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; import 
org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.testng.Assert; import org.testng.annotations.Test; @@ -43,49 +44,46 @@ public class BrokerRequestOptionsTest { // None of the options ObjectNode jsonRequest = JsonUtils.newObjectNode(); - PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);; + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE); // TRACE // Has trace false jsonRequest.put(Request.TRACE, false); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + LEGACY_PQL_QUERY_OPTION_SIZE); // Has trace true jsonRequest.put(Request.TRACE, true); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get(Request.TRACE), "true"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + 
RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), "true"); // DEBUG_OPTIONS (debug options will also be included as query options) // Has debugOptions jsonRequest = JsonUtils.newObjectNode(); jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo"); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 1); - Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), "foo"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo"); // Has multiple debugOptions jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo;debugOption2=bar"); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 2); - Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), "foo"); - Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption2"), "bar"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo"); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption2"), "bar"); // Invalid debug options jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1"); - pinotQuery = 
CalciteSqlParser.compileToPinotQuery(query); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); try { - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); Assert.fail(); } catch (Exception e) { // Expected @@ -93,53 +91,47 @@ public class BrokerRequestOptionsTest { // QUERY_OPTIONS jsonRequest = JsonUtils.newObjectNode(); - // Has queryOptions in pinotQuery already - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); + // Has queryOptions in sqlNodeAndOptions already + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); Map<String, String> queryOptions = new HashMap<>(); queryOptions.put("queryOption1", "foo"); - pinotQuery.setQueryOptions(queryOptions); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), "foo"); + sqlNodeAndOptions.getOptions().putAll(queryOptions); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo"); // Has queryOptions in query - pinotQuery = CalciteSqlParser.compileToPinotQuery("SET queryOption1='foo'; select * from testTable"); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), "foo"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET queryOption1='foo'; select * from testTable"); + RequestUtils.setOptions(sqlNodeAndOptions, 
jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo"); // Has query options in json payload jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=foo"); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), "foo"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo"); - // Has query options in both json payload and pinotQuery, pinotQuery takes priority + // Has query options in both json payload and sqlNodeAndOptions, sqlNodeAndOptions takes priority jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=bar;queryOption2=moo"); - pinotQuery = CalciteSqlParser.compileToPinotQuery("SET queryOption1='foo'; select * from testTable;"); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertNull(pinotQuery.getDebugOptions()); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 2 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), "foo"); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption2"), "moo"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET queryOption1='foo'; select * from testTable;"); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + LEGACY_PQL_QUERY_OPTION_SIZE); + 
Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "foo"); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), "moo"); // Has all 3 jsonRequest = JsonUtils.newObjectNode(); jsonRequest.put(Request.TRACE, true); jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo"); jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=bar;queryOption2=moo"); - pinotQuery = CalciteSqlParser.compileToPinotQuery(query); - BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, jsonRequest); - Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 1); - Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), "foo"); - Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 4 + LEGACY_PQL_QUERY_OPTION_SIZE); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), "bar"); - Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption2"), "moo"); - Assert.assertEquals(pinotQuery.getQueryOptions().get(Request.TRACE), "true"); - Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), "foo"); + sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest); + Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 4 + LEGACY_PQL_QUERY_OPTION_SIZE); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), "bar"); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), "moo"); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), "true"); + Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), "foo"); } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java index 4f0f7bcca8..3da9b4cb21 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java +++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.common.utils.request; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Splitter; import java.util.HashMap; import java.util.Map; import org.apache.calcite.sql.SqlLiteral; @@ -30,13 +33,67 @@ import org.apache.pinot.common.request.Identifier; import org.apache.pinot.common.request.Literal; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.spi.utils.BytesUtils; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.sql.FilterKind; +import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.sql.parsers.SqlCompilationException; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RequestUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(RequestUtils.class); + private RequestUtils() { } + public static SqlNodeAndOptions parseQuery(String query, JsonNode request) + throws SqlCompilationException { + long parserStartTimeNs = System.nanoTime(); + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query); + setOptions(sqlNodeAndOptions, request); + sqlNodeAndOptions.setParseTimeNs(System.nanoTime() - parserStartTimeNs); + return sqlNodeAndOptions; + } + + /** + * Sets extra options for the given query. + */ + @VisibleForTesting + public static void setOptions(SqlNodeAndOptions sqlNodeAndOptions, JsonNode jsonRequest) { + Map<String, String> queryOptions = new HashMap<>(); + if (jsonRequest.has(CommonConstants.Broker.Request.DEBUG_OPTIONS)) { + Map<String, String> debugOptions = RequestUtils.getOptionsFromJson(jsonRequest, + CommonConstants.Broker.Request.DEBUG_OPTIONS); + // TODO: remove debug options after releasing 0.11.0. 
+ if (!debugOptions.isEmpty()) { + // NOTE: Debug options are deprecated. Put all debug options into query options for backward compatibility. + LOGGER.debug("Debug options are set to: {}", debugOptions); + queryOptions.putAll(debugOptions); + } + } + if (jsonRequest.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) { + Map<String, String> queryOptionsFromJson = RequestUtils.getOptionsFromJson(jsonRequest, + CommonConstants.Broker.Request.QUERY_OPTIONS); + queryOptions.putAll(queryOptionsFromJson); + } + boolean enableTrace = jsonRequest.has(CommonConstants.Broker.Request.TRACE) && jsonRequest.get( + CommonConstants.Broker.Request.TRACE).asBoolean(); + if (enableTrace) { + queryOptions.put(CommonConstants.Broker.Request.TRACE, "true"); + } + if (!queryOptions.isEmpty()) { + LOGGER.debug("Query options are set to: {}", queryOptions); + } + // TODO: Remove the SQL query options after releasing 0.11.0 + // The query engine will break if these 2 options are missing during version upgrade. + queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.GROUP_BY_MODE, CommonConstants.Broker.Request.SQL); + queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.RESPONSE_FORMAT, CommonConstants.Broker.Request.SQL); + // Setting all query options back into SqlNodeAndOptions. 
The above ordering matters due to priority overwrite rule + sqlNodeAndOptions.setExtraOptions(queryOptions); + } + public static Expression getIdentifierExpression(String identifier) { Expression expression = new Expression(ExpressionType.IDENTIFIER); expression.setIdentifier(new Identifier(identifier)); @@ -184,4 +241,12 @@ public class RequestUtils { } return pinotQuery.getDataSource().getTableName(); } + + public static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) { + return getOptionsFromString(request.get(optionsKey).asText()); + } + + public static Map<String, String> getOptionsFromString(String optionStr) { + return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java index e4bfd4a32a..0065c76c1f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java +++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java @@ -125,7 +125,7 @@ public class CalciteSqlParser { SqlParserImpl sqlParser = newSqlParser(inStream); SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof(); // Extract OPTION statements from sql. 
- SqlNodeAndOptions sqlNodeAndOptions = extractSqlNodeAndOptions(sqlNodeList); + SqlNodeAndOptions sqlNodeAndOptions = extractSqlNodeAndOptions(sql, sqlNodeList); // add legacy OPTIONS keyword-based options if (options.size() > 0) { sqlNodeAndOptions.setExtraOptions(extractOptionsMap(options)); @@ -137,7 +137,8 @@ public class CalciteSqlParser { } } - public static SqlNodeAndOptions extractSqlNodeAndOptions(SqlNodeList sqlNodeList) { + @VisibleForTesting + static SqlNodeAndOptions extractSqlNodeAndOptions(String sql, SqlNodeList sqlNodeList) { PinotSqlType sqlType = null; SqlNode statementNode = null; Map<String, String> options = new HashMap<>(); diff --git a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java index e03caca1a7..666a8bda54 100644 --- a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java +++ b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java @@ -57,6 +57,8 @@ public class SqlNodeAndOptions { } public void setExtraOptions(Map<String, String> extractOptionsMap) { - _options.putAll(extractOptionsMap); + for (Map.Entry<String, String> e : extractOptionsMap.entrySet()) { + _options.putIfAbsent(e.getKey(), e.getValue()); + } } } diff --git a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java index 5801d14f0a..63501cb9b5 100644 --- a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java @@ -2654,7 +2654,7 @@ public class CalciteSqlCompilerTest { SqlParserImpl sqlParser = CalciteSqlParser.newSqlParser(inStream); SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof(); // Extract OPTION statements from sql. 
- return CalciteSqlParser.extractSqlNodeAndOptions(sqlNodeList); + return CalciteSqlParser.extractSqlNodeAndOptions(sqlString, sqlNodeList); } catch (Exception e) { Assert.fail("test custom sql parser failed", e); return null; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java index a382122c33..b8e043319c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java @@ -121,9 +121,17 @@ public class PinotQueryResource { } private String executeSqlQuery(@Context HttpHeaders httpHeaders, String sqlQuery, String traceEnabled, - String queryOptions) + @Nullable String queryOptions) throws Exception { - if (queryOptions != null && queryOptions.contains(QueryOptionKey.USE_MULTISTAGE_ENGINE)) { + SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery); + Map<String, String> options = sqlNodeAndOptions.getOptions(); + if (queryOptions != null) { + Map<String, String> optionsFromString = RequestUtils.getOptionsFromString(queryOptions); + sqlNodeAndOptions.setExtraOptions(optionsFromString); + } + + // Determine which engine to use based on query options. 
+ if (Boolean.parseBoolean(options.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) { if (_controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) { return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders); @@ -132,7 +140,6 @@ public class PinotQueryResource { + "Please see https://docs.pinot.apache.org/ for instruction to enable V2 engine."); } } else { - SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery); PinotSqlType sqlType = sqlNodeAndOptions.getSqlType(); switch (sqlType) { case DQL: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org