This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new acbc35b463 [multistage] Add broker config defaults for physical optimizer and lite mode settings (#16240) acbc35b463 is described below commit acbc35b463d09ee164899730dc0b16ec8641a342 Author: Shaurya Chaturvedi <shauryach...@gmail.com> AuthorDate: Mon Jun 30 19:34:36 2025 -0700 [multistage] Add broker config defaults for physical optimizer and lite mode settings (#16240) --- .../MultiStageBrokerRequestHandler.java | 20 +++++++ .../common/utils/config/QueryOptionsUtils.java | 25 +++++--- .../org/apache/pinot/query/QueryEnvironment.java | 67 +++++++++++++++++++++- .../query/context/PhysicalPlannerContext.java | 30 +++++++++- .../opt/rules/LeafStageWorkerAssignmentRule.java | 2 +- .../v2/opt/rules/LiteModeSortInsertRule.java | 15 +++-- .../v2/opt/rules/LiteModeWorkerAssignmentRule.java | 3 +- .../apache/pinot/spi/utils/CommonConstants.java | 38 ++++++++++++ 8 files changed, 176 insertions(+), 24 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 0ba41ebaf2..b288a8e95e 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -374,6 +374,21 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { boolean defaultEnableDynamicFilteringSemiJoin = _config.getProperty( CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN, CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN); + boolean defaultUsePhysicalOptimizer = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_USE_PHYSICAL_OPTIMIZER, + CommonConstants.Broker.DEFAULT_USE_PHYSICAL_OPTIMIZER); + boolean defaultUseLiteMode = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_USE_LITE_MODE, + CommonConstants.Broker.DEFAULT_USE_LITE_MODE); + boolean defaultRunInBroker = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_RUN_IN_BROKER, + CommonConstants.Broker.DEFAULT_RUN_IN_BROKER); + boolean defaultUseBrokerPruning = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_USE_BROKER_PRUNING, + CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING); + int defaultLiteModeServerStageLimit = _config.getProperty( + CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT, + CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT); boolean caseSensitive = !_config.getProperty( CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_KEY, CommonConstants.Helix.DEFAULT_ENABLE_CASE_INSENSITIVE @@ -389,6 +404,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { .defaultUseLeafServerForIntermediateStage(defaultUseLeafServerForIntermediateStage) .defaultEnableGroupTrim(defaultEnableGroupTrim) .defaultEnableDynamicFilteringSemiJoin(defaultEnableDynamicFilteringSemiJoin) + .defaultUsePhysicalOptimizer(defaultUsePhysicalOptimizer) + .defaultUseLiteMode(defaultUseLiteMode) + .defaultRunInBroker(defaultRunInBroker) + .defaultUseBrokerPruning(defaultUseBrokerPruning) + .defaultLiteModeServerStageLimit(defaultLiteModeServerStageLimit) .build(); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index d562d0c85d..1252b29cee 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -431,20 +431,29 @@ public class QueryOptionsUtils { return option != null ? Boolean.parseBoolean(option) : defaultValue; } - public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions) { - return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_PHYSICAL_OPTIMIZER, "false")); + public static boolean isUsePhysicalOptimizer(Map<String, String> queryOptions, boolean defaultValue) { + String option = queryOptions.get(QueryOptionKey.USE_PHYSICAL_OPTIMIZER); + return option != null ? Boolean.parseBoolean(option) : defaultValue; + } + + public static boolean isUseLiteMode(Map<String, String> queryOptions, boolean defaultValue) { + String option = queryOptions.get(QueryOptionKey.USE_LITE_MODE); + return option != null ? Boolean.parseBoolean(option) : defaultValue; } - public static boolean isUseLiteMode(Map<String, String> queryOptions) { - return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_LITE_MODE, "false")); + public static boolean isUseBrokerPruning(Map<String, String> queryOptions, boolean defaultValue) { + String option = queryOptions.get(QueryOptionKey.USE_BROKER_PRUNING); + return option != null ? Boolean.parseBoolean(option) : defaultValue; } - public static boolean isUseBrokerPruning(Map<String, String> queryOptions) { - return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.USE_BROKER_PRUNING, "true")); + public static boolean isRunInBroker(Map<String, String> queryOptions, boolean defaultValue) { + String option = queryOptions.get(QueryOptionKey.RUN_IN_BROKER); + return option != null ? Boolean.parseBoolean(option) : defaultValue; } - public static boolean isRunInBroker(Map<String, String> queryOptions) { - return Boolean.parseBoolean(queryOptions.getOrDefault(QueryOptionKey.RUN_IN_BROKER, "true")); + public static int getLiteModeServerStageLimit(Map<String, String> queryOptions, int defaultValue) { + String option = queryOptions.get(QueryOptionKey.LITE_MODE_SERVER_STAGE_LIMIT); + return option != null ? checkedParseIntPositive(QueryOptionKey.LITE_MODE_SERVER_STAGE_LIMIT, option) : defaultValue; } @Nullable diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index f8adf7fb15..68a73472d2 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -178,7 +178,8 @@ public class QueryEnvironment { optProgram = getOptProgram(skipRuleSet); } } - boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions()); + boolean usePhysicalOptimizer = QueryOptionsUtils.isUsePhysicalOptimizer(sqlNodeAndOptions.getOptions(), + _envConfig.defaultUsePhysicalOptimizer()); HepProgram traitProgram = getTraitProgram(workerManager, _envConfig, usePhysicalOptimizer); SqlExplainFormat format = SqlExplainFormat.DOT; if (sqlNodeAndOptions.getSqlNode().getKind().equals(SqlKind.EXPLAIN)) { @@ -192,7 +193,9 @@ public class QueryEnvironment { workerManager = _envConfig.getWorkerManager(); physicalPlannerContext = new PhysicalPlannerContext(workerManager.getRoutingManager(), workerManager.getHostName(), workerManager.getPort(), _envConfig.getRequestId(), - workerManager.getInstanceId(), sqlNodeAndOptions.getOptions()); + workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(), + _envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(), _envConfig.defaultUseBrokerPruning(), + _envConfig.defaultLiteModeServerStageLimit()); } return new PlannerContext(_config, _catalogReader, _typeFactory, optProgram, traitProgram, sqlNodeAndOptions.getOptions(), _envConfig, format, physicalPlannerContext); @@ -694,6 +697,66 @@ public class QueryEnvironment { return CommonConstants.Broker.DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN; } + /** + * Whether to use physical optimizer by default. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_PHYSICAL_OPTIMIZER}. + */ + @Value.Default + default boolean defaultUsePhysicalOptimizer() { + return CommonConstants.Broker.DEFAULT_USE_PHYSICAL_OPTIMIZER; + } + + /** + * Whether to use lite mode by default. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_LITE_MODE}. + */ + @Value.Default + default boolean defaultUseLiteMode() { + return CommonConstants.Broker.DEFAULT_USE_LITE_MODE; + } + + /** + * Whether to run in broker by default. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#RUN_IN_BROKER}. + */ + @Value.Default + default boolean defaultRunInBroker() { + return CommonConstants.Broker.DEFAULT_RUN_IN_BROKER; + } + + /** + * Whether to use broker pruning by default. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#USE_BROKER_PRUNING}. + */ + @Value.Default + default boolean defaultUseBrokerPruning() { + return CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING; + } + + /** + * Default server stage limit for lite mode queries. + * + * This is treated as the default value for the broker and it is expected to be obtained from a Pinot configuration. + * This default value can be always overridden at query level by the query option + * {@link CommonConstants.Broker.Request.QueryOptionKey#LITE_MODE_SERVER_STAGE_LIMIT}. + */ + @Value.Default + default int defaultLiteModeServerStageLimit() { + return CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT; + } + /** * Returns the worker manager. * diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java index bb6ee79a59..b6c9d2e2e8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -58,6 +59,9 @@ public class PhysicalPlannerContext { private final String _instanceId; private final Map<String, String> _queryOptions; private final boolean _useLiteMode; + private final boolean _runInBroker; + private final boolean _useBrokerPruning; + private final int _liteModeServerStageLimit; /** * Used by controller when it needs to extract table names from the query. @@ -70,18 +74,26 @@ public class PhysicalPlannerContext { _requestId = 0; _instanceId = ""; _queryOptions = Map.of(); - _useLiteMode = false; + _useLiteMode = CommonConstants.Broker.DEFAULT_USE_LITE_MODE; + _runInBroker = CommonConstants.Broker.DEFAULT_RUN_IN_BROKER; + _useBrokerPruning = CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING; + _liteModeServerStageLimit = CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT; } public PhysicalPlannerContext(RoutingManager routingManager, String hostName, int port, long requestId, - String instanceId, Map<String, String> queryOptions) { + String instanceId, Map<String, String> queryOptions, boolean defaultUseLiteMode, boolean defaultRunInBroker, + boolean defaultUseBrokerPruning, int defaultLiteModeServerStageLimit) { _routingManager = routingManager; _hostName = hostName; _port = port; _requestId = requestId; _instanceId = instanceId; _queryOptions = queryOptions == null ? Map.of() : queryOptions; - _useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions); + _useLiteMode = QueryOptionsUtils.isUseLiteMode(_queryOptions, defaultUseLiteMode); + _runInBroker = QueryOptionsUtils.isRunInBroker(_queryOptions, defaultRunInBroker); + _useBrokerPruning = QueryOptionsUtils.isUseBrokerPruning(_queryOptions, defaultUseBrokerPruning); + _liteModeServerStageLimit = QueryOptionsUtils.getLiteModeServerStageLimit(_queryOptions, + defaultLiteModeServerStageLimit); _instanceIdToQueryServerInstance.put(instanceId, getBrokerQueryServerInstance()); } @@ -122,6 +134,18 @@ public class PhysicalPlannerContext { return _useLiteMode; } + public boolean isRunInBroker() { + return _runInBroker; + } + + public boolean isUseBrokerPruning() { + return _useBrokerPruning; + } + + public int getLiteModeServerStageLimit() { + return _liteModeServerStageLimit; + } + private QueryServerInstance getBrokerQueryServerInstance() { return new QueryServerInstance(_instanceId, _hostName, _port, _port); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java index 0993766049..09008e4144 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java @@ -119,7 +119,7 @@ public class LeafStageWorkerAssignmentRule extends PRelOptRule { leafStageRoot = leafStageRoot == null ? call._currentNode : leafStageRoot; String tableName = getActualTableName((TableScan) call._currentNode.unwrap()); PinotQuery pinotQuery = LeafStageToPinotQuery.createPinotQueryForRouting(tableName, leafStageRoot.unwrap(), - !QueryOptionsUtils.isUseBrokerPruning(_physicalPlannerContext.getQueryOptions())); + !_physicalPlannerContext.isUseBrokerPruning()); return assignTableScan((PhysicalTableScan) call._currentNode, _physicalPlannerContext.getRequestId(), pinotQuery); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java index 860833e87b..b4d102a45d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java @@ -50,8 +50,6 @@ import org.apache.pinot.query.type.TypeFactory; public class LiteModeSortInsertRule extends PRelOptRule { private static final TypeFactory TYPE_FACTORY = new TypeFactory(); private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY); - // TODO: This should be configurable at broker and via SET statements. - private static final int DEFAULT_SERVER_STAGE_LIMIT = 100_000; private final PhysicalPlannerContext _context; public LiteModeSortInsertRule(PhysicalPlannerContext context) { @@ -65,7 +63,8 @@ public class LiteModeSortInsertRule extends PRelOptRule { @Override public PRelNode onMatch(PRelOptRuleCall call) { - RexNode newFetch = REX_BUILDER.makeLiteral(DEFAULT_SERVER_STAGE_LIMIT, TYPE_FACTORY.createSqlType( + int serverStageLimit = _context.getLiteModeServerStageLimit(); + RexNode newFetch = REX_BUILDER.makeLiteral(serverStageLimit, TYPE_FACTORY.createSqlType( SqlTypeName.INTEGER)); if (call._currentNode instanceof PhysicalSort) { // When current node is a Sort, if it has a fetch already, verify it is less than the hard limit. Otherwise, @@ -73,9 +72,9 @@ public class LiteModeSortInsertRule extends PRelOptRule { PhysicalSort sort = (PhysicalSort) call._currentNode; if (sort.fetch != null) { int currentFetch = RexExpressionUtils.getValueAsInt(sort.fetch); - Preconditions.checkState(currentFetch <= DEFAULT_SERVER_STAGE_LIMIT, + Preconditions.checkState(currentFetch <= serverStageLimit, "Attempted to stream %s records from server which exceed limit %s", currentFetch, - DEFAULT_SERVER_STAGE_LIMIT); + serverStageLimit); return sort; } return sort.withFetch(newFetch); @@ -83,9 +82,9 @@ public class LiteModeSortInsertRule extends PRelOptRule { if (call._currentNode instanceof PhysicalAggregate) { // When current node is aggregate, add the limit to the Aggregate itself and skip adding the Sort. PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode; - Preconditions.checkState(aggregate.getLimit() <= DEFAULT_SERVER_STAGE_LIMIT, - "Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), DEFAULT_SERVER_STAGE_LIMIT); - int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : DEFAULT_SERVER_STAGE_LIMIT; + Preconditions.checkState(aggregate.getLimit() <= serverStageLimit, + "Group trim limit={} exceeds server stage limit={}", aggregate.getLimit(), serverStageLimit); + int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() : serverStageLimit; return aggregate.withLimit(limit); } PRelNode input = call._currentNode; diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java index 5b96eadea7..2ebaa7a837 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeWorkerAssignmentRule.java @@ -33,7 +33,6 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelDistribution; import org.apache.calcite.rel.core.Sort; import org.apache.pinot.calcite.rel.traits.PinotExecStrategyTrait; -import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.query.context.PhysicalPlannerContext; import org.apache.pinot.query.planner.physical.v2.ExchangeStrategy; import org.apache.pinot.query.planner.physical.v2.PRelNode; @@ -57,7 +56,7 @@ public class LiteModeWorkerAssignmentRule implements PRelNodeTransformer { public LiteModeWorkerAssignmentRule(PhysicalPlannerContext context) { _context = context; - _runInBroker = QueryOptionsUtils.isRunInBroker(context.getQueryOptions()); + _runInBroker = context.isRunInBroker(); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 9fe08090ed..4d202b0508 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -492,6 +492,42 @@ public class CommonConstants { "pinot.broker.enable.dynamic.filtering.semijoin"; public static final boolean DEFAULT_ENABLE_DYNAMIC_FILTERING_SEMI_JOIN = true; + /** + * Whether to use physical optimizer by default. + * This value can always be overridden by {@link Request.QueryOptionKey#USE_PHYSICAL_OPTIMIZER} query option + */ + public static final String CONFIG_OF_USE_PHYSICAL_OPTIMIZER = "pinot.broker.multistage.use.physical.optimizer"; + public static final boolean DEFAULT_USE_PHYSICAL_OPTIMIZER = false; + + /** + * Whether to use lite mode by default. + * This value can always be overridden by {@link Request.QueryOptionKey#USE_LITE_MODE} query option + */ + public static final String CONFIG_OF_USE_LITE_MODE = "pinot.broker.multistage.use.lite.mode"; + public static final boolean DEFAULT_USE_LITE_MODE = false; + + /** + * Whether to run in broker by default. + * This value can always be overridden by {@link Request.QueryOptionKey#RUN_IN_BROKER} query option + */ + public static final String CONFIG_OF_RUN_IN_BROKER = "pinot.broker.multistage.run.in.broker"; + public static final boolean DEFAULT_RUN_IN_BROKER = true; + + /** + * Whether to use broker pruning by default. + * This value can always be overridden by {@link Request.QueryOptionKey#USE_BROKER_PRUNING} query option + */ + public static final String CONFIG_OF_USE_BROKER_PRUNING = "pinot.broker.multistage.use.broker.pruning"; + public static final boolean DEFAULT_USE_BROKER_PRUNING = true; + + /** + * Default server stage limit for lite mode queries. + * This value can always be overridden by {@link Request.QueryOptionKey#LITE_MODE_SERVER_STAGE_LIMIT} query option + */ + public static final String CONFIG_OF_LITE_MODE_LEAF_STAGE_LIMIT = + "pinot.broker.multistage.lite.mode.leaf.stage.limit"; + public static final int DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT = 100_000; + // When the server instance's pool field is null or the pool contains multi distinguished group value, the broker // would set the pool to -1 in the routing table for that server. public static final int FALLBACK_POOL_ID = -1; @@ -682,6 +718,8 @@ public class CommonConstants { // that was supposed to be in partition-1. public static final String INFER_REALTIME_SEGMENT_PARTITION = "inferRealtimeSegmentPartition"; public static final String USE_LITE_MODE = "useLiteMode"; + // Server stage limit for lite mode queries. + public static final String LITE_MODE_SERVER_STAGE_LIMIT = "liteModeServerStageLimit"; // Used by the MSE Engine to determine whether to use the broker pruning logic. Only supported by the // new MSE query optimizer. // TODO(mse-physical): Consider removing this query option and making this the default, since there's already --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org