This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d5ce73b0c6c Fix a basic null handling issue in MSE (#16658)
d5ce73b0c6c is described below
commit d5ce73b0c6cbfe97ff8320e384d88478acf7677b
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 14:46:52 2025 +0200
Fix a basic null handling issue in MSE (#16658)
---
.../MultiStageBrokerRequestHandler.java | 1 +
.../tests/ExplainIntegrationTestTrait.java | 19 ++++-
.../tests/NullHandlingIntegrationTest.java | 80 +++++++++++++++++++++-
.../pinot/calcite/sql/fun/PinotOperatorTable.java | 40 +++++++++--
.../org/apache/pinot/query/QueryEnvironment.java | 19 ++++-
.../query/planner/logical/RexExpressionUtils.java | 25 +++++--
.../resources/queries/PhysicalOptimizerPlans.json | 2 +-
7 files changed, 173 insertions(+), 13 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e0fed9d8138..5f38a5f97c0 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -419,6 +419,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
.tableCache(_tableCache)
.workerManager(_workerManager)
.isCaseSensitive(caseSensitive)
+
.isNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(queryOptions))
.defaultInferPartitionHint(inferPartitionHint)
.defaultUseSpools(defaultUseSpool)
.defaultUseLeafServerForIntermediateStage(defaultUseLeafServerForIntermediateStage)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
index a2c93c6e6eb..0a6d3cb203b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
@@ -21,6 +21,7 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -34,8 +35,18 @@ public interface ExplainIntegrationTestTrait {
throws Exception;
+ /**
+ * Asserts the logical plan ({@code EXPLAIN PLAN WITHOUT IMPLEMENTATION}) of the given query, using no extra
+ * query options.
+ */
default void explainLogical(@Language("sql") String query, String expected) {
+ explainLogical(query, expected, Map.of());
+ }
+
+ default void explainLogical(@Language("sql") String query, String expected,
Map<String, String> queryOptions) {
try {
- JsonNode jsonNode = postQuery("explain plan without implementation for "
+ query);
+ String extraOptions = queryOptions.entrySet().stream()
+ .map(entry -> "SET " + entry.getKey() + "=" + entry.getValue() +
";\n")
+ .collect(Collectors.joining());
+ JsonNode jsonNode = postQuery(extraOptions + "explain plan without
implementation for " + query);
+ if (!jsonNode.get("exceptions").isEmpty()) {
+ Assert.fail("Exception in response: " +
jsonNode.get("exceptions").toString());
+ }
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
Assert.assertEquals(plan.asText(), expected);
@@ -54,6 +65,9 @@ public interface ExplainIntegrationTestTrait {
actualQuery = "SET explainPlanVerbose=true; " + actualQuery;
}
JsonNode jsonNode = postQuery(actualQuery);
+ if (!jsonNode.get("exceptions").isEmpty()) {
+ Assert.fail("Exception in response: " +
jsonNode.get("exceptions").toString());
+ }
JsonNode plan = jsonNode.get("resultTable").get("rows");
List<String> planAsStrList = (List<String>)
JsonUtils.jsonNodeToObject(plan, List.class).stream()
.map(Object::toString)
@@ -92,6 +106,9 @@ public interface ExplainIntegrationTestTrait {
default void explain(@Language("sql") String query, String expected) {
try {
JsonNode jsonNode = postQuery("explain plan for " + query);
+ if (!jsonNode.get("exceptions").isEmpty()) {
+ Assert.fail("Exception in response: " +
jsonNode.get("exceptions").toString());
+ }
JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
Assert.assertEquals(plan.asText(), expected);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 9449676af69..ae8506d0cdc 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -21,11 +21,13 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.util.List;
+import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -39,7 +41,8 @@ import static org.testng.Assert.assertTrue;
* Integration test that creates a Kafka broker, creates a Pinot cluster that
consumes from Kafka and queries Pinot.
* The data pushed to Kafka includes null values.
*/
-public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet
{
+public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet
+ implements ExplainIntegrationTestTrait {
@BeforeClass
public void setUp()
@@ -91,6 +94,11 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet {
FileUtils.deleteDirectory(_tempDir);
}
+ /**
+ * Implements the query hook required by {@link ExplainIntegrationTestTrait} by delegating to
+ * {@code queryBrokerHttpEndpoint}.
+ */
+ @Override
+ public JsonNode postQuery(@Language("sql") String query)
+ throws Exception {
+ return queryBrokerHttpEndpoint(query);
+ }
+
@Override
protected String getAvroTarFileName() {
return "avro_data_with_nulls.tar.gz";
@@ -351,6 +359,76 @@ public class NullHandlingIntegrationTest extends
BaseClusterIntegrationTestSet {
}
}
+ /**
+ * With null handling disabled, the logical plan must keep the {@code IS NOT NULL} predicate: Calcite must not
+ * simplify {@code salary IS NOT NULL AND salary <> 0} to {@code salary <> 0}.
+ */
+ @Test
+ public void isNotNullAndComparisonWithoutNullHandling() {
+ setUseMultiStageQueryEngine(true);
+ String query = ""
+ + "SELECT 1 \n"
+ + "FROM " + getTableName() + " \n"
+ + "WHERE\n"
+ + " salary IS NOT NULL \n"
+ + "AND salary <> 0";
+
+ explainLogical(query,
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[1])\n"
+ + " LogicalFilter(condition=[AND(IS NOT NULL($7), <>($7, 0))])\n"
+ + " PinotLogicalTableScan(table=[[default, mytable]])\n",
+
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"false"));
+ }
+
+ /**
+ * With null handling enabled, standard SQL semantics apply: Calcite drops the redundant {@code IS NOT NULL}
+ * predicate and only {@code salary <> 0} remains in the filter.
+ */
+ @Test
+ public void isNotNullAndComparisonWithNullHandling() {
+ setUseMultiStageQueryEngine(true);
+ String query = ""
+ + "SELECT 1 \n"
+ + "FROM " + getTableName() + " \n"
+ + "WHERE \n"
+ + " salary IS NOT NULL "
+ + "AND salary <> 0";
+
+ explainLogical(query,
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[1])\n"
+ + " LogicalFilter(condition=[<>($7, 0)])\n"
+ + " PinotLogicalTableScan(table=[[default, mytable]])\n",
+
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true"));
+ }
+
+ /**
+ * With null handling disabled, {@code salary IS NULL AND salary <> 0} must be kept as written in the logical
+ * plan instead of being folded into an always-false filter.
+ */
+ @Test
+ public void mseIsNullAndComparisonWithoutNullHandling() {
+ setUseMultiStageQueryEngine(true);
+ String query = ""
+ + "SELECT 1 \n"
+ + "FROM " + getTableName() + " \n"
+ + "WHERE\n"
+ + " salary IS NULL \n"
+ + "AND salary <> 0";
+
+ explainLogical(query,
+ "Execution Plan\n"
+ + "LogicalProject(EXPR$0=[1])\n"
+ + " LogicalFilter(condition=[AND(IS NULL($7), <>($7, 0))])\n"
+ + " PinotLogicalTableScan(table=[[default, mytable]])\n",
+
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"false"));
+ }
+
+ /**
+ * With null handling enabled, {@code salary IS NULL AND salary <> 0} can never be true, so Calcite reduces the
+ * whole query to an empty {@code LogicalValues}.
+ */
+ @Test
+ public void mseIsNullAndComparisonWithNullHandling() {
+ setUseMultiStageQueryEngine(true);
+ String query = ""
+ + "SELECT 1 \n"
+ + "FROM " + getTableName() + " \n"
+ + "WHERE \n"
+ + " salary IS NULL "
+ + "AND salary <> 0";
+
+ explainLogical(query,
+ "Execution Plan\n"
+ + "LogicalValues(tuples=[[]])\n",
+
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
"true"));
+ }
+
@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING,
"true");
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
index 2e380f8f94e..ef3ea9c78f7 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
@@ -32,6 +32,7 @@ import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
@@ -67,10 +68,13 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
*/
@SuppressWarnings("unused") // unused fields are accessed by reflection
public class PinotOperatorTable implements SqlOperatorTable {
- private static final Supplier<PinotOperatorTable> INSTANCE =
Suppliers.memoize(PinotOperatorTable::new);
+ // One memoized table per null handling mode. The two differ only in the IS NULL / IS NOT NULL operators,
+ // which the constructor replaces when null handling is disabled.
+ private static final Supplier<PinotOperatorTable> WITH_NULL_HANDLING =
+ Suppliers.memoize(() -> new PinotOperatorTable(true));
+ private static final Supplier<PinotOperatorTable> WITHOUT_NULL_HANDLING =
+ Suppliers.memoize(() -> new PinotOperatorTable(false));
- public static PinotOperatorTable instance() {
- return INSTANCE.get();
+ /**
+ * Returns the shared operator table for the given null handling mode.
+ *
+ * @param nullHandlingEnabled whether the query is planned with null handling enabled
+ */
+ public static PinotOperatorTable instance(boolean nullHandlingEnabled) {
+ return nullHandlingEnabled ? WITH_NULL_HANDLING.get() :
WITHOUT_NULL_HANDLING.get();
}
// The standard Calcite + and - operators don't support operations on
TIMESTAMP types. However, Pinot supports these
@@ -97,6 +101,26 @@ public class PinotOperatorTable implements SqlOperatorTable
{
InferTypes.FIRST_KNOWN,
OperandTypes.MINUS_OPERATOR.or(OperandTypes.family(SqlTypeFamily.TIMESTAMP,
SqlTypeFamily.TIMESTAMP)));
+ // Stand-in for Calcite's IS NOT NULL. The PinotOperatorTable constructor registers it under the same name only
+ // when null handling is disabled. Declaring it as OTHER_FUNCTION (rather than SqlKind.IS_NOT_NULL) keeps Calcite
+ // from applying its IS NOT NULL simplifications. RexExpressionUtils maps the name back to IS_NOT_NULL for servers.
+ public static final SqlPostfixOperator PINOT_IS_NOT_NULL =
+ new SqlPostfixOperator(
+ "IS NOT NULL",
+ SqlKind.OTHER_FUNCTION,
+ 28,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ InferTypes.VARCHAR_1024,
+ OperandTypes.ANY);
+
+ // Stand-in for Calcite's IS NULL. The PinotOperatorTable constructor registers it under the same name only when
+ // null handling is disabled. Declaring it as OTHER_FUNCTION (rather than SqlKind.IS_NULL) keeps Calcite from
+ // applying its IS NULL simplifications. RexExpressionUtils maps the name back to IS_NULL for servers.
+ public static final SqlPostfixOperator PINOT_IS_NULL =
+ new SqlPostfixOperator(
+ "IS NULL",
+ SqlKind.OTHER_FUNCTION,
+ 28,
+ ReturnTypes.BOOLEAN_NOT_NULL,
+ InferTypes.VARCHAR_1024,
+ OperandTypes.ANY);
+
/**
* This list includes the supported standard {@link SqlOperator}s defined in
{@link SqlStdOperatorTable}.
* NOTE: The operator order follows the same order as defined in {@link
SqlStdOperatorTable} for easier search.
@@ -302,7 +326,7 @@ public class PinotOperatorTable implements SqlOperatorTable
{
private final Map<String, SqlOperator> _operatorMap;
private final List<SqlOperator> _operatorList;
- private PinotOperatorTable() {
+ private PinotOperatorTable(boolean nullHandlingEnabled) {
Map<String, SqlOperator> operatorMap = new HashMap<>();
// Register standard operators
@@ -315,6 +339,14 @@ public class PinotOperatorTable implements
SqlOperatorTable {
register(name, operator, operatorMap);
}
}
+ // When null handling is disabled, we register fake IS NULL and IS NOT NULL operators to skip some
+ // standard Calcite simplifications that are correct in SQL but incorrect for Pinot. For example, in SQL
+ // `col IS NOT NULL AND col <> 0` is simplified to `col <> 0`, because `col <> 0` can only be true when
+ // `col IS NOT NULL` is true. However, `IS NOT NULL` has a special meaning in Pinot under basic null handling.
+ if (!nullHandlingEnabled) {
+ operatorMap.put(FunctionRegistry.canonicalize(PINOT_IS_NULL.getName()),
PINOT_IS_NULL);
+
operatorMap.put(FunctionRegistry.canonicalize(PINOT_IS_NOT_NULL.getName()),
PINOT_IS_NOT_NULL);
+ }
// Register Pinot operators
for (SqlOperator operator : PINOT_OPERATORS) {
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 225e1dc4f6a..407af2284fa 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -147,8 +147,12 @@ public class QueryEnvironment {
String database = config.getDatabase();
_catalog = new PinotCatalog(config.getTableCache(), database);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false,
database, _catalog);
- _config =
Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
-
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
+ _config = Frameworks.newConfigBuilder()
+ .traitDefs()
+
.operatorTable(PinotOperatorTable.instance(config.isNullHandlingEnabled()))
+ .defaultSchema(rootSchema.plus())
+ .sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG)
+ .build();
_catalogReader = new PinotCatalogReader(
rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG,
config.isCaseSensitive());
// default optProgram with no skip rule options and no use rule options
@@ -156,11 +160,17 @@ public class QueryEnvironment {
}
+ /**
+ * Creates a query environment with null handling enabled, so the standard Calcite IS NULL / IS NOT NULL
+ * operators are used during planning.
+ * NOTE(review): the {@code Config} default of {@code isNullHandlingEnabled()} is {@code false}; confirm that
+ * callers of this constructor intend null handling to be enabled.
+ */
public QueryEnvironment(String database, TableCache tableCache, @Nullable
WorkerManager workerManager) {
+ this(database, tableCache, workerManager, true);
+ }
+
+ /**
+ * Creates a query environment with request id {@code -1} for the given database, table cache and worker manager.
+ *
+ * @param nullHandlingEnabled whether null handling is enabled; when {@code false}, Pinot-specific IS NULL /
+ * IS NOT NULL operators replace the standard Calcite ones during planning
+ */
+ public QueryEnvironment(String database, TableCache tableCache, @Nullable
WorkerManager workerManager,
+ boolean nullHandlingEnabled) {
this(configBuilder()
.requestId(-1L)
.database(database)
.tableCache(tableCache)
.workerManager(workerManager)
+ .isNullHandlingEnabled(nullHandlingEnabled)
.build());
}
@@ -647,6 +657,11 @@ public class QueryEnvironment {
@Nullable
TableCache getTableCache();
+ /**
+ * Whether null handling is enabled for the query. Defaults to {@code false}. When {@code false}, the operator
+ * table registers Pinot-specific IS NULL / IS NOT NULL operators, so Calcite does not simplify away predicates
+ * such as {@code col IS NOT NULL}.
+ */
+ @Value.Default
+ default boolean isNullHandlingEnabled() {
+ return false;
+ }
+
/**
* Whether the schema should be considered case-insensitive.
*/
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index afef239ff98..a3838af71ac 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -295,11 +295,28 @@ public class RexExpressionUtils {
private static String getFunctionName(SqlOperator operator) {
switch (operator.kind) {
- case OTHER:
+ case OTHER: {
// NOTE: SqlStdOperatorTable.CONCAT has OTHER kind and "||" as name
- return operator.getName().equals("||") ? "CONCAT" : operator.getName();
- case OTHER_FUNCTION:
- return operator.getName();
+ String name = operator.getName();
+ return name.equals("||") ? "CONCAT" : name;
+ }
+ case OTHER_FUNCTION: {
+ // See https://github.com/apache/pinot/pull/16658
+ // When null handling is disabled, `IS NULL` and `IS NOT NULL` are registered as OTHER_FUNCTION operators
+ // named "IS NULL" and "IS NOT NULL".
+ // They need these names so the SQL parser recognizes them, but servers do not recognize function names
+ // containing spaces. We therefore convert them here to "IS_NULL" and "IS_NOT_NULL",
+ // which match the corresponding SqlKind names.
+ String name = operator.getName();
+ switch (name) {
+ case "IS NULL":
+ return SqlKind.IS_NULL.name();
+ case "IS NOT NULL":
+ return SqlKind.IS_NOT_NULL.name();
+ default:
+ return name;
+ }
+ }
default:
return operator.kind.name();
}
diff --git
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e716a543bb0..147761f22d8 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -23,7 +23,7 @@
},
{
"description": "Query that gets optimized to a Values node",
- "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM
a WHERE col1 IS NULL LIMIT 1",
+ "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM
a WHERE col1 <> col1 LIMIT 1",
"output": [
"Execution Plan",
"\nPhysicalSort(fetch=[1])",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]