This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d5ce73b0c6c Fix a basic null handling issue in MSE (#16658)
d5ce73b0c6c is described below

commit d5ce73b0c6cbfe97ff8320e384d88478acf7677b
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 14:46:52 2025 +0200

    Fix a basic null handling issue in MSE (#16658)
---
 .../MultiStageBrokerRequestHandler.java            |  1 +
 .../tests/ExplainIntegrationTestTrait.java         | 19 ++++-
 .../tests/NullHandlingIntegrationTest.java         | 80 +++++++++++++++++++++-
 .../pinot/calcite/sql/fun/PinotOperatorTable.java  | 40 +++++++++--
 .../org/apache/pinot/query/QueryEnvironment.java   | 19 ++++-
 .../query/planner/logical/RexExpressionUtils.java  | 25 +++++--
 .../resources/queries/PhysicalOptimizerPlans.json  |  2 +-
 7 files changed, 173 insertions(+), 13 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index e0fed9d8138..5f38a5f97c0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -419,6 +419,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         .tableCache(_tableCache)
         .workerManager(_workerManager)
         .isCaseSensitive(caseSensitive)
+        
.isNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(queryOptions))
         .defaultInferPartitionHint(inferPartitionHint)
         .defaultUseSpools(defaultUseSpool)
         
.defaultUseLeafServerForIntermediateStage(defaultUseLeafServerForIntermediateStage)
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
index a2c93c6e6eb..0a6d3cb203b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java
@@ -21,6 +21,7 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -34,8 +35,18 @@ public interface ExplainIntegrationTestTrait {
       throws Exception;
 
   default void explainLogical(@Language("sql") String query, String expected) {
+    explainLogical(query, expected, Map.of());
+  }
+
+  default void explainLogical(@Language("sql") String query, String expected, 
Map<String, String> queryOptions) {
     try {
-      JsonNode jsonNode = postQuery("explain plan without implementation for " 
+ query);
+      String extraOptions = queryOptions.entrySet().stream()
+          .map(entry -> "SET " + entry.getKey() + "=" + entry.getValue() + 
";\n")
+          .collect(Collectors.joining());
+      JsonNode jsonNode = postQuery(extraOptions + "explain plan without 
implementation for " + query);
+      if (!jsonNode.get("exceptions").isEmpty()) {
+        Assert.fail("Exception in response: " + 
jsonNode.get("exceptions").toString());
+      }
       JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
 
       Assert.assertEquals(plan.asText(), expected);
@@ -54,6 +65,9 @@ public interface ExplainIntegrationTestTrait {
         actualQuery = "SET explainPlanVerbose=true; " + actualQuery;
       }
       JsonNode jsonNode = postQuery(actualQuery);
+      if (!jsonNode.get("exceptions").isEmpty()) {
+        Assert.fail("Exception in response: " + 
jsonNode.get("exceptions").toString());
+      }
       JsonNode plan = jsonNode.get("resultTable").get("rows");
       List<String> planAsStrList = (List<String>) 
JsonUtils.jsonNodeToObject(plan, List.class).stream()
           .map(Object::toString)
@@ -92,6 +106,9 @@ public interface ExplainIntegrationTestTrait {
   default void explain(@Language("sql") String query, String expected) {
     try {
       JsonNode jsonNode = postQuery("explain plan for " + query);
+      if (!jsonNode.get("exceptions").isEmpty()) {
+        Assert.fail("Exception in response: " + 
jsonNode.get("exceptions").toString());
+      }
       JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1);
 
       Assert.assertEquals(plan.asText(), expected);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
index 9449676af69..ae8506d0cdc 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/NullHandlingIntegrationTest.java
@@ -21,11 +21,13 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.util.List;
+import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.intellij.lang.annotations.Language;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -39,7 +41,8 @@ import static org.testng.Assert.assertTrue;
  * Integration test that creates a Kafka broker, creates a Pinot cluster that 
consumes from Kafka and queries Pinot.
  * The data pushed to Kafka includes null values.
  */
-public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet 
{
+public class NullHandlingIntegrationTest extends BaseClusterIntegrationTestSet
+    implements ExplainIntegrationTestTrait {
 
   @BeforeClass
   public void setUp()
@@ -91,6 +94,11 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet {
     FileUtils.deleteDirectory(_tempDir);
   }
 
+  public JsonNode postQuery(@Language("sql") String query)
+      throws Exception {
+    return queryBrokerHttpEndpoint(query);
+  }
+
   @Override
   protected String getAvroTarFileName() {
     return "avro_data_with_nulls.tar.gz";
@@ -351,6 +359,76 @@ public class NullHandlingIntegrationTest extends 
BaseClusterIntegrationTestSet {
     }
   }
 
+  @Test
+  public void isNotNullAndComparisonWithoutNullHandling() {
+    setUseMultiStageQueryEngine(true);
+    String query = ""
+        + "SELECT 1 \n"
+        + "FROM " + getTableName() + " \n"
+        + "WHERE\n"
+        + "    salary IS NOT NULL \n"
+        + "AND salary <> 0";
+
+    explainLogical(query,
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[1])\n"
+            + "  LogicalFilter(condition=[AND(IS NOT NULL($7), <>($7, 0))])\n"
+            + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
+        
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"false"));
+  }
+
+  @Test
+  public void isNotNullAndComparisonWithNullHandling() {
+    setUseMultiStageQueryEngine(true);
+    String query = ""
+        + "SELECT 1 \n"
+        + "FROM " + getTableName() + " \n"
+        + "WHERE \n"
+        + "    salary IS NOT NULL "
+        + "AND salary <> 0";
+
+    explainLogical(query,
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[1])\n"
+            + "  LogicalFilter(condition=[<>($7, 0)])\n"
+            + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
+        
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"true"));
+  }
+
+  @Test
+  public void mseIsNullAndComparisonWithoutNullHandling() {
+    setUseMultiStageQueryEngine(true);
+    String query = ""
+        + "SELECT 1 \n"
+        + "FROM " + getTableName() + " \n"
+        + "WHERE\n"
+        + "    salary IS NULL \n"
+        + "AND salary <> 0";
+
+    explainLogical(query,
+        "Execution Plan\n"
+            + "LogicalProject(EXPR$0=[1])\n"
+            + "  LogicalFilter(condition=[AND(IS NULL($7), <>($7, 0))])\n"
+            + "    PinotLogicalTableScan(table=[[default, mytable]])\n",
+        
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"false"));
+  }
+
+  @Test
+  public void mseIsNullAndComparisonWithNullHandling() {
+    setUseMultiStageQueryEngine(true);
+    String query = ""
+        + "SELECT 1 \n"
+        + "FROM " + getTableName() + " \n"
+        + "WHERE \n"
+        + "    salary IS NULL "
+        + "AND salary <> 0";
+
+    explainLogical(query,
+        "Execution Plan\n"
+            + "LogicalValues(tuples=[[]])\n",
+        
Map.of(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, 
"true"));
+  }
+
   @Override
   protected void overrideBrokerConf(PinotConfiguration brokerConf) {
     
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_QUERY_ENABLE_NULL_HANDLING,
 "true");
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
index 2e380f8f94e..ef3ea9c78f7 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/sql/fun/PinotOperatorTable.java
@@ -32,6 +32,7 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlPostfixOperator;
 import org.apache.calcite.sql.SqlSyntax;
 import org.apache.calcite.sql.fun.SqlLeadLagAggFunction;
 import org.apache.calcite.sql.fun.SqlMonotonicBinaryOperator;
@@ -67,10 +68,13 @@ import org.apache.pinot.segment.spi.AggregationFunctionType;
  */
 @SuppressWarnings("unused") // unused fields are accessed by reflection
 public class PinotOperatorTable implements SqlOperatorTable {
-  private static final Supplier<PinotOperatorTable> INSTANCE = 
Suppliers.memoize(PinotOperatorTable::new);
+  private static final Supplier<PinotOperatorTable> WITH_NULL_HANDLING =
+      Suppliers.memoize(() -> new PinotOperatorTable(true));
+  private static final Supplier<PinotOperatorTable> WITHOUT_NULL_HANDLING =
+      Suppliers.memoize(() -> new PinotOperatorTable(false));
 
-  public static PinotOperatorTable instance() {
-    return INSTANCE.get();
+  public static PinotOperatorTable instance(boolean nullHandlingEnabled) {
+    return nullHandlingEnabled ? WITH_NULL_HANDLING.get() : 
WITHOUT_NULL_HANDLING.get();
   }
 
   // The standard Calcite + and - operators don't support operations on 
TIMESTAMP types. However, Pinot supports these
@@ -97,6 +101,26 @@ public class PinotOperatorTable implements SqlOperatorTable 
{
           InferTypes.FIRST_KNOWN,
           
OperandTypes.MINUS_OPERATOR.or(OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.TIMESTAMP)));
 
+  // See the usage of this attribute to see why it is needed
+  public static final SqlPostfixOperator PINOT_IS_NOT_NULL =
+      new SqlPostfixOperator(
+          "IS NOT NULL",
+          SqlKind.OTHER_FUNCTION,
+          28,
+          ReturnTypes.BOOLEAN_NOT_NULL,
+          InferTypes.VARCHAR_1024,
+          OperandTypes.ANY);
+
+  // See the usage of this attribute to see why it is needed
+  public static final SqlPostfixOperator PINOT_IS_NULL =
+      new SqlPostfixOperator(
+          "IS NULL",
+          SqlKind.OTHER_FUNCTION,
+          28,
+          ReturnTypes.BOOLEAN_NOT_NULL,
+          InferTypes.VARCHAR_1024,
+          OperandTypes.ANY);
+
   /**
    * This list includes the supported standard {@link SqlOperator}s defined in 
{@link SqlStdOperatorTable}.
    * NOTE: The operator order follows the same order as defined in {@link 
SqlStdOperatorTable} for easier search.
@@ -302,7 +326,7 @@ public class PinotOperatorTable implements SqlOperatorTable 
{
   private final Map<String, SqlOperator> _operatorMap;
   private final List<SqlOperator> _operatorList;
 
-  private PinotOperatorTable() {
+  private PinotOperatorTable(boolean nullHandlingEnabled) {
     Map<String, SqlOperator> operatorMap = new HashMap<>();
 
     // Register standard operators
@@ -315,6 +339,14 @@ public class PinotOperatorTable implements 
SqlOperatorTable {
         register(name, operator, operatorMap);
       }
     }
+    // When null handling is disabled, we need to use fake IS NULL and IS 
NOT NULL operators to skip some
+    // standard Calcite simplifications which are correct in SQL, but 
incorrect for Pinot. For example, in SQL
+    // `col IS NOT NULL AND col <> 0` is simplified to `col <> 0` because `col 
IS NOT NULL` is always true if
+    // `col <> 0` is true. However, in Pinot, `IS NOT NULL` has a special 
meaning when using basic null handling
+    if (!nullHandlingEnabled) {
+      operatorMap.put(FunctionRegistry.canonicalize(PINOT_IS_NULL.getName()), 
PINOT_IS_NULL);
+      
operatorMap.put(FunctionRegistry.canonicalize(PINOT_IS_NOT_NULL.getName()), 
PINOT_IS_NOT_NULL);
+    }
 
     // Register Pinot operators
     for (SqlOperator operator : PINOT_OPERATORS) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 225e1dc4f6a..407af2284fa 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -147,8 +147,12 @@ public class QueryEnvironment {
     String database = config.getDatabase();
     _catalog = new PinotCatalog(config.getTableCache(), database);
     CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, 
database, _catalog);
-    _config = 
Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
-        
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
+    _config = Frameworks.newConfigBuilder()
+        .traitDefs()
+        
.operatorTable(PinotOperatorTable.instance(config.isNullHandlingEnabled()))
+        .defaultSchema(rootSchema.plus())
+        .sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG)
+        .build();
     _catalogReader = new PinotCatalogReader(
         rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG, 
config.isCaseSensitive());
     // default optProgram with no skip rule options and no use rule options
@@ -156,11 +160,17 @@ public class QueryEnvironment {
   }
 
   public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
+    this(database, tableCache, workerManager, true);
+  }
+
+  public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager,
+      boolean nullHandlingEnabled) {
     this(configBuilder()
         .requestId(-1L)
         .database(database)
         .tableCache(tableCache)
         .workerManager(workerManager)
+        .isNullHandlingEnabled(nullHandlingEnabled)
         .build());
   }
 
@@ -647,6 +657,11 @@ public class QueryEnvironment {
     @Nullable
     TableCache getTableCache();
 
+    @Value.Default
+    default boolean isNullHandlingEnabled() {
+      return false;
+    }
+
     /**
      * Whether the schema should be considered case-insensitive.
      */
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
index afef239ff98..a3838af71ac 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RexExpressionUtils.java
@@ -295,11 +295,28 @@ public class RexExpressionUtils {
 
   private static String getFunctionName(SqlOperator operator) {
     switch (operator.kind) {
-      case OTHER:
+      case OTHER: {
         // NOTE: SqlStdOperatorTable.CONCAT has OTHER kind and "||" as name
-        return operator.getName().equals("||") ? "CONCAT" : operator.getName();
-      case OTHER_FUNCTION:
-        return operator.getName();
+        String name = operator.getName();
+        return name.equals("||") ? "CONCAT" : name;
+      }
+      case OTHER_FUNCTION: {
+        // See https://github.com/apache/pinot/pull/16658
+        // If null handling is disabled, functions `is null` and `is not null` 
are registered as OTHER_FUNCTION with
+        // name "IS NULL" and "IS NOT NULL".
+        // They have to be registered with these names in order to be 
recognized in the SQL parser, but at the same
+        // time, servers won't recognize function names with spaces. This is 
why we convert them to "IS_NULL" and
+        // "IS_NOT_NULL" here to match the SqlKind name.
+        String name = operator.getName();
+        switch (name) {
+          case "IS NULL":
+            return SqlKind.IS_NULL.name();
+          case "IS NOT NULL":
+            return SqlKind.IS_NOT_NULL.name();
+          default:
+            return name;
+        }
+      }
       default:
         return operator.kind.name();
     }
diff --git 
a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json 
b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
index e716a543bb0..147761f22d8 100644
--- a/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PhysicalOptimizerPlans.json
@@ -23,7 +23,7 @@
       },
       {
         "description": "Query that gets optimized to a Values node",
-        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a WHERE col1 IS NULL LIMIT 1",
+        "sql": "SET usePhysicalOptimizer=true; EXPLAIN PLAN FOR SELECT * FROM 
a WHERE col1 <> col1 LIMIT 1",
         "output": [
           "Execution Plan",
           "\nPhysicalSort(fetch=[1])",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to