This is an automated email from the ASF dual-hosted git repository. gortiz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 835949b645 Automatically detect whether a v1 query could have run on the v2 query engine (#13628) 835949b645 is described below commit 835949b64529b8e5c0bb49474cdec2e17c9050f7 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Tue Jul 30 15:20:22 2024 +0530 Automatically detect whether a v1 query could have run on the v2 query engine (#13628) --- .../BaseSingleStageBrokerRequestHandler.java | 59 +++++++++ .../requesthandler/GrpcBrokerRequestHandler.java | 2 + .../SingleConnectionBrokerRequestHandler.java | 2 + .../apache/pinot/common/metrics/BrokerMeter.java | 12 +- .../tests/JmxMetricsIntegrationTest.java | 145 +++++++++++++++++++++ .../org/apache/pinot/query/QueryEnvironment.java | 21 +++ .../pinot/query/parser/utils/ParserUtils.java | 16 +-- .../apache/pinot/spi/utils/CommonConstants.java | 8 ++ 8 files changed, 254 insertions(+), 11 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 27e5c19cac..6a839d5e36 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -29,9 +29,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -138,6 +142,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final boolean _enableDistinctCountBitmapOverride; protected final int _queryResponseLimit; protected final Map<Long, QueryServers> _queriesById; + protected final boolean _enableMultistageMigrationMetric; + protected ExecutorService _multistageCompileExecutor; + protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue; public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, @@ -155,12 +162,53 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ boolean enableQueryCancellation = Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null; + + _enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, + Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC); + if (_enableMultistageMigrationMetric) { + _multistageCompileExecutor = Executors.newSingleThreadExecutor(); + _multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000); + } + LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, " + "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation); } + @Override + public void start() { + if (_enableMultistageMigrationMetric) { + _multistageCompileExecutor.submit(() -> { + while (!Thread.currentThread().isInterrupted()) { + Pair<String, String> query; + try { + query = _multistageCompileQueryQueue.take(); + } catch (InterruptedException e) { + // Exit gracefully when the thread is interrupted, presumably when this single thread executor is shutdown. + // Since this task is all that this single thread is doing, there's no need to preserve the thread's + // interrupt status flag. + return; + } + String queryString = query.getLeft(); + String database = query.getRight(); + + // Check if the query is a v2 supported query + if (!ParserUtils.canCompileWithMultiStageEngine(queryString, database, _tableCache)) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE, 1); + } + } + }); + } + } + + @Override + public void shutDown() { + if (_enableMultistageMigrationMetric) { + _multistageCompileExecutor.shutdownNow(); + } + } + @Override public Map<Long, String> getRunningQueries() { Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); @@ -478,8 +526,19 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1); + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1); _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); + if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) { + // Check if the query is a v2 supported query + String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); + // Attempt to add the query to the compile queue; drop if queue is full + if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) { + LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full", + query); + } + } + // Prepare OFFLINE and REALTIME requests BrokerRequest offlineBrokerRequest = null; BrokerRequest realtimeBrokerRequest = null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 96e06ffec5..0484476a41 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -63,10 +63,12 @@ public class GrpcBrokerRequestHandler extends BaseSingleStageBrokerRequestHandle @Override public void start() { + super.start(); } @Override public void shutDown() { + super.shutDown(); _streamingQueryClient.shutdown(); _streamingReduceService.shutDown(); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index a51634c983..349affe6c8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -82,12 +82,14 @@ public class SingleConnectionBrokerRequestHandler extends BaseSingleStageBrokerR @Override public void start() { + super.start(); _failureDetector.register(this); _failureDetector.start(); } @Override public void shutDown() { + super.shutDown(); _failureDetector.stop(); _queryRouter.shutDown(); _brokerReduceService.shutDown(); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java index d6b7af9840..086d5ffd9e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java @@ -35,6 +35,13 @@ public enum BrokerMeter implements AbstractMetrics.Meter { * At this moment this counter does not include queries executed in multi-stage mode. */ QUERIES("queries", false), + /** + * Number of single-stage queries that have been started. + * <p> + * Unlike {@link #QUERIES}, this metric is global and not attached to a particular table. + * That means it can be used to know how many single-stage queries have been started in total. + */ + QUERIES_GLOBAL("queries", true), /** * Number of multi-stage queries that have been started. * <p> @@ -49,7 +56,10 @@ public enum BrokerMeter implements AbstractMetrics.Meter { * sum of this metric across all tables should be greater or equal than {@link #MULTI_STAGE_QUERIES_GLOBAL}. */ MULTI_STAGE_QUERIES("queries", false), - + /** + * Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is. + */ + SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true), // These metrics track the exceptions caught during query execution in broker side. // Query rejected by Jersey thread pool executor QUERY_REJECTED_EXCEPTIONS("exceptions", true), diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java new file mode 100644 index 0000000000..2e1873d3f8 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JmxMetricsIntegrationTest.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.lang.management.ManagementFactory; +import java.util.Hashtable; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + + +/** + * Tests that verify JMX metrics emitted by various Pinot components. + */ +public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet { + + private static final int NUM_BROKERS = 1; + private static final int NUM_SERVERS = 1; + + private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); + private static final String PINOT_JMX_METRICS_DOMAIN = "\"org.apache.pinot.common.metrics\""; + private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\""; + + @BeforeClass + public void setUp() throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBrokers(NUM_BROKERS); + startServers(NUM_SERVERS); + + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + // Unpack the Avro files + List<File> avroFiles = unpackAvroData(_tempDir); + + // Create and upload segments + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(getTableName(), _tarDir); + + // Wait for all documents loaded + waitForAllDocsLoaded(600_000L); + } + + @Test + public void testMultiStageMigrationMetric() throws Exception { + ObjectName multiStageMigrationMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN, + new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE, + "name", "\"pinot.broker.singleStageQueriesInvalidMultiStage\""))); + + ObjectName queriesGlobalMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN, + new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE, + "name", "\"pinot.broker.queriesGlobal\""))); + + // Some queries are run during setup to ensure that all the docs are loaded + long initialQueryCount = (Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count"); + assertTrue(initialQueryCount > 0L); + assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"), 0L); + + postQuery("SELECT COUNT(*) FROM mytable"); + + // Run some queries that are known to not work as is with the multi-stage query engine + + // Type differences + // STRING is VARCHAR in v2 + JsonNode response = postQuery("SELECT CAST(ArrTime AS STRING) FROM mytable"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + // LONG is BIGINT in v2 + response = postQuery("SELECT CAST(ArrTime AS LONG) FROM mytable"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + // FLOAT_ARRAY is FLOAT ARRAY in v2 + response = postQuery("SELECT CAST(DivAirportIDs AS FLOAT_ARRAY) FROM mytable"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + + // MV column requires ARRAY_TO_MV wrapper to be used in filter predicates + response = postQuery("SELECT COUNT(*) FROM mytable WHERE DivAirports = 'JFK'"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + + // Unsupported function + response = postQuery("SELECT AirlineID, count(*) FROM mytable WHERE IN_SUBQUERY(airlineID, 'SELECT " + + "ID_SET(AirlineID) FROM mytable WHERE Carrier = ''AA''') = 1 GROUP BY AirlineID;"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + + // Repeated columns in an ORDER BY query + response = postQuery("SELECT AirTime, AirTime FROM mytable ORDER BY AirTime"); + assertFalse(response.get("resultTable").get("rows").isEmpty()); + + assertEquals((Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count"), initialQueryCount + 8L); + + AtomicLong multiStageMigrationMetricValue = new AtomicLong(); + TestUtils.waitForCondition((aVoid) -> { + try { + multiStageMigrationMetricValue.set((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count")); + return multiStageMigrationMetricValue.get() == 6L; + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 5000, "Expected value of MBean 'pinot.broker.singleStageQueriesInvalidMultiStage' to be: " + + 6L + "; actual value: " + multiStageMigrationMetricValue.get()); + + assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"), 6L); + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, "true"); + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 1a54eec610..cdcfc2f173 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -71,6 +71,8 @@ import org.apache.pinot.query.validate.BytesCastVisitor; import org.apache.pinot.sql.parsers.CalciteSqlParser; import org.apache.pinot.sql.parsers.SqlNodeAndOptions; import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -79,6 +81,7 @@ import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain; * <p>It provide the higher level entry interface to convert a SQL string into a {@link DispatchableSubPlan}. */ public class QueryEnvironment { + private static final Logger LOGGER = LoggerFactory.getLogger(QueryEnvironment.class); private static final CalciteConnectionConfig CONNECTION_CONFIG; static { @@ -200,6 +203,24 @@ public class QueryEnvironment { } } + /** + * Returns whether the query can be successfully compiled in this query environment + */ + public boolean canCompileQuery(String query) { + try (PlannerContext plannerContext = getPlannerContext()) { + SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode(); + if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) { + sqlNode = ((SqlExplain) sqlNode).getExplicandum(); + } + compileQuery(sqlNode, plannerContext); + LOGGER.debug("Successfully compiled query using the multi-stage query engine: `{}`", query); + return true; + } catch (Exception e) { + LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e); + return false; + } + } + /** * Results of planning a query */ diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java index f914e6ba8d..cbee4e7117 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java @@ -35,15 +35,11 @@ public class ParserUtils { */ public static boolean canCompileWithMultiStageEngine(String query, String database, TableCache tableCache) { // try to parse and compile the query with the Calcite planner used by the multi-stage query engine - try { - LOGGER.info("Trying to compile query `{}` using the multi-stage query engine", query); - QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null); - queryEnvironment.getTableNamesForQuery(query); - LOGGER.info("Successfully compiled query using the multi-stage query engine: `{}`", query); - return true; - } catch (Exception e) { - LOGGER.error("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e); - return false; - } + long compileStartTime = System.currentTimeMillis(); + LOGGER.debug("Trying to compile query `{}` using the multi-stage query engine", query); + QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null); + boolean canCompile = queryEnvironment.canCompileQuery(query); + LOGGER.debug("Multi-stage query compilation time = {}ms", System.currentTimeMillis() - compileStartTime); + return canCompile; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 6abdec73e0..a069031b69 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -348,6 +348,14 @@ public class CommonConstants { public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = "pinot.broker.new.segment.expiration.seconds"; public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = TimeUnit.MINUTES.toSeconds(5); + // If this config is set to true, the broker will check every query executed using the v1 query engine and attempt + // to determine whether the query could have successfully been run on the v2 / multi-stage query engine. If not, + // a counter metric will be incremented - if this counter remains 0 during regular query workload execution, it + // signals that users can potentially migrate their query workload to the multistage query engine. + public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC + = "pinot.broker.enable.multistage.migration.metric"; + public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = false; + public static class Request { public static final String SQL = "sql"; public static final String TRACE = "trace"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org