Jackie-Jiang commented on code in PR #13943:
URL: https://github.com/apache/pinot/pull/13943#discussion_r1801671644


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -398,8 +447,44 @@ private static HepProgram getTraitProgram() {
     }
 
     // apply RelDistribution trait to all nodes
+    if (workerManager != null) {
+      
hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager));
+    }
     hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE);
 
     return hepProgramBuilder.build();
   }
+
+  public static ImmutableQueryEnvironment.Config.Builder configBuilder() {
+    return ImmutableQueryEnvironment.Config.builder();
+  }
+
+  @Value.Immutable
+  public interface Config {
+    String getDatabase();
+
+    @Nullable // In theory nullable only in tests. We should fix 
LiteralOnlyBrokerRequestTest to not need this.
+    TableCache getTableCache();
+
+    /**
+     * Whether to use implicit colocated join by default.
+     *
+     * This is treated as the default value for the broker and it is expected 
to be obtained from a Pinot configuration.
+     * This default value can be always overridden at query level by the query 
option
+     * {@link 
CommonConstants.Broker.Request.QueryOptionKey#AUTO_PARTITION_HINT}.
+     */
+    @Value.Default
+    default boolean useImplicitColocatedByDefault() {

Review Comment:
   Please update the method name and its Javadoc to match the auto partition hint naming.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -128,7 +128,16 @@ protected BrokerResponse handleRequest(long requestId, 
String query, SqlNodeAndO
       Long timeoutMsFromQueryOption = 
QueryOptionsUtils.getTimeoutMs(queryOptions);
       queryTimeoutMs = timeoutMsFromQueryOption != null ? 
timeoutMsFromQueryOption : _brokerTimeoutMs;
       database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, 
httpHeaders);
-      QueryEnvironment queryEnvironment = new QueryEnvironment(database, 
_tableCache, _workerManager);
+      boolean useImplicitColocatedByDefault = 
_config.getProperty(CommonConstants.Broker.AUTO_PARTITION_HINT,

Review Comment:
   Please update the variable name (`useImplicitColocatedByDefault`) to match the renamed config.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -343,7 +343,9 @@ public static class Broker {
 
     public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
         "pinot.broker.enable.partition.metadata.manager";
-    public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = 
false;
+    public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = 
true;
+    public static final String AUTO_PARTITION_HINT = 
"pinot.broker.multistage.auto.partition.hint";

Review Comment:
   For consistency with the other broker configs, name it `CONFIG_OF_AUTO_PARTITION_HINT`.
   Other config keys usually start with a verb, so I would personally prefer 
`autoApplyPartitionHint`. This part is optional, but please keep the name of the 
default-value constant consistent with the config name.



##########
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintStrategyTable.java:
##########
@@ -108,4 +110,25 @@ public static String getHintOption(List<RelHint> hintList, 
String hintName, Stri
     }
     return null;
   }
+
+  /**
+   * Get the first hint that has the specified name.
+   */
+  public static RelHint findFirst(Hintable hintable, String hintName) {

Review Comment:
   The return value is also `@Nullable`.
   Can we keep the naming and calling convention of these utils consistent? I 
would put this next to `containsHint()` and call it `getHint()`.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -100,26 +111,64 @@ public class QueryEnvironment {
   private final FrameworkConfig _config;
   private final CalciteCatalogReader _catalogReader;
   private final HepProgram _optProgram;
-  private final HepProgram _traitProgram;
-
-  // Pinot extensions
-  private final TableCache _tableCache;
-  private final WorkerManager _workerManager;
+  private final Config _envConfig;
 
-  public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
-    PinotCatalog catalog = new PinotCatalog(tableCache, database);
+  public QueryEnvironment(Config config) {
+    _envConfig = config;
+    String database = config.getDatabase();
+    PinotCatalog catalog = new PinotCatalog(config.getTableCache(), database);
     CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, 
database, catalog);
     _config = 
Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
         
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
     _catalogReader = new CalciteCatalogReader(rootSchema, List.of(database), 
_typeFactory, CONNECTION_CONFIG);
     _optProgram = getOptProgram();
-    _traitProgram = getTraitProgram();
-    _tableCache = tableCache;
-    _workerManager = workerManager;
   }
 
-  private PlannerContext getPlannerContext() {
-    return new PlannerContext(_config, _catalogReader, _typeFactory, 
_optProgram, _traitProgram);
+  public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
+    this(configBuilder()
+        .database(database)
+        .tableCache(tableCache)
+        .workerManager(workerManager)
+        .build());
+  }
+
+  /**
+   * Returns a planner context that can be used to either parse, explain or 
execute a query.
+   */
+  private PlannerContext getPlannerContext(SqlNodeAndOptions 
sqlNodeAndOptions) {
+    WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
+    HepProgram traitProgram = getTraitProgram(workerManager);
+    return new PlannerContext(_config, _catalogReader, _typeFactory, 
_optProgram, traitProgram);
+  }
+
+  @Nullable
+  private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) {
+    String useImplicitColocatedOptionValue = sqlNodeAndOptions.getOptions()

Review Comment:
   Please update the variable name (`useImplicitColocatedOptionValue`) as well.



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java:
##########
@@ -100,26 +111,64 @@ public class QueryEnvironment {
   private final FrameworkConfig _config;
   private final CalciteCatalogReader _catalogReader;
   private final HepProgram _optProgram;
-  private final HepProgram _traitProgram;
-
-  // Pinot extensions
-  private final TableCache _tableCache;
-  private final WorkerManager _workerManager;
+  private final Config _envConfig;
 
-  public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
-    PinotCatalog catalog = new PinotCatalog(tableCache, database);
+  public QueryEnvironment(Config config) {
+    _envConfig = config;
+    String database = config.getDatabase();
+    PinotCatalog catalog = new PinotCatalog(config.getTableCache(), database);
     CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, 
database, catalog);
     _config = 
Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance())
         
.defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build();
     _catalogReader = new CalciteCatalogReader(rootSchema, List.of(database), 
_typeFactory, CONNECTION_CONFIG);
     _optProgram = getOptProgram();
-    _traitProgram = getTraitProgram();
-    _tableCache = tableCache;
-    _workerManager = workerManager;
   }
 
-  private PlannerContext getPlannerContext() {
-    return new PlannerContext(_config, _catalogReader, _typeFactory, 
_optProgram, _traitProgram);
+  public QueryEnvironment(String database, TableCache tableCache, @Nullable 
WorkerManager workerManager) {
+    this(configBuilder()
+        .database(database)
+        .tableCache(tableCache)
+        .workerManager(workerManager)
+        .build());
+  }
+
+  /**
+   * Returns a planner context that can be used to either parse, explain or 
execute a query.
+   */
+  private PlannerContext getPlannerContext(SqlNodeAndOptions 
sqlNodeAndOptions) {
+    WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions);
+    HepProgram traitProgram = getTraitProgram(workerManager);
+    return new PlannerContext(_config, _catalogReader, _typeFactory, 
_optProgram, traitProgram);
+  }
+
+  @Nullable
+  private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) {
+    String useImplicitColocatedOptionValue = sqlNodeAndOptions.getOptions()
+        
.get(CommonConstants.Broker.Request.QueryOptionKey.IMPLICIT_COLOCATE_JOIN);
+    WorkerManager workerManager = _envConfig.getWorkerManager();
+
+    if (useImplicitColocatedOptionValue == null) {
+      return _envConfig.useImplicitColocatedByDefault() ? workerManager : null;
+    }
+    switch (useImplicitColocatedOptionValue.toLowerCase()) {
+      case "true":
+        Objects.requireNonNull(workerManager, "WorkerManager is required for 
implicit colocated join");

Review Comment:
   Here the worker manager is read from the environment config, so whether it 
is null is not determined by this option value. It depends purely on the code path 
that initialized the `QueryEnvironment`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to