Jackie-Jiang commented on code in PR #13943: URL: https://github.com/apache/pinot/pull/13943#discussion_r1801671644
########## pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java: ########## @@ -398,8 +447,44 @@ private static HepProgram getTraitProgram() { } // apply RelDistribution trait to all nodes + if (workerManager != null) { + hepProgramBuilder.addRuleInstance(PinotImplicitTableHintRule.withWorkerManager(workerManager)); + } hepProgramBuilder.addRuleInstance(PinotRelDistributionTraitRule.INSTANCE); return hepProgramBuilder.build(); } + + public static ImmutableQueryEnvironment.Config.Builder configBuilder() { + return ImmutableQueryEnvironment.Config.builder(); + } + + @Value.Immutable + public interface Config { + String getDatabase(); + + @Nullable // In theory nullable only in tests. We should fix LiteralOnlyBrokerRequestTest to not need this. + TableCache getTableCache(); + + /** + * Whether to use implicit colocated join by default. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#AUTO_PARTITION_HINT}. + */ + @Value.Default + default boolean useImplicitColocatedByDefault() { Review Comment: Update method name and javadoc ########## pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java: ########## @@ -128,7 +128,16 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(queryOptions); queryTimeoutMs = timeoutMsFromQueryOption != null ? 
timeoutMsFromQueryOption : _brokerTimeoutMs; database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); - QueryEnvironment queryEnvironment = new QueryEnvironment(database, _tableCache, _workerManager); + boolean useImplicitColocatedByDefault = _config.getProperty(CommonConstants.Broker.AUTO_PARTITION_HINT, Review Comment: Please update the variable name ########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -343,7 +343,9 @@ public static class Broker { public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER = "pinot.broker.enable.partition.metadata.manager"; - public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = false; + public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = true; + public static final String AUTO_PARTITION_HINT = "pinot.broker.multistage.auto.partition.hint"; Review Comment: To keep it consistent, name it `CONFIG_OF_AUTO_PARTITION_HINT`. Other config names usually start with a verb, so I personally prefer `autoApplyPartitionHint`. This is optional, but let's keep the default value name and config name consistent. ########## pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/hint/PinotHintStrategyTable.java: ########## @@ -108,4 +110,25 @@ public static String getHintOption(List<RelHint> hintList, String hintName, Stri } return null; } + + /** + * Get the first hint that has the specified name. + */ + public static RelHint findFirst(Hintable hintable, String hintName) { Review Comment: Return is also `Nullable`. Can we keep the naming and calling conventions of these utils consistent?
I'd naturally put this next to `containsHint()` and call it `getHint()` ########## pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java: ########## @@ -100,26 +111,64 @@ public class QueryEnvironment { private final FrameworkConfig _config; private final CalciteCatalogReader _catalogReader; private final HepProgram _optProgram; - private final HepProgram _traitProgram; - - // Pinot extensions - private final TableCache _tableCache; - private final WorkerManager _workerManager; + private final Config _envConfig; - public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) { - PinotCatalog catalog = new PinotCatalog(tableCache, database); + public QueryEnvironment(Config config) { + _envConfig = config; + String database = config.getDatabase(); + PinotCatalog catalog = new PinotCatalog(config.getTableCache(), database); CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog); _config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance()) .defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build(); _catalogReader = new CalciteCatalogReader(rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG); _optProgram = getOptProgram(); - _traitProgram = getTraitProgram(); - _tableCache = tableCache; - _workerManager = workerManager; } - private PlannerContext getPlannerContext() { - return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, _traitProgram); + public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) { + this(configBuilder() + .database(database) + .tableCache(tableCache) + .workerManager(workerManager) + .build()); + } + + /** + * Returns a planner context that can be used to either parse, explain or execute a query. 
+ */ + private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { + WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); + HepProgram traitProgram = getTraitProgram(workerManager); + return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); + } + + @Nullable + private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) { + String useImplicitColocatedOptionValue = sqlNodeAndOptions.getOptions() Review Comment: Update the variable name ########## pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java: ########## @@ -100,26 +111,64 @@ public class QueryEnvironment { private final FrameworkConfig _config; private final CalciteCatalogReader _catalogReader; private final HepProgram _optProgram; - private final HepProgram _traitProgram; - - // Pinot extensions - private final TableCache _tableCache; - private final WorkerManager _workerManager; + private final Config _envConfig; - public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) { - PinotCatalog catalog = new PinotCatalog(tableCache, database); + public QueryEnvironment(Config config) { + _envConfig = config; + String database = config.getDatabase(); + PinotCatalog catalog = new PinotCatalog(config.getTableCache(), database); CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false, database, catalog); _config = Frameworks.newConfigBuilder().traitDefs().operatorTable(PinotOperatorTable.instance()) .defaultSchema(rootSchema.plus()).sqlToRelConverterConfig(PinotRuleUtils.PINOT_SQL_TO_REL_CONFIG).build(); _catalogReader = new CalciteCatalogReader(rootSchema, List.of(database), _typeFactory, CONNECTION_CONFIG); _optProgram = getOptProgram(); - _traitProgram = getTraitProgram(); - _tableCache = tableCache; - _workerManager = workerManager; } - private PlannerContext getPlannerContext() { - return new PlannerContext(_config, _catalogReader, 
_typeFactory, _optProgram, _traitProgram); + public QueryEnvironment(String database, TableCache tableCache, @Nullable WorkerManager workerManager) { + this(configBuilder() + .database(database) + .tableCache(tableCache) + .workerManager(workerManager) + .build()); + } + + /** + * Returns a planner context that can be used to either parse, explain or execute a query. + */ + private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { + WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); + HepProgram traitProgram = getTraitProgram(workerManager); + return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram); + } + + @Nullable + private WorkerManager getWorkerManager(SqlNodeAndOptions sqlNodeAndOptions) { + String useImplicitColocatedOptionValue = sqlNodeAndOptions.getOptions() + .get(CommonConstants.Broker.Request.QueryOptionKey.IMPLICIT_COLOCATE_JOIN); + WorkerManager workerManager = _envConfig.getWorkerManager(); + + if (useImplicitColocatedOptionValue == null) { + return _envConfig.useImplicitColocatedByDefault() ? workerManager : null; + } + switch (useImplicitColocatedOptionValue.toLowerCase()) { + case "true": + Objects.requireNonNull(workerManager, "WorkerManager is required for implicit colocated join"); Review Comment: Here the worker manager is read from the environment config, so whether it is null is not decided by this config, but purely based on the code path that initialized the `QueryEnvironment` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org