This is an automated email from the ASF dual-hosted git repository.

jtao pushed a commit to branch hotfix-empty-schema
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/hotfix-empty-schema by this push:
     new 85e4761a2d protect usage MSQE compiler for empty schema polyfill with config param disabled by default (#15078)
85e4761a2d is described below

commit 85e4761a2da62fe7c78db797e4c8c858f7c4bede
Author: Alberto Bastos <alberto.var...@startree.ai>
AuthorDate: Wed Feb 19 15:57:20 2025 +0100

    protect usage MSQE compiler for empty schema polyfill with config param disabled by default (#15078)
---
 .../BaseSingleStageBrokerRequestHandler.java       | 12 +++++++++--
 .../common/utils/config/QueryOptionsUtils.java     |  5 +++++
 .../tests/BasePauselessRealtimeIngestionTest.java  |  6 ++++++
 .../tests/EmptyResponseIntegrationTest.java        |  7 ++++++
 ...mentGenerationMinionClusterIntegrationTest.java |  8 +++++++
 .../pinot/query/parser/utils/ParserUtils.java      | 25 +++++++++++++---------
 .../apache/pinot/spi/utils/CommonConstants.java    |  8 +++++++
 7 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index 41824eaf05..e83fe780cc 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -146,6 +146,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
   protected final int _defaultQueryLimit;
   protected final Map<Long, QueryServers> _queriesById;
   protected final boolean _enableMultistageMigrationMetric;
+  protected final boolean _useMSEToFillEmptyResponseSchema;
   protected ExecutorService _multistageCompileExecutor;
   protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;
 
@@ -175,6 +176,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
       _multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
     }
 
+    _useMSEToFillEmptyResponseSchema = _config.getProperty(Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA,
+        Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
+
     LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, "
             + "default query limit {}, query log max length: {}, query log max rate: {}, query cancellation "
             + "enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit,
@@ -853,7 +857,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
       // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned
       // this is an attempt to return more faithful information based on other sources
       if (brokerResponse.getNumRowsResultSet() == 0) {
-        ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
+        boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
+            pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
+        ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query);
       }
 
       // Set total query processing time
@@ -928,7 +934,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
 
     // Send empty response since we don't need to evaluate either offline or realtime request.
     BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
-    ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query);
+    boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema(
+        pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema);
+    ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query);
     brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis());
     _queryLogger.log(
         new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, null));
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 5e8ba86643..3aa24cd29f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -356,6 +356,11 @@ public class QueryOptionsUtils {
     return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
   }
 
+  public static Boolean isUseMSEToFillEmptySchema(Map<String, String> queryOptions, boolean defaultValue) {
+    String useMSEToFillEmptySchema = queryOptions.get(QueryOptionKey.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA);
+    return useMSEToFillEmptySchema != null ? Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue;
+  }
+
   @Nullable
   private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) {
     if (optionValue == null) {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
index 5d7f7caa4a..036fc50079 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java
@@ -88,6 +88,12 @@ public abstract class BasePauselessRealtimeIngestionTest extends BaseClusterInte
         500);
   }
 
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+    brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
+  }
+
   @Override
   protected void overrideServerConf(PinotConfiguration serverConf) {
     try {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
index 5c0c3454a9..97470be599 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -82,6 +83,12 @@ public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet
             CompressionCodec.MV_ENTRY_DICT, null));
   }
 
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+    brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
index bd4673f853..d9352a801c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java
@@ -29,7 +29,9 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.task.AdhocTaskConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -46,6 +48,12 @@ import static org.testng.Assert.assertEquals;
 public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class);
 
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+    super.overrideBrokerConf(brokerConf);
+    brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true);
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
index ec58dd296c..eb308f45cd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java
@@ -62,27 +62,32 @@ public class ParserUtils {
    * 2. Data schema has all columns set to default type (STRING) (when all segments pruned on server).
    *
    * Priority is:
-   * - Types from multi-stage engine validation for the given query.
+   * - Types from multi-stage engine validation for the given query (if allowed).
    * - Types from schema for the given table (only applicable to selection fields).
    * - Types from single-stage engine response (no action).
    *
    * Multi-stage engine schema will be available only if query compiles.
    */
-  public static void fillEmptyResponseSchema(BrokerResponse response, TableCache tableCache, Schema schema,
-      String database, String query) {
+  public static void fillEmptyResponseSchema(boolean useMSE, BrokerResponse response, TableCache tableCache,
+      Schema schema, String database, String query) {
     Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill schema for non-empty response");
 
     DataSchema dataSchema = response.getResultTable() != null ? response.getResultTable().getDataSchema() : null;
 
     List<RelDataTypeField> dataTypeFields = null;
-    try {
-      QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
-      RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
-      if (node != null && node.validatedRowType != null) {
-        dataTypeFields = node.validatedRowType.getFieldList();
+    // Turn on (with pinot.broker.use.mse.to.fill.empty.response.schema=true or query option
+    // useMSEToFillEmptyResponseSchema=true) only for clusters where no queries with huge IN clauses are expected
+    // (see https://github.com/apache/pinot/issues/15064)
+    if (useMSE) {
+      try {
+        QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
+        RelRoot node = queryEnvironment.getRelRootIfCanCompile(query);
+        if (node != null && node.validatedRowType != null) {
+          dataTypeFields = node.validatedRowType.getFieldList();
+        }
+      } catch (Exception ignored) {
+        // Ignored
       }
-    } catch (Exception ignored) {
-      // Ignored
     }
 
     if (dataSchema == null && dataTypeFields == null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index f1695e1119..736ec34b73 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -557,6 +557,10 @@ public class CommonConstants {
         public static final String GET_CURSOR = "getCursor";
         // Number of rows that the cursor should contain
         public static final String CURSOR_NUM_ROWS = "cursorNumRows";
+
+        // Use MSE compiler when trying to fill a response with no schema metadata
+        // (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema" broker conf)
+        public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = "useMSEToFillEmptyResponseSchema";
       }
 
       public static class QueryOptionValue {
@@ -677,6 +681,10 @@ public class CommonConstants {
     }
 
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.broker.storage.factory";
+
+    public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA =
+        "pinot.broker.use.mse.to.fill.empty.response.schema";
+    public static final boolean DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = false;
   }
 
   public static class Server {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to