This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf94e7252 refactor query and option compilation (#9239)
8bf94e7252 is described below

commit 8bf94e72526cc02bd1538a05e5faacfb4fd1ca08
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Aug 24 14:08:12 2022 -0700

    refactor query and option compilation (#9239)
    
    * refactor sql compiler
    
    - move option parsing and query compilation to common utils
    - also support option-string parsing from the controller REST API
    
    * revert debug option changes
    
    * fix linter and compile issue
    
    * revert parse request change
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../broker/api/resources/PinotClientRequest.java   |   4 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |  63 ++----------
 .../BrokerRequestHandlerDelegate.java              |  37 +++----
 .../MultiStageBrokerRequestHandler.java            |  13 +--
 .../requesthandler/BrokerRequestOptionsTest.java   | 108 ++++++++++-----------
 .../pinot/common/utils/request/RequestUtils.java   |  65 +++++++++++++
 .../apache/pinot/sql/parsers/CalciteSqlParser.java |   5 +-
 .../pinot/sql/parsers/SqlNodeAndOptions.java       |   4 +-
 .../pinot/sql/parsers/CalciteSqlCompilerTest.java  |   2 +-
 .../api/resources/PinotQueryResource.java          |  13 ++-
 10 files changed, 157 insertions(+), 157 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index e1ccd67b2f..739dfe24ec 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -58,12 +58,12 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.spi.trace.RequestScope;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.PinotSqlType;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.glassfish.jersey.server.ManagedAsync;
@@ -209,7 +209,7 @@ public class PinotClientRequest {
       throws Exception {
     SqlNodeAndOptions sqlNodeAndOptions;
     try {
-      sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText());
+      sqlNodeAndOptions = 
RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), 
sqlRequestJson);
     } catch (Exception e) {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 1322227797..07d9b321f7 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -290,16 +289,13 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       throws Exception {
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
 
-    // Compile the request
-    long compilationStartTimeNs = System.nanoTime();
+    long compilationStartTimeNs;
     PinotQuery pinotQuery;
     try {
-      if (sqlNodeAndOptions != null) {
-        // Include parse time when the query is already parsed
-        compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs();
-      } else {
-        sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
-      }
+      // Parse the request
+      sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
+      // Compile the request into PinotQuery
+      compilationStartTimeNs = System.nanoTime();
       pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
@@ -307,7 +303,6 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
       requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
     }
-    setOptions(pinotQuery, requestId, query, request);
 
     if (isLiteralOnlyQuery(pinotQuery)) {
       LOGGER.debug("Request {} contains only Literal, skipping server query: 
{}", requestId, query);
@@ -377,8 +372,9 @@ public abstract class BaseBrokerRequestHandler implements 
BrokerRequestHandler {
     }
 
     long compilationEndTimeNs = System.nanoTime();
+    // full request compile time = compilationTimeNs + parserTimeNs
     _brokerMetrics.addPhaseTiming(rawTableName, 
BrokerQueryPhase.REQUEST_COMPILATION,
-        compilationEndTimeNs - compilationStartTimeNs);
+        (compilationEndTimeNs - compilationStartTimeNs) + 
sqlNodeAndOptions.getParseTimeNs());
 
     // Second-stage table-level access control
     // TODO: Modify AccessControl interface to directly take PinotQuery
@@ -1543,11 +1539,6 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
     throw new BadQueryRequestException("Unknown columnName '" + columnName + 
"' found in the query");
   }
 
-  public static Map<String, String> getOptionsFromJson(JsonNode request, 
String optionsKey) {
-    return 
Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=')
-        .split(request.get(optionsKey).asText());
-  }
-
   /**
    * Helper function to decide whether to force the log
    *
@@ -1566,46 +1557,6 @@ public abstract class BaseBrokerRequestHandler 
implements BrokerRequestHandler {
     return totalTimeMs > 1000L;
   }
 
-  /**
-   * Sets extra options for the given query.
-   */
-  @VisibleForTesting
-  static void setOptions(PinotQuery pinotQuery, long requestId, String query, 
JsonNode jsonRequest) {
-    Map<String, String> queryOptions = new HashMap<>();
-    if (jsonRequest.has(Broker.Request.DEBUG_OPTIONS)) {
-      Map<String, String> debugOptions = getOptionsFromJson(jsonRequest, 
Broker.Request.DEBUG_OPTIONS);
-      if (!debugOptions.isEmpty()) {
-        // TODO: Do not set debug options after releasing 0.11.0. Currently we 
kept it for backward compatibility.
-        LOGGER.debug("Debug options are set to: {} for request {}: {}", 
debugOptions, requestId, query);
-        pinotQuery.setDebugOptions(debugOptions);
-
-        // NOTE: Debug options are deprecated. Put all debug options into 
query options for backward compatibility.
-        queryOptions.putAll(debugOptions);
-      }
-    }
-    if (jsonRequest.has(Broker.Request.QUERY_OPTIONS)) {
-      Map<String, String> queryOptionsFromJson = 
getOptionsFromJson(jsonRequest, Broker.Request.QUERY_OPTIONS);
-      queryOptions.putAll(queryOptionsFromJson);
-    }
-    Map<String, String> queryOptionsFromQuery = pinotQuery.getQueryOptions();
-    if (queryOptionsFromQuery != null) {
-      queryOptions.putAll(queryOptionsFromQuery);
-    }
-    boolean enableTrace = jsonRequest.has(Broker.Request.TRACE) && 
jsonRequest.get(Broker.Request.TRACE).asBoolean();
-    if (enableTrace) {
-      queryOptions.put(Broker.Request.TRACE, "true");
-    }
-    // NOTE: Always set query options because we will put 'timeoutMs' later
-    pinotQuery.setQueryOptions(queryOptions);
-    if (!queryOptions.isEmpty()) {
-      LOGGER.debug("Query options are set to: {} for request {}: {}", 
queryOptions, requestId, query);
-    }
-    // TODO: Remove the SQL query options after releasing 0.11.0
-    // The query engine will break if these 2 options are missing during 
version upgrade.
-    queryOptions.put(Broker.Request.QueryOptionKey.GROUP_BY_MODE, 
Broker.Request.SQL);
-    queryOptions.put(Broker.Request.QueryOptionKey.RESPONSE_FORMAT, 
Broker.Request.SQL);
-  }
-
   /**
    * Sets the query timeout (remaining time in milliseconds) into the query 
options, and returns the remaining time in
    * milliseconds.
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index ce10cb6c2f..c6eee5f04f 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -29,11 +29,9 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.spi.trace.RequestContext;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
-import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,42 +82,29 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
       @Nullable RequesterIdentity requesterIdentity, RequestContext 
requestContext)
       throws Exception {
     if (sqlNodeAndOptions == null) {
-      JsonNode sql = request.get(Request.SQL);
-      if (sql == null) {
-        throw new BadQueryRequestException("Failed to find 'sql' in the 
request: " + request);
-      }
       try {
-        sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(sql.asText());
+        sqlNodeAndOptions = 
RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(),
 request);
       } catch (Exception e) {
-        LOGGER.info("Caught exception while compiling SQL: {}, {}", 
sql.asText(), e.getMessage());
+        LOGGER.info("Caught exception while compiling SQL: {}, {}", request, 
e.getMessage());
         
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
 1);
         requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
         return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
 e));
       }
     }
+    if (request.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
+      
sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromJson(request,
+          CommonConstants.Broker.Request.QUERY_OPTIONS));
+    }
 
-    if (_multiStageWorkerRequestHandler != null && 
useMultiStageEngine(request, sqlNodeAndOptions)) {
-      return _multiStageWorkerRequestHandler.handleRequest(request, 
sqlNodeAndOptions, requesterIdentity,
-          requestContext);
+    if (_multiStageWorkerRequestHandler != null && 
Boolean.parseBoolean(sqlNodeAndOptions.getOptions().get(
+          
CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
+        return _multiStageWorkerRequestHandler.handleRequest(request, 
requesterIdentity, requestContext);
     } else {
       return _singleStageBrokerRequestHandler.handleRequest(request, 
sqlNodeAndOptions, requesterIdentity,
           requestContext);
     }
   }
 
-  private boolean useMultiStageEngine(JsonNode request, SqlNodeAndOptions 
sqlNodeAndOptions) {
-    Map<String, String> optionsFromSql = sqlNodeAndOptions.getOptions();
-    if 
(Boolean.parseBoolean(optionsFromSql.get(QueryOptionKey.USE_MULTISTAGE_ENGINE)))
 {
-      return true;
-    }
-    if (request.has(Request.QUERY_OPTIONS)) {
-      Map<String, String> optionsFromRequest =
-          BaseBrokerRequestHandler.getOptionsFromJson(request, 
Request.QUERY_OPTIONS);
-      return 
Boolean.parseBoolean(optionsFromRequest.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
-    }
-    return false;
-  }
-
   @Override
   public Map<Long, String> getRunningQueries() {
     // TODO: add support for multiStaged engine: track running queries for 
multiStaged engine and combine its
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 8e5546a9dd..5d83a6e329 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -41,6 +41,7 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.QueryEnvironment;
@@ -57,7 +58,6 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,16 +133,12 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       throws Exception {
     LOGGER.debug("SQL query for request {}: {}", requestId, query);
 
+    // Parse the request
+    sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : 
RequestUtils.parseQuery(query, request);
     // Compile the request
     long compilationStartTimeNs = System.nanoTime();
     QueryPlan queryPlan;
     try {
-      if (sqlNodeAndOptions != null) {
-        // Include parse time when the query is already parsed
-        compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs();
-      } else {
-        sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
-      }
       queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions);
     } catch (Exception e) {
       LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", 
requestId, query, e.getMessage());
@@ -163,7 +159,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     long executionEndTimeNs = System.nanoTime();
 
     // Set total query processing time
-    long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - 
compilationStartTimeNs);
+    long totalTimeMs = 
TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
+        + (executionEndTimeNs - compilationStartTimeNs));
     brokerResponse.setTimeUsedMs(totalTimeMs);
     brokerResponse.setResultTable(toResultTable(queryResults));
     requestContext.setQueryProcessingTime(totalTimeMs);
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java
index 527e4585da..14d1d39bdb 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BrokerRequestOptionsTest.java
@@ -21,10 +21,11 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -43,49 +44,46 @@ public class BrokerRequestOptionsTest {
 
     // None of the options
     ObjectNode jsonRequest = JsonUtils.newObjectNode();
-    PinotQuery pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 0 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(query);;
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
 
     // TRACE
     // Has trace false
     jsonRequest.put(Request.TRACE, false);
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 0 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 0 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
 
     // Has trace true
     jsonRequest.put(Request.TRACE, true);
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get(Request.TRACE), 
"true");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), 
"true");
 
     // DEBUG_OPTIONS (debug options will also be included as query options)
     // Has debugOptions
     jsonRequest = JsonUtils.newObjectNode();
     jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 1);
-    Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), 
"foo");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), 
"foo");
 
     // Has multiple debugOptions
     jsonRequest.put(Request.DEBUG_OPTIONS, 
"debugOption1=foo;debugOption2=bar");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 2);
-    Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), 
"foo");
-    Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption2"), 
"bar");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), 
"foo");
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption2"), 
"bar");
 
     // Invalid debug options
     jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
     try {
-      BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
+      RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
       Assert.fail();
     } catch (Exception e) {
       // Expected
@@ -93,53 +91,47 @@ public class BrokerRequestOptionsTest {
 
     // QUERY_OPTIONS
     jsonRequest = JsonUtils.newObjectNode();
-    // Has queryOptions in pinotQuery already
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
+    // Has queryOptions in sqlNodeAndOptions already
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
     Map<String, String> queryOptions = new HashMap<>();
     queryOptions.put("queryOption1", "foo");
-    pinotQuery.setQueryOptions(queryOptions);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), 
"foo");
+    sqlNodeAndOptions.getOptions().putAll(queryOptions);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), 
"foo");
 
     // Has queryOptions in query
-    pinotQuery = CalciteSqlParser.compileToPinotQuery("SET queryOption1='foo'; 
select * from testTable");
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), 
"foo");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET 
queryOption1='foo'; select * from testTable");
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), 
"foo");
 
     // Has query options in json payload
     jsonRequest.put(Request.QUERY_OPTIONS, "queryOption1=foo");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), 
"foo");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 1 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), 
"foo");
 
-    // Has query options in both json payload and pinotQuery, pinotQuery takes 
priority
+    // Has query options in both json payload and sqlNodeAndOptions, 
sqlNodeAndOptions takes priority
     jsonRequest.put(Request.QUERY_OPTIONS, 
"queryOption1=bar;queryOption2=moo");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery("SET queryOption1='foo'; 
select * from testTable;");
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertNull(pinotQuery.getDebugOptions());
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 2 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), 
"foo");
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption2"), 
"moo");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions("SET 
queryOption1='foo'; select * from testTable;");
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 2 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), 
"foo");
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), 
"moo");
 
     // Has all 3
     jsonRequest = JsonUtils.newObjectNode();
     jsonRequest.put(Request.TRACE, true);
     jsonRequest.put(Request.DEBUG_OPTIONS, "debugOption1=foo");
     jsonRequest.put(Request.QUERY_OPTIONS, 
"queryOption1=bar;queryOption2=moo");
-    pinotQuery = CalciteSqlParser.compileToPinotQuery(query);
-    BaseBrokerRequestHandler.setOptions(pinotQuery, requestId, query, 
jsonRequest);
-    Assert.assertEquals(pinotQuery.getDebugOptionsSize(), 1);
-    Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), 
"foo");
-    Assert.assertEquals(pinotQuery.getQueryOptionsSize(), 4 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption1"), 
"bar");
-    Assert.assertEquals(pinotQuery.getQueryOptions().get("queryOption2"), 
"moo");
-    Assert.assertEquals(pinotQuery.getQueryOptions().get(Request.TRACE), 
"true");
-    Assert.assertEquals(pinotQuery.getDebugOptions().get("debugOption1"), 
"foo");
+    sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    RequestUtils.setOptions(sqlNodeAndOptions, jsonRequest);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().size(), 4 + 
LEGACY_PQL_QUERY_OPTION_SIZE);
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption1"), 
"bar");
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("queryOption2"), 
"moo");
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get(Request.TRACE), 
"true");
+    Assert.assertEquals(sqlNodeAndOptions.getOptions().get("debugOption1"), 
"foo");
   }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
index 4f0f7bcca8..3da9b4cb21 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/request/RequestUtils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.common.utils.request;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.calcite.sql.SqlLiteral;
@@ -30,13 +33,67 @@ import org.apache.pinot.common.request.Identifier;
 import org.apache.pinot.common.request.Literal;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.spi.utils.BytesUtils;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.sql.FilterKind;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.sql.parsers.SqlCompilationException;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class RequestUtils {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RequestUtils.class);
+
   private RequestUtils() {
   }
 
+  public static SqlNodeAndOptions parseQuery(String query, JsonNode request)
+      throws SqlCompilationException {
+    long parserStartTimeNs = System.nanoTime();
+    SqlNodeAndOptions sqlNodeAndOptions = 
CalciteSqlParser.compileToSqlNodeAndOptions(query);
+    setOptions(sqlNodeAndOptions, request);
+    sqlNodeAndOptions.setParseTimeNs(System.nanoTime() - parserStartTimeNs);
+    return sqlNodeAndOptions;
+  }
+
+  /**
+   * Sets extra options for the given query.
+   */
+  @VisibleForTesting
+  public static void setOptions(SqlNodeAndOptions sqlNodeAndOptions, JsonNode 
jsonRequest) {
+    Map<String, String> queryOptions = new HashMap<>();
+    if (jsonRequest.has(CommonConstants.Broker.Request.DEBUG_OPTIONS)) {
+      Map<String, String> debugOptions = 
RequestUtils.getOptionsFromJson(jsonRequest,
+          CommonConstants.Broker.Request.DEBUG_OPTIONS);
+      // TODO: remove debug options after releasing 0.11.0.
+      if (!debugOptions.isEmpty()) {
+        // NOTE: Debug options are deprecated. Put all debug options into 
query options for backward compatibility.
+        LOGGER.debug("Debug options are set to: {}", debugOptions);
+        queryOptions.putAll(debugOptions);
+      }
+    }
+    if (jsonRequest.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
+      Map<String, String> queryOptionsFromJson = 
RequestUtils.getOptionsFromJson(jsonRequest,
+          CommonConstants.Broker.Request.QUERY_OPTIONS);
+      queryOptions.putAll(queryOptionsFromJson);
+    }
+    boolean enableTrace = 
jsonRequest.has(CommonConstants.Broker.Request.TRACE) && jsonRequest.get(
+        CommonConstants.Broker.Request.TRACE).asBoolean();
+    if (enableTrace) {
+      queryOptions.put(CommonConstants.Broker.Request.TRACE, "true");
+    }
+    if (!queryOptions.isEmpty()) {
+      LOGGER.debug("Query options are set to: {}", queryOptions);
+    }
+    // TODO: Remove the SQL query options after releasing 0.11.0
+    // The query engine will break if these 2 options are missing during 
version upgrade.
+    
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.GROUP_BY_MODE, 
CommonConstants.Broker.Request.SQL);
+    
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.RESPONSE_FORMAT, 
CommonConstants.Broker.Request.SQL);
+    // Setting all query options back into SqlNodeAndOptions. The above 
ordering matters due to priority overwrite rule
+    sqlNodeAndOptions.setExtraOptions(queryOptions);
+  }
+
   public static Expression getIdentifierExpression(String identifier) {
     Expression expression = new Expression(ExpressionType.IDENTIFIER);
     expression.setIdentifier(new Identifier(identifier));
@@ -184,4 +241,12 @@ public class RequestUtils {
     }
     return pinotQuery.getDataSource().getTableName();
   }
+
+  public static Map<String, String> getOptionsFromJson(JsonNode request, 
String optionsKey) {
+    return getOptionsFromString(request.get(optionsKey).asText());
+  }
+
+  public static Map<String, String> getOptionsFromString(String optionStr) {
+    return 
Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=').split(optionStr);
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
index e4bfd4a32a..0065c76c1f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
@@ -125,7 +125,7 @@ public class CalciteSqlParser {
       SqlParserImpl sqlParser = newSqlParser(inStream);
       SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof();
       // Extract OPTION statements from sql.
-      SqlNodeAndOptions sqlNodeAndOptions = extractSqlNodeAndOptions(sqlNodeList);
+      SqlNodeAndOptions sqlNodeAndOptions = extractSqlNodeAndOptions(sql, sqlNodeList);
       // add legacy OPTIONS keyword-based options
       if (options.size() > 0) {
         sqlNodeAndOptions.setExtraOptions(extractOptionsMap(options));
@@ -137,7 +137,8 @@ public class CalciteSqlParser {
     }
   }
 
-  public static SqlNodeAndOptions extractSqlNodeAndOptions(SqlNodeList sqlNodeList) {
+  @VisibleForTesting
+  static SqlNodeAndOptions extractSqlNodeAndOptions(String sql, SqlNodeList sqlNodeList) {
     PinotSqlType sqlType = null;
     SqlNode statementNode = null;
     Map<String, String> options = new HashMap<>();
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
index e03caca1a7..666a8bda54 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/sql/parsers/SqlNodeAndOptions.java
@@ -57,6 +57,8 @@ public class SqlNodeAndOptions {
   }
 
   public void setExtraOptions(Map<String, String> extractOptionsMap) {
-    _options.putAll(extractOptionsMap);
+    for (Map.Entry<String, String> e : extractOptionsMap.entrySet()) {
+      _options.putIfAbsent(e.getKey(), e.getValue());
+    }
   }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
index 5801d14f0a..63501cb9b5 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/sql/parsers/CalciteSqlCompilerTest.java
@@ -2654,7 +2654,7 @@ public class CalciteSqlCompilerTest {
       SqlParserImpl sqlParser = CalciteSqlParser.newSqlParser(inStream);
       SqlNodeList sqlNodeList = sqlParser.SqlStmtsEof();
       // Extract OPTION statements from sql.
-      return CalciteSqlParser.extractSqlNodeAndOptions(sqlNodeList);
+      return CalciteSqlParser.extractSqlNodeAndOptions(sqlString, sqlNodeList);
     } catch (Exception e) {
       Assert.fail("test custom sql parser failed", e);
       return null;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
index a382122c33..b8e043319c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java
@@ -121,9 +121,17 @@ public class PinotQueryResource {
   }
 
   private String executeSqlQuery(@Context HttpHeaders httpHeaders, String sqlQuery, String traceEnabled,
-      String queryOptions)
+      @Nullable String queryOptions)
       throws Exception {
-    if (queryOptions != null && queryOptions.contains(QueryOptionKey.USE_MULTISTAGE_ENGINE)) {
+    SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
+    Map<String, String> options = sqlNodeAndOptions.getOptions();
+    if (queryOptions != null) {
+      Map<String, String> optionsFromString = RequestUtils.getOptionsFromString(queryOptions);
+      sqlNodeAndOptions.setExtraOptions(optionsFromString);
+    }
+
+    // Determine which engine to use based on query options.
+    if (Boolean.parseBoolean(options.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
       if (_controllerConf.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED,
           CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_ENABLED)) {
         return getMultiStageQueryResponse(sqlQuery, queryOptions, httpHeaders);
@@ -132,7 +140,6 @@ public class PinotQueryResource {
             + "Please see https://docs.pinot.apache.org/ for instruction to enable V2 engine.");
       }
     } else {
-      SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlQuery);
       PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
       switch (sqlType) {
         case DQL:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to