This is an automated email from the ASF dual-hosted git repository.

jtao pushed a commit to branch hotfix-empty-schema
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/hotfix-empty-schema by this push: new 85e4761a2d protect usage MSQE compiler for empty schema polyfill with config param disabled by default (#15078) 85e4761a2d is described below commit 85e4761a2da62fe7c78db797e4c8c858f7c4bede Author: Alberto Bastos <alberto.var...@startree.ai> AuthorDate: Wed Feb 19 15:57:20 2025 +0100 protect usage MSQE compiler for empty schema polyfill with config param disabled by default (#15078) --- .../BaseSingleStageBrokerRequestHandler.java | 12 +++++++++-- .../common/utils/config/QueryOptionsUtils.java | 5 +++++ .../tests/BasePauselessRealtimeIngestionTest.java | 6 ++++++ .../tests/EmptyResponseIntegrationTest.java | 7 ++++++ ...mentGenerationMinionClusterIntegrationTest.java | 8 +++++++ .../pinot/query/parser/utils/ParserUtils.java | 25 +++++++++++++--------- .../apache/pinot/spi/utils/CommonConstants.java | 8 +++++++ 7 files changed, 59 insertions(+), 12 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 41824eaf05..e83fe780cc 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -146,6 +146,7 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ protected final int _defaultQueryLimit; protected final Map<Long, QueryServers> _queriesById; protected final boolean _enableMultistageMigrationMetric; + protected final boolean _useMSEToFillEmptyResponseSchema; protected ExecutorService _multistageCompileExecutor; protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue; @@ -175,6 +176,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ 
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000); } + _useMSEToFillEmptyResponseSchema = _config.getProperty(Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, + Broker.DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA); + LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, " + "default query limit {}, query log max length: {}, query log max rate: {}, query cancellation " + "enabled: {}", getClass().getSimpleName(), _brokerId, _brokerTimeoutMs, _queryResponseLimit, @@ -853,7 +857,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ // server returns STRING as default dataType for all columns in (some) scenarios where no rows are returned // this is an attempt to return more faithful information based on other sources if (brokerResponse.getNumRowsResultSet() == 0) { - ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); + boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema( + pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema); + ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query); } // Set total query processing time @@ -928,7 +934,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ // Send empty response since we don't need to evaluate either offline or realtime request. 
BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); - ParserUtils.fillEmptyResponseSchema(brokerResponse, _tableCache, schema, database, query); + boolean useMSE = QueryOptionsUtils.isUseMSEToFillEmptySchema( + pinotQuery.getQueryOptions(), _useMSEToFillEmptyResponseSchema); + ParserUtils.fillEmptyResponseSchema(useMSE, brokerResponse, _tableCache, schema, database, query); brokerResponse.setTimeUsedMs(System.currentTimeMillis() - requestContext.getRequestArrivalTimeMillis()); _queryLogger.log( new QueryLogger.QueryLogParams(requestContext, tableName, brokerResponse, requesterIdentity, null)); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index 5e8ba86643..3aa24cd29f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -356,6 +356,11 @@ public class QueryOptionsUtils { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD)); } + public static Boolean isUseMSEToFillEmptySchema(Map<String, String> queryOptions, boolean defaultValue) { + String useMSEToFillEmptySchema = queryOptions.get(QueryOptionKey.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA); + return useMSEToFillEmptySchema != null ? 
Boolean.parseBoolean(useMSEToFillEmptySchema) : defaultValue; + } + @Nullable private static Integer uncheckedParseInt(String optionName, @Nullable String optionValue) { if (optionValue == null) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java index 5d7f7caa4a..036fc50079 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasePauselessRealtimeIngestionTest.java @@ -88,6 +88,12 @@ public abstract class BasePauselessRealtimeIngestionTest extends BaseClusterInte 500); } + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true); + } + @Override protected void overrideServerConf(PinotConfiguration serverConf) { try { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java index 5c0c3454a9..97470be599 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/EmptyResponseIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; import 
org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -82,6 +83,12 @@ public class EmptyResponseIntegrationTest extends BaseClusterIntegrationTestSet CompressionCodec.MV_ENTRY_DICT, null)); } + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true); + } + @BeforeClass public void setUp() throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java index bd4673f853..d9352a801c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentGenerationMinionClusterIntegrationTest.java @@ -29,7 +29,9 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.task.AdhocTaskConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; @@ -46,6 +48,12 @@ import static org.testng.Assert.assertEquals; public class SegmentGenerationMinionClusterIntegrationTest extends BaseClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(SegmentGenerationMinionClusterIntegrationTest.class); + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + 
brokerConf.setProperty(CommonConstants.Broker.USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA, true); + } + @BeforeClass public void setUp() throws Exception { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java index ec58dd296c..eb308f45cd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/parser/utils/ParserUtils.java @@ -62,27 +62,32 @@ public class ParserUtils { * 2. Data schema has all columns set to default type (STRING) (when all segments pruned on server). * * Priority is: - * - Types from multi-stage engine validation for the given query. + * - Types from multi-stage engine validation for the given query (if allowed). * - Types from schema for the given table (only applicable to selection fields). * - Types from single-stage engine response (no action). * * Multi-stage engine schema will be available only if query compiles. */ - public static void fillEmptyResponseSchema(BrokerResponse response, TableCache tableCache, Schema schema, - String database, String query) { + public static void fillEmptyResponseSchema(boolean useMSE, BrokerResponse response, TableCache tableCache, + Schema schema, String database, String query) { Preconditions.checkState(response.getNumRowsResultSet() == 0, "Cannot fill schema for non-empty response"); DataSchema dataSchema = response.getResultTable() != null ? 
response.getResultTable().getDataSchema() : null; List<RelDataTypeField> dataTypeFields = null; - try { - QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null); - RelRoot node = queryEnvironment.getRelRootIfCanCompile(query); - if (node != null && node.validatedRowType != null) { - dataTypeFields = node.validatedRowType.getFieldList(); + // Turn on (with pinot.broker.use.mse.to.fill.empty.response.schema=true or query option + // useMSEToFillEmptyResponseSchema=true) only for clusters where no queries with huge IN clauses are expected + // (see https://github.com/apache/pinot/issues/15064) + if (useMSE) { + try { + QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null); + RelRoot node = queryEnvironment.getRelRootIfCanCompile(query); + if (node != null && node.validatedRowType != null) { + dataTypeFields = node.validatedRowType.getFieldList(); + } + } catch (Exception ignored) { + // Ignored } - } catch (Exception ignored) { - // Ignored } if (dataSchema == null && dataTypeFields == null) { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index f1695e1119..736ec34b73 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -557,6 +557,10 @@ public class CommonConstants { public static final String GET_CURSOR = "getCursor"; // Number of rows that the cursor should contain public static final String CURSOR_NUM_ROWS = "cursorNumRows"; + + // Use MSE compiler when trying to fill a response with no schema metadata + // (overrides the "pinot.broker.use.mse.to.fill.empty.response.schema" broker conf) + public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = "useMSEToFillEmptyResponseSchema"; } public static class QueryOptionValue { @@ -677,6 +681,10 @@ public class CommonConstants { } 
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.broker.storage.factory"; + + public static final String USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = + "pinot.broker.use.mse.to.fill.empty.response.schema"; + public static final boolean DEFAULT_USE_MSE_TO_FILL_EMPTY_RESPONSE_SCHEMA = false; } public static class Server { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org