This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 84f30208b7 [multistage] Introducing dynamic filtering semi-join broker level configuration (#15402)
84f30208b7 is described below

commit 84f30208b700913c44453312b4da5202c0134955
Author: Shaurya Chaturvedi <shauryach...@gmail.com>
AuthorDate: Wed Apr 2 12:00:44 2025 -0700

    [multistage] Introducing dynamic filtering semi-join broker level configuration (#15402)
---
 .../MultiStageBrokerRequestHandler.java            |  4 ++++
 .../org/apache/pinot/query/QueryEnvironment.java   | 23 +++++++++++++++++++---
 .../apache/pinot/spi/utils/CommonConstants.java    |  3 +++
 3 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 2dedf7b244..44e74ccc70 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -311,6 +311,9 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         CommonConstants.Broker.DEFAULT_OF_SPOOLS);
     boolean defaultEnableGroupTrim = _config.getProperty(CommonConstants.Broker.CONFIG_OF_MSE_ENABLE_GROUP_TRIM,
         CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM);
+    boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty(
+        CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN,
+        CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN);
     return QueryEnvironment.configBuilder()
         .database(database)
         .tableCache(_tableCache)
@@ -318,6 +321,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
         .defaultInferPartitionHint(inferPartitionHint)
         .defaultUseSpools(defaultUseSpool)
         .defaultEnableGroupTrim(defaultEnableGroupTrim)
+        .defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin)
         .build();
   }
 
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 1cec44a987..1e211ec64d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -55,6 +55,7 @@ import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.pinot.calcite.rel.rules.PinotImplicitTableHintRule;
+import org.apache.pinot.calcite.rel.rules.PinotJoinToDynamicBroadcastRule;
 import org.apache.pinot.calcite.rel.rules.PinotQueryRuleSets;
 import org.apache.pinot.calcite.rel.rules.PinotRelDistributionTraitRule;
 import org.apache.pinot.calcite.rel.rules.PinotRuleUtils;
@@ -149,7 +150,7 @@ public class QueryEnvironment {
    */
   private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) {
     WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
-    HepProgram traitProgram = getTraitProgram(workerManager);
+    HepProgram traitProgram = getTraitProgram(workerManager, _envConfig);
     SqlExplainFormat format = SqlExplainFormat.DOT;
     if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) {
       SqlExplain explain = (SqlExplain) sqlNodeAndOptions.getSqlNode();
@@ -465,7 +466,7 @@ public class QueryEnvironment {
     return hepProgramBuilder.build();
   }
 
-  private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager) {
+  private static HepProgram getTraitProgram(@Nullable WorkerManager workerManager, Config config) {
     HepProgramBuilder hepProgramBuilder = new HepProgramBuilder();
 
     // Set the match order as BOTTOM_UP.
@@ -474,7 +475,9 @@ public class QueryEnvironment {
     // ----
     // Run pinot specific rules that should run after all other rules, using 1 HepInstruction per rule.
     for (RelOptRule relOptRule : PinotQueryRuleSets.PINOT_POST_RULES) {
-      hepProgramBuilder.addRuleInstance(relOptRule);
+      if (isEligibleQueryPostRule(relOptRule, config)) {
+        hepProgramBuilder.addRuleInstance(relOptRule);
+      }
     }
 
     // apply RelDistribution trait to all nodes
@@ -486,6 +489,14 @@ public class QueryEnvironment {
     return hepProgramBuilder.build();
   }
 
+  // This method is used to filter out post rules that are not eligible to run based on the config.
+  private static boolean isEligibleQueryPostRule(RelOptRule relOptRule, Config config) {
+    if (relOptRule instanceof PinotJoinToDynamicBroadcastRule && !config.defaultEnableDynamicFilteringSemiJoin()) {
+      return false;
+    }
+    return true;
+  }
+
   public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
     return ImmutableQueryEnvironment.Config.builder();
   }
@@ -532,11 +543,17 @@ public class QueryEnvironment {
       return CommonConstants.Broker.DEFAULT_OF_SPOOLS;
     }
 
+
     @Value.Default
     default boolean defaultEnableGroupTrim() {
       return CommonConstants.Broker.DEFAULT_MSE_ENABLE_GROUP_TRIM;
     }
 
+    @Value.Default
+    default boolean defaultEnableDynamicFilteringSemiJoin() {
+      return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN;
+    }
+
     /**
      * Returns the worker manager.
      *
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index a5a4b60bff..5f6b186f70 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -456,6 +456,9 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC =
         "pinot.broker.enable.multistage.migration.metric";
     public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = false;
+    public static final String CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN =
+            "pinot.broker.enable.dynamic.filtering.semijoin";
+    public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true;
 
     public static class Request {
       public static final String SQL = "sql";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to