This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 835949b645 Automatically detect whether a v1 query could have run on the v2 query engine (#13628)
835949b645 is described below

commit 835949b64529b8e5c0bb49474cdec2e17c9050f7
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Tue Jul 30 15:20:22 2024 +0530

    Automatically detect whether a v1 query could have run on the v2 query engine (#13628)
---
 .../BaseSingleStageBrokerRequestHandler.java       |  59 +++++++++
 .../requesthandler/GrpcBrokerRequestHandler.java   |   2 +
 .../SingleConnectionBrokerRequestHandler.java      |   2 +
 .../apache/pinot/common/metrics/BrokerMeter.java   |  12 +-
 .../tests/JmxMetricsIntegrationTest.java           | 145 +++++++++++++++++++++
 .../org/apache/pinot/query/QueryEnvironment.java   |  21 +++
 .../pinot/query/parser/utils/ParserUtils.java      |  16 +--
 .../apache/pinot/spi/utils/CommonConstants.java    |   8 ++
 8 files changed, 254 insertions(+), 11 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 27e5c19cac..6a839d5e36 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -29,9 +29,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -138,6 +142,9 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
   protected final boolean _enableDistinctCountBitmapOverride;
   protected final int _queryResponseLimit;
   protected final Map<Long, QueryServers> _queriesById;
+  protected final boolean _enableMultistageMigrationMetric;
+  protected ExecutorService _multistageCompileExecutor;
+  protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
 
   public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String 
brokerId,
       BrokerRoutingManager routingManager, AccessControlFactory 
accessControlFactory,
@@ -155,12 +162,53 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
     boolean enableQueryCancellation =
         
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
     _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
+
+    _enableMultistageMigrationMetric = 
_config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
+        Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
+    if (_enableMultistageMigrationMetric) {
+      _multistageCompileExecutor = Executors.newSingleThreadExecutor();
+      _multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
+    }
+
     LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query 
response limit: {}, query log max length: {}, "
             + "query log max rate: {}, query cancellation enabled: {}", 
getClass().getSimpleName(), _brokerId,
         _brokerTimeoutMs, _queryResponseLimit, 
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
         enableQueryCancellation);
   }
 
+  @Override
+  public void start() {
+    if (_enableMultistageMigrationMetric) {
+      _multistageCompileExecutor.submit(() -> {
+        while (!Thread.currentThread().isInterrupted()) {
+          Pair<String, String> query;
+          try {
+            query = _multistageCompileQueryQueue.take();
+          } catch (InterruptedException e) {
+            // Exit gracefully when the thread is interrupted, presumably when this single thread executor is shutdown.
+            // Since this task is all that this single thread is doing, there's no need to preserve the thread's
+            // interrupt status flag.
+            return;
+          }
+          String queryString = query.getLeft();
+          String database = query.getRight();
+
+          // Check if the query is a v2 supported query
+          if (!ParserUtils.canCompileWithMultiStageEngine(queryString, 
database, _tableCache)) {
+            
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE,
 1);
+          }
+        }
+      });
+    }
+  }
+
+  @Override
+  public void shutDown() {
+    if (_enableMultistageMigrationMetric) {
+      _multistageCompileExecutor.shutdownNow();
+    }
+  }
+
   @Override
   public Map<Long, String> getRunningQueries() {
     Preconditions.checkState(_queriesById != null, "Query cancellation is not 
enabled on broker");
@@ -478,8 +526,19 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
       }
 
       _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 
1);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
       _brokerMetrics.addValueToTableGauge(rawTableName, 
BrokerGauge.REQUEST_SIZE, query.length());
 
+      if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
+        // Check if the query is a v2 supported query
+        String database = 
DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), 
httpHeaders);
+        // Attempt to add the query to the compile queue; drop if queue is full
+        if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
+          LOGGER.trace("Not compiling query `{}` using the multi-stage query 
engine because the query queue is full",
+              query);
+        }
+      }
+
       // Prepare OFFLINE and REALTIME requests
       BrokerRequest offlineBrokerRequest = null;
       BrokerRequest realtimeBrokerRequest = null;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index 96e06ffec5..0484476a41 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -63,10 +63,12 @@ public class GrpcBrokerRequestHandler extends 
BaseSingleStageBrokerRequestHandle
 
   @Override
   public void start() {
+    super.start();
   }
 
   @Override
   public void shutDown() {
+    super.shutDown();
     _streamingQueryClient.shutdown();
     _streamingReduceService.shutDown();
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index a51634c983..349affe6c8 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -82,12 +82,14 @@ public class SingleConnectionBrokerRequestHandler extends 
BaseSingleStageBrokerR
 
   @Override
   public void start() {
+    super.start();
     _failureDetector.register(this);
     _failureDetector.start();
   }
 
   @Override
   public void shutDown() {
+    super.shutDown();
     _failureDetector.stop();
     _queryRouter.shutDown();
     _brokerReduceService.shutDown();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index d6b7af9840..086d5ffd9e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -35,6 +35,13 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
    * At this moment this counter does not include queries executed in multi-stage mode.
    */
   QUERIES("queries", false),
+  /**
+   * Number of single-stage queries that have been started.
+   * <p>
+   * Unlike {@link #QUERIES}, this metric is global and not attached to a particular table.
+   * That means it can be used to know how many single-stage queries have been started in total.
+   */
+  QUERIES_GLOBAL("queries", true),
   /**
    * Number of multi-stage queries that have been started.
    * <p>
@@ -49,7 +56,10 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
    * sum of this metric across all tables should be greater or equal than {@link #MULTI_STAGE_QUERIES_GLOBAL}.
    */
   MULTI_STAGE_QUERIES("queries", false),
-
+  /**
+   * Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is.
+   */
+  SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
   // These metrics track the exceptions caught during query execution in broker side.
   // Query rejected by Jersey thread pool executor
   QUERY_REJECTED_EXCEPTIONS("exceptions", true),
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
new file mode 100644
index 0000000000..2e1873d3f8
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests that verify JMX metrics emitted by various Pinot components.
+ */
+public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private static final int NUM_BROKERS = 1;
+  private static final int NUM_SERVERS = 1;
+
+  private static final MBeanServer MBEAN_SERVER = 
ManagementFactory.getPlatformMBeanServer();
+  private static final String PINOT_JMX_METRICS_DOMAIN = 
"\"org.apache.pinot.common.metrics\"";
+  private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\"";
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    startController();
+    startBrokers(NUM_BROKERS);
+    startServers(NUM_SERVERS);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createOfflineTableConfig();
+    addTableConfig(tableConfig);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Test
+  public void testMultiStageMigrationMetric() throws Exception {
+    ObjectName multiStageMigrationMetric = new 
ObjectName(PINOT_JMX_METRICS_DOMAIN,
+        new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
+            "name", "\"pinot.broker.singleStageQueriesInvalidMultiStage\"")));
+
+    ObjectName queriesGlobalMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN,
+        new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
+            "name", "\"pinot.broker.queriesGlobal\"")));
+
+    // Some queries are run during setup to ensure that all the docs are loaded
+    long initialQueryCount = (Long) 
MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count");
+    assertTrue(initialQueryCount > 0L);
+    assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, 
"Count"), 0L);
+
+    postQuery("SELECT COUNT(*) FROM mytable");
+
+    // Run some queries that are known to not work as is with the multi-stage query engine
+
+    // Type differences
+    // STRING is VARCHAR in v2
+    JsonNode response = postQuery("SELECT CAST(ArrTime AS STRING) FROM 
mytable");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+    // LONG is BIGINT in v2
+    response = postQuery("SELECT CAST(ArrTime AS LONG) FROM mytable");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+    // FLOAT_ARRAY is FLOAT ARRAY in v2
+    response = postQuery("SELECT CAST(DivAirportIDs AS FLOAT_ARRAY) FROM 
mytable");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+    // MV column requires ARRAY_TO_MV wrapper to be used in filter predicates
+    response = postQuery("SELECT COUNT(*) FROM mytable WHERE DivAirports = 
'JFK'");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+    // Unsupported function
+    response = postQuery("SELECT AirlineID, count(*) FROM mytable WHERE 
IN_SUBQUERY(airlineID, 'SELECT "
+        + "ID_SET(AirlineID) FROM mytable WHERE Carrier = ''AA''') = 1 GROUP 
BY AirlineID;");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+    // Repeated columns in an ORDER BY query
+    response = postQuery("SELECT AirTime, AirTime FROM mytable ORDER BY 
AirTime");
+    assertFalse(response.get("resultTable").get("rows").isEmpty());
+
+    assertEquals((Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric, 
"Count"), initialQueryCount + 8L);
+
+    AtomicLong multiStageMigrationMetricValue = new AtomicLong();
+    TestUtils.waitForCondition((aVoid) -> {
+      try {
+        multiStageMigrationMetricValue.set((Long) 
MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"));
+        return multiStageMigrationMetricValue.get() == 6L;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 5000, "Expected value of MBean 
'pinot.broker.singleStageQueriesInvalidMultiStage' to be: "
+        + 6L + "; actual value: " + multiStageMigrationMetricValue.get());
+
+    assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, 
"Count"), 6L);
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
 "true");
+  }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 1a54eec610..cdcfc2f173 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -71,6 +71,8 @@ import org.apache.pinot.query.validate.BytesCastVisitor;
 import org.apache.pinot.sql.parsers.CalciteSqlParser;
 import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
 import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -79,6 +81,7 @@ import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
  * <p>It provide the higher level entry interface to convert a SQL string into a {@link DispatchableSubPlan}.
  */
 public class QueryEnvironment {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryEnvironment.class);
   private static final CalciteConnectionConfig CONNECTION_CONFIG;
 
   static {
@@ -200,6 +203,24 @@ public class QueryEnvironment {
     }
   }
 
+  /**
+   * Returns whether the query can be successfully compiled in this query environment
+   */
+  public boolean canCompileQuery(String query) {
+    try (PlannerContext plannerContext = getPlannerContext()) {
+      SqlNode sqlNode = 
CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
+      if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
+        sqlNode = ((SqlExplain) sqlNode).getExplicandum();
+      }
+      compileQuery(sqlNode, plannerContext);
+      LOGGER.debug("Successfully compiled query using the multi-stage query 
engine: `{}`", query);
+      return true;
+    } catch (Exception e) {
+      LOGGER.warn("Encountered an error while compiling query `{}` using the 
multi-stage query engine", query, e);
+      return false;
+    }
+  }
+
   /**
    * Results of planning a query
    */
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index f914e6ba8d..cbee4e7117 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -35,15 +35,11 @@ public class ParserUtils {
    */
   public static boolean canCompileWithMultiStageEngine(String query, String 
database, TableCache tableCache) {
     // try to parse and compile the query with the Calcite planner used by the multi-stage query engine
-    try {
-      LOGGER.info("Trying to compile query `{}` using the multi-stage query 
engine", query);
-      QueryEnvironment queryEnvironment = new QueryEnvironment(database, 
tableCache, null);
-      queryEnvironment.getTableNamesForQuery(query);
-      LOGGER.info("Successfully compiled query using the multi-stage query 
engine: `{}`", query);
-      return true;
-    } catch (Exception e) {
-      LOGGER.error("Encountered an error while compiling query `{}` using the 
multi-stage query engine", query, e);
-      return false;
-    }
+    long compileStartTime = System.currentTimeMillis();
+    LOGGER.debug("Trying to compile query `{}` using the multi-stage query 
engine", query);
+    QueryEnvironment queryEnvironment = new QueryEnvironment(database, 
tableCache, null);
+    boolean canCompile = queryEnvironment.canCompileQuery(query);
+    LOGGER.debug("Multi-stage query compilation time = {}ms", 
System.currentTimeMillis() - compileStartTime);
+    return canCompile;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 6abdec73e0..a069031b69 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -348,6 +348,14 @@ public class CommonConstants {
     public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = 
"pinot.broker.new.segment.expiration.seconds";
     public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = 
TimeUnit.MINUTES.toSeconds(5);
 
+    // If this config is set to true, the broker will check every query executed using the v1 query engine and attempt
+    // to determine whether the query could have successfully been run on the v2 / multi-stage query engine. If not,
+    // a counter metric will be incremented - if this counter remains 0 during regular query workload execution, it
+    // signals that users can potentially migrate their query workload to the multistage query engine.
+    public static final String 
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC
+        = "pinot.broker.enable.multistage.migration.metric";
+    public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = 
false;
+
     public static class Request {
       public static final String SQL = "sql";
       public static final String TRACE = "trace";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to