This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new de577bc457 Add broker config to set default query null handling behavior (#13977) de577bc457 is described below commit de577bc457b580e89ecb4e82076ce09f209bca18 Author: Yash Mayya <yash.ma...@gmail.com> AuthorDate: Fri Sep 13 10:32:41 2024 +0530 Add broker config to set default query null handling behavior (#13977) --- .../requesthandler/BaseBrokerRequestHandler.java | 32 +++++++-- .../BaseSingleStageBrokerRequestHandler.java | 36 +++++----- .../MultiStageBrokerRequestHandler.java | 27 ++------ .../tests/NullHandlingIntegrationTest.java | 81 ++++++++++------------ .../apache/pinot/spi/utils/CommonConstants.java | 1 + 5 files changed, 89 insertions(+), 88 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index b43dcd9763..406d3d032a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -42,7 +42,9 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.BrokerResponse; +import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.QueryProcessingException; +import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.spi.auth.AuthorizationResult; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListener; @@ -67,6 +69,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final BrokerRequestIdGenerator _requestIdGenerator; protected final long _brokerTimeoutMs; protected final QueryLogger _queryLogger; + @Nullable + protected final String _enableNullHandling; public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache) { @@ -82,6 +86,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _requestIdGenerator = new BrokerRequestIdGenerator(brokerId); _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryLogger = new QueryLogger(config); + _enableNullHandling = config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING); } @Override @@ -116,8 +121,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (StringUtils.isNotBlank(failureMessage)) { failureMessage = "Reason: " + failureMessage; } - throw new WebApplicationException("Permission denied." + failureMessage, - Response.Status.FORBIDDEN); + throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN); } JsonNode sql = request.get(Broker.Request.SQL); @@ -129,6 +133,24 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { String query = sql.textValue(); requestContext.setQuery(query); + + // Parse the query if needed + if (sqlNodeAndOptions == null) { + try { + sqlNodeAndOptions = RequestUtils.parseQuery(query, request); + } catch (Exception e) { + // Do not log or emit metric here because it is pure user error + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); + } + } + + // Add null handling option from broker config only if there is no override in the query + if (_enableNullHandling != null) { + sqlNodeAndOptions.getOptions() + .putIfAbsent(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, _enableNullHandling); + } + BrokerResponse brokerResponse = handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, httpHeaders, accessControl); @@ -139,9 +161,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return brokerResponse; } - protected abstract BrokerResponse handleRequest(long requestId, String query, - @Nullable SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, - RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl) + protected abstract BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, + JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, + @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception; protected static void augmentStatistics(RequestContext statistics, BrokerResponse response) { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 83f83188dc..c9862d1af0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -277,7 +277,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ } @Override - protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, @Nullable HttpHeaders httpHeaders, AccessControl accessControl) throws Exception { @@ -287,17 +287,6 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ Tracing.ThreadAccountantOps.setupRunner(String.valueOf(requestId)); try { - // Parse the query if needed - if (sqlNodeAndOptions == null) { - try { - sqlNodeAndOptions = RequestUtils.parseQuery(query, request); - } catch (Exception e) { - // Do not log or emit metric here because it is pure user error - requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); - } - } - // Compile the request into PinotQuery long compilationStartTimeNs = System.nanoTime(); PinotQuery pinotQuery; @@ -308,8 +297,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1); requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); // Check if the query is a v2 supported query - Map<String, String> queryOptions = sqlNodeAndOptions.getOptions(); - String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); + String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders); if (ParserUtils.canCompileWithMultiStageEngine(query, database, _tableCache)) { return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception( "It seems that the query is only supported by the multi-stage query engine, please retry the query using " @@ -988,9 +976,25 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ Literal subqueryLiteral = operands.get(1).getLiteral(); Preconditions.checkState(subqueryLiteral != null, "Second argument of IN_SUBQUERY must be a literal (subquery)"); String subquery = subqueryLiteral.getStringValue(); + + SqlNodeAndOptions sqlNodeAndOptions; + try { + sqlNodeAndOptions = RequestUtils.parseQuery(subquery, jsonRequest); + } catch (Exception e) { + // Do not log or emit metric here because it is pure user error + requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); + throw new RuntimeException("Failed to parse subquery: " + subquery, e); + } + + // Add null handling option from broker config only if there is no override in the query + if (_enableNullHandling != null) { + sqlNodeAndOptions.getOptions() + .putIfAbsent(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, _enableNullHandling); + } + BrokerResponse response = - handleRequest(requestId, subquery, null, jsonRequest, requesterIdentity, requestContext, httpHeaders, - accessControl); + handleRequest(requestId, subquery, sqlNodeAndOptions, jsonRequest, requesterIdentity, requestContext, + httpHeaders, accessControl); if (response.getExceptionsSize() != 0) { throw new RuntimeException("Caught exception while executing subquery: " + subquery); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index fad63d18e2..d5791d1661 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -51,7 +51,6 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.common.utils.ExceptionUtils; import org.apache.pinot.common.utils.config.QueryOptionsUtils; -import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.query.QueryEnvironment; @@ -110,24 +109,14 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } @Override - protected BrokerResponse handleRequest(long requestId, String query, @Nullable SqlNodeAndOptions sqlNodeAndOptions, + protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndOptions sqlNodeAndOptions, JsonNode request, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext, HttpHeaders httpHeaders, AccessControl accessControl) { LOGGER.debug("SQL query for request {}: {}", requestId, query); - // Parse the query if needed - if (sqlNodeAndOptions == null) { - try { - sqlNodeAndOptions = RequestUtils.parseQuery(query, request); - } catch (Exception e) { - // Do not log or emit metric here because it is pure user error - requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE); - return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); - } - } - // Compile the request Map<String, String> queryOptions = sqlNodeAndOptions.getOptions(); + long compilationStartTimeNs = System.nanoTime(); long queryTimeoutMs; QueryEnvironment.QueryPlannerResult queryPlanResult; @@ -149,8 +138,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { if (StringUtils.isNotBlank(failureMessage)) { failureMessage = "Reason: " + failureMessage; } - throw new WebApplicationException("Permission denied. " + failureMessage, - Response.Status.FORBIDDEN); + throw new WebApplicationException("Permission denied. " + failureMessage, Response.Status.FORBIDDEN); } return constructMultistageExplainPlan(query, plan); case SELECT: @@ -202,8 +190,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { if (StringUtils.isNotBlank(failureMessage)) { failureMessage = "Reason: " + failureMessage; } - throw new WebApplicationException("Permission denied." + failureMessage, - Response.Status.FORBIDDEN); + throw new WebApplicationException("Permission denied." + failureMessage, Response.Status.FORBIDDEN); } // Validate QPS quota @@ -291,10 +278,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { } } catch (Exception e) { LOGGER.warn("Error encountered while collecting multi-stage stats", e); - brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode().put( - "error", - "Error encountered while collecting multi-stage stats - " + e) - ); + brokerResponse.setStageStats(JsonNodeFactory.instance.objectNode() + .put("error", "Error encountered while collecting multi-stage stats - " + e)); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java index cdf0af03bd..765c55cc7a 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java @@ -23,6 +23,8 @@ import java.io.File; import java.util.List; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -198,11 +200,11 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { public void testTotalCountWithNullHandlingQueryOptionEnabled(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String pinotQuery = "SELECT COUNT(*) FROM " + getTableName() + " option(enableNullHandling=true)"; + String pinotQuery = "SELECT COUNT(*) FROM " + getTableName(); String h2Query = "SELECT COUNT(*) FROM " + getTableName(); testQuery(pinotQuery, h2Query); - pinotQuery = "SELECT COUNT(1) FROM " + getTableName() + " option(enableNullHandling=true)"; + pinotQuery = "SELECT COUNT(1) FROM " + getTableName(); h2Query = "SELECT COUNT(1) FROM " + getTableName(); testQuery(pinotQuery, h2Query); } @@ -256,41 +258,34 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { public void testOrderByByNullableKeepsOtherColNulls() throws Exception { setUseMultiStageQueryEngine(false); - String h2Query = "select salary from mytable" + String query = "select salary from mytable" + " where salary is null" + " order by description"; - String pinotQuery = h2Query + " option(enableNullHandling=true)"; - testQuery(pinotQuery, h2Query); + testQuery(query); } @Test(dataProvider = "useBothQueryEngines") public void testOrderByNullsFirst(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String h2Query = "SELECT salary FROM " + getTableName() + " ORDER BY salary NULLS FIRST"; - String pinotQuery = h2Query + " option(enableNullHandling=true)"; - - testQuery(pinotQuery, h2Query); + String query = "SELECT salary FROM " + getTableName() + " ORDER BY salary NULLS FIRST"; + testQuery(query); } @Test(dataProvider = "useBothQueryEngines") public void testOrderByNullsLast(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String h2Query = "SELECT salary FROM " + getTableName() + " ORDER BY salary DESC NULLS LAST"; - String pinotQuery = h2Query + " option(enableNullHandling=true)"; - - testQuery(pinotQuery, h2Query); + String query = "SELECT salary FROM " + getTableName() + " ORDER BY salary DESC NULLS LAST"; + testQuery(query); } @Test(dataProvider = "useBothQueryEngines") public void testDistinctOrderByNullsLast(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String h2Query = "SELECT distinct salary FROM " + getTableName() + " ORDER BY salary DESC NULLS LAST"; - String pinotQuery = h2Query + " option(enableNullHandling=true)"; - - testQuery(pinotQuery, h2Query); + String query = "SELECT distinct salary FROM " + getTableName() + " ORDER BY salary DESC NULLS LAST"; + testQuery(query); } @Test(dataProvider = "useBothQueryEngines") @@ -299,7 +294,7 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { setUseMultiStageQueryEngine(useMultiStageQueryEngine); // Need to also select an identifier column to skip the all literal query optimization which returns without // querying the segment. - String sqlQuery = "SELECT NULL, salary FROM mytable OPTION(enableNullHandling=true)"; + String sqlQuery = "SELECT NULL, salary FROM mytable"; JsonNode response = postQuery(sqlQuery); @@ -311,45 +306,39 @@ public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet { public void testCaseWhenAllLiteral(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String sqlQuery = - "SELECT CASE WHEN true THEN 1 WHEN NOT true THEN 0 ELSE NULL END FROM mytable OPTION(enableNullHandling=true)"; - + String sqlQuery = "SELECT CASE WHEN true THEN 1 WHEN NOT true THEN 0 ELSE NULL END FROM mytable"; JsonNode response = postQuery(sqlQuery); - assertEquals(response.get("resultTable").get("rows").get(0).get(0).asInt(), 1); } + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING, "true"); + } + @DataProvider(name = "nullLiteralQueries") public Object[][] nullLiteralQueries() { // Query string, expected value return new Object[][]{ // Null literal only - {String.format("SELECT null FROM %s OPTION(enableNullHandling=true)", getTableName()), "null"}, + {String.format("SELECT null FROM %s", getTableName()), "null"}, // Null related functions - {String.format("SELECT isNull(null) FROM %s OPTION (enableNullHandling=true)", getTableName()), true}, - {String.format("SELECT isNotNull(null) FROM %s OPTION (enableNullHandling=true)", getTableName()), false}, - {String.format("SELECT coalesce(null, 1) FROM %s OPTION (enableNullHandling=true)", getTableName()), 1}, - {String.format("SELECT coalesce(null, null) FROM %s OPTION (enableNullHandling=true)", getTableName()), "null"}, - {String.format("SELECT isDistinctFrom(null, null) FROM %s OPTION (enableNullHandling=true)", getTableName()), - false}, - {String.format("SELECT isNotDistinctFrom(null, null) FROM %s OPTION (enableNullHandling=true)", getTableName()), - true}, - {String.format("SELECT isDistinctFrom(null, 1) FROM %s OPTION (enableNullHandling=true)", getTableName()), - true}, - {String.format("SELECT isNotDistinctFrom(null, 1) FROM %s OPTION (enableNullHandling=true)", getTableName()), - false}, - {String.format("SELECT case when true then null end FROM %s OPTION (enableNullHandling=true)", getTableName()), - "null"}, - {String.format("SELECT case when false then 1 end FROM %s OPTION (enableNullHandling=true)", getTableName()), - "null"}, + {String.format("SELECT isNull(null) FROM %s", getTableName()), true}, + {String.format("SELECT isNotNull(null) FROM %s", getTableName()), false}, + {String.format("SELECT coalesce(null, 1) FROM %s", getTableName()), 1}, + {String.format("SELECT coalesce(null, null) FROM %s", getTableName()), "null"}, + {String.format("SELECT isDistinctFrom(null, null) FROM %s", getTableName()), false}, + {String.format("SELECT isNotDistinctFrom(null, null) FROM %s", getTableName()), true}, + {String.format("SELECT isDistinctFrom(null, 1) FROM %s", getTableName()), true}, + {String.format("SELECT isNotDistinctFrom(null, 1) FROM %s", getTableName()), false}, + {String.format("SELECT case when true then null end FROM %s", getTableName()), "null"}, + {String.format("SELECT case when false then 1 end FROM %s", getTableName()), "null"}, // Null intolerant functions - {String.format("SELECT add(null, 1) FROM %s OPTION (enableNullHandling=true)", getTableName()), "null"}, - {String.format("SELECT greater_than(null, 1) FROM %s OPTION (enableNullHandling=true)", getTableName()), - "null"}, - {String.format("SELECT to_epoch_seconds(null) FROM %s OPTION (enableNullHandling=true)", getTableName()), - "null"}, - {String.format("SELECT not(null) FROM %s OPTION (enableNullHandling=true)", getTableName()), "null"}, - {String.format("SELECT tan(null) FROM %s OPTION (enableNullHandling=true)", getTableName()), "null"} + {String.format("SELECT add(null, 1) FROM %s", getTableName()), "null"}, + {String.format("SELECT greater_than(null, 1) FROM %s", getTableName()), "null"}, + {String.format("SELECT to_epoch_seconds(null) FROM %s", getTableName()), "null"}, + {String.format("SELECT not(null) FROM %s", getTableName()), "null"}, + {String.format("SELECT tan(null) FROM %s", getTableName()), "null"} }; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 466718180d..58905ba71f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -248,6 +248,7 @@ public class CommonConstants { public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE; public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond"; + public static final String CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING = "pinot.broker.query.enable.null.handling"; public static final String CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION = "pinot.broker.enable.query.cancellation"; public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d; public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org