This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 84f30208b7 [multistage] Introducing dynamic filtering semi-join broker level configuration (#15402) 84f30208b7 is described below commit 84f30208b700913c44453312b4da5202c0134955 Author: Shaurya Chaturvedi <shauryach...@gmail.com> AuthorDate: Wed Apr 2 12:00:44 2025 -0700 [multistage] Introducing dynamic filtering semi-join broker level configuration (#15402) --- .../MultiStageBrokerRequestHandler.java | 4 ++++ .../org/apache/pinot/query/QueryEnvironment.java | 23 +++++++++++++++++++--- .../apache/pinot/spi/utils/CommonConstants.java | 3 +++ 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 2dedf7b244..44e74ccc70 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -311,6 +311,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { CommonConstants.Broker.DEFAULT_OF_SPOOLS); boolean defaultEnableGroupTrim = _config.getProperty(CommonConstants.Broker.CONFIG_OF_MSE_ENABLE_GROUP_TRIM, CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM); + boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN, + CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN); return QueryEnvironment.configBuilder() .database(database) .tableCache(_tableCache) @@ -318,6 +321,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { .defaultInferPartitionHint(inferPartitionHint) .defaultUseSpools(defaultUseSpool) .defaultEnableGroupTrim(defaultEnableGroupTrim) + 
.defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin) .build(); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 1cec44a987..1e211ec64d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -55,6 +55,7 @@ import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.RelBuilder; import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule; +import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule; import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets; import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule; import org.apache.pinot.calcite.rel.rules.PinotRuleUtils; @@ -149,7 +150,7 @@ public class QueryEnvironment { */ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); - HepProgram traitProgram = getTraitProgram(workerManager); + HepProgram traitProgram = getTraitProgram(workerManager, _envConfig); SqlExplainFormat format = SqlExplainFormat.DOT; if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) { SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode(); @@ -465,7 +466,7 @@ public class QueryEnvironment { return hepProgramBuilder.build(); } - private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager) { + private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config) { HepProgramBuilder hepProgramBuilder = new HepProgramBuilder(); // Set the match order as BOTTOM_UP. 
@@ -474,7 +475,9 @@ public class QueryEnvironment { // ---- // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule. for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) { - hepProgramBuilder.addRuleInstance(relOptRule); + if (isEligibleQueryPostRule(relOptRule, config)) { + hepProgramBuilder.addRuleInstance(relOptRule); + } } // apply RelDistribution trait to all nodes @@ -486,6 +489,14 @@ public class QueryEnvironment { return hepProgramBuilder.build(); } + // This method is used to filter out post rules that are not eligible to run based on the config. + private static boolean isEligibleQueryPostRule(RelOptRule relOptRule, Config config) { + if (relOptRule instanceof PinotJoinToDynamicBroadcastRule && !config.defaultEnableDynamicFilteringSemiJoin()) { + return false; + } + return true; + } + public static ImmutableQueryEnvironment.Config.Builder configBuilder() { return ImmutableQueryEnvironment.Config.builder(); } @@ -532,11 +543,17 @@ public class QueryEnvironment { return CommonConstants.Broker.DEFAULT_OF_SPOOLS; } + @Value.Default default boolean defaultEnableGroupTrim() { return CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM; } + @Value.Default + default boolean defaultEnableDynamicFilteringSemiJoin() { + return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN; + } + /** * Returns the worker manager. 
* diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index a5a4b60bff..5f6b186f70 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -456,6 +456,9 @@ public class CommonConstants { public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC = "pinot.broker.enable.multistage.migration.metric"; public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = false; + public static final String CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = + "pinot.broker.enable.dynamic.filtering.semijoin"; + public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true; public static class Request { public static final String SQL = "sql"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org