This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 330f0a511c Add hooks to service starters to apply custom cluster and instance configs (#15595) 330f0a511c is described below commit 330f0a511c476b4d829c90cb5ada71c8919c9cac Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Mon Apr 21 15:11:09 2025 -0600 Add hooks to service starters to apply custom cluster and instance configs (#15595) --- .../broker/broker/helix/BaseBrokerStarter.java | 10 ++- .../pinot/controller/BaseControllerStarter.java | 48 ++++++++---- .../helix/core/util/HelixSetupUtils.java | 91 +++++++++++++++------- .../org/apache/pinot/minion/BaseMinionStarter.java | 35 +++++---- .../server/starter/helix/BaseServerStarter.java | 10 ++- .../apache/pinot/spi/utils/CommonConstants.java | 1 + 6 files changed, 132 insertions(+), 63 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java index 464e7dc48e..12448e26aa 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java @@ -162,10 +162,10 @@ public abstract class BaseBrokerStarter implements ServiceStartable { _zkServers = brokerConf.getProperty(Helix.CONFIG_OF_ZOOKEEPR_SERVER).replaceAll("\\s+", ""); _clusterName = brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME); ServiceStartableUtils.applyClusterConfig(_brokerConf, _zkServers, _clusterName, ServiceRole.BROKER); + applyCustomConfigs(brokerConf); - PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf( - _brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, - CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); + PinotInsecureMode.setPinotInInsecureMode( + _brokerConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, false)); if (_brokerConf.getProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT, MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT) == 0) { @@ -203,6 +203,10 @@ public abstract class BaseBrokerStarter implements ServiceStartable { ContinuousJfrStarter.init(_brokerConf); } + /// Can be overridden to apply custom configs to the broker conf. + protected void applyCustomConfigs(PinotConfiguration brokerConf) { + } + private void setupHelixSystemProperties() { // NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index bd77104c45..8722f80487 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -48,6 +48,7 @@ import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.SystemPropertyKeys; import org.apache.helix.api.listeners.ControllerChangeListener; +import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.InstanceConfig; @@ -137,6 +138,7 @@ import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.services.ServiceRole; import org.apache.pinot.spi.services.ServiceStartable; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory; @@ -213,16 +215,19 @@ public abstract class BaseControllerStarter implements ServiceStartable { _config = new ControllerConf(pinotConfiguration.toMap()); _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr()); _helixClusterName = _config.getHelixClusterName(); + _controllerMode = _config.getControllerMode(); + if (_controllerMode == ControllerConf.ControllerMode.DUAL + || _controllerMode == ControllerConf.ControllerMode.HELIX_ONLY) { + HelixSetupUtils.setupHelixClusterWithDefaultConfigs(_helixZkURL, _helixClusterName, getDefaultClusterConfigs()); + } ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER); + applyCustomConfigs(_config); - PinotInsecureMode.setPinotInInsecureMode(Boolean.valueOf( - _config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, - CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); + PinotInsecureMode.setPinotInInsecureMode(_config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, false)); setupHelixSystemProperties(); IdealStateGroupCommit.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression()); _listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config); - _controllerMode = _config.getControllerMode(); inferHostnameIfNeeded(_config); _hostname = _config.getControllerHost(); _port = _listenerConfigs.get(0).getPort(); @@ -236,7 +241,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { if (_helixParticipantInstanceId != null) { // NOTE: Force all instances to have the same prefix in order to derive the instance type based on the instance id Preconditions.checkState(InstanceTypeUtils.isController(_helixParticipantInstanceId), - "Instance id must have prefix '%s', got '%s'", CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE, + "Instance id must have prefix '%s', got '%s'", Helix.PREFIX_OF_CONTROLLER_INSTANCE, _helixParticipantInstanceId); } else { _helixParticipantInstanceId = LeadControllerUtils.generateParticipantInstanceId(_hostname, _port); @@ -273,6 +278,22 @@ public abstract class BaseControllerStarter implements ServiceStartable { ContinuousJfrStarter.init(_config); } + /// Returns the default cluster configs to be stored in ZK as Helix cluster config. These configs will then be + /// propagated to all the instance configs to control the default behavior for each component. + /// Can be overridden to add more configs. + protected Map<String, String> getDefaultClusterConfigs() { + Map<String, String> configs = new HashMap<>(); + configs.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, "true"); + configs.put(Helix.ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(Helix.DEFAULT_ENABLE_CASE_INSENSITIVE)); + configs.put(Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(Helix.DEFAULT_HYPERLOGLOG_LOG2M)); + configs.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, "true"); + return configs; + } + + /// Can be overridden to apply custom configs to the controller conf. + protected void applyCustomConfigs(ControllerConf controllerConf) { + } + // If thread pool size is not configured executor will use cached thread pool private ExecutorService createExecutorService(int numThreadPool, String threadNameFormat) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(); @@ -282,7 +303,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { private void inferHostnameIfNeeded(ControllerConf config) { if (config.getControllerHost() == null) { - if (config.getProperty(CommonConstants.Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)) { + if (config.getProperty(Helix.SET_INSTANCE_ID_TO_HOSTNAME_KEY, false)) { final String inferredHostname = NetUtils.getHostnameOrAddress(); if (inferredHostname != null) { config.setControllerHost(inferredHostname); @@ -299,14 +320,12 @@ public abstract class BaseControllerStarter implements ServiceStartable { // from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the // non-positive value, so set the default value as 1. System.setProperty(SystemPropertyKeys.FLAPPING_TIME_WINDOW, - _config.getProperty(CommonConstants.Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS, - CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); + _config.getProperty(Helix.CONFIG_OF_CONTROLLER_FLAPPING_TIME_WINDOW_MS, Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS)); } private void setupHelixClusterConstraints() { - String maxStateTransitions = - _config.getProperty(CommonConstants.Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS, - CommonConstants.Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS); + String maxStateTransitions = _config.getProperty(Helix.CONFIG_OF_HELIX_INSTANCE_MAX_STATE_TRANSITIONS, + Helix.DEFAULT_HELIX_INSTANCE_MAX_STATE_TRANSITIONS); Map<ClusterConstraints.ConstraintAttribute, String> constraintAttributes = new HashMap<>(); constraintAttributes.put(ClusterConstraints.ConstraintAttribute.INSTANCE, ".*"); constraintAttributes.put(ClusterConstraints.ConstraintAttribute.MESSAGE_TYPE, @@ -430,7 +449,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL, _helixControllerInstanceId); // Emit helix controller metrics - _controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME, + _controllerMetrics.addCallbackGauge(Helix.INSTANCE_CONNECTED_METRIC_NAME, () -> _helixControllerManager.isConnected() ? 1L : 0L); // Deprecated, since getting the leadership of Helix does not mean Helix has been ready for pinot. _controllerMetrics.addCallbackGauge("helix.leader", () -> _helixControllerManager.isLeader() ? 1L : 0L); @@ -780,7 +799,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { (resourceConfigList, changeContext) -> _leadControllerManager.onResourceConfigChange()); } catch (Exception e) { throw new RuntimeException( - "Error registering resource config listener for " + CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, e); + "Error registering resource config listener for " + Helix.LEAD_CONTROLLER_RESOURCE_NAME, e); } } @@ -791,8 +810,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { if (_tlsPort > 0) { updated |= HelixHelper.updateTlsPort(instanceConfig, _tlsPort); } - updated |= HelixHelper.addDefaultTags(instanceConfig, - () -> Collections.singletonList(CommonConstants.Helix.CONTROLLER_INSTANCE)); + updated |= HelixHelper.addDefaultTags(instanceConfig, () -> Collections.singletonList(Helix.CONTROLLER_INSTANCE)); updated |= HelixHelper.removeDisabledPartitions(instanceConfig); updated |= HelixHelper.updatePinotVersion(instanceConfig); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 1223135de2..91d8ea163d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -19,7 +19,9 @@ package org.apache.pinot.controller.helix.core.util; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.helix.ConfigAccessor; @@ -29,7 +31,6 @@ import org.apache.helix.HelixManager; import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; -import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; @@ -60,39 +61,77 @@ public class HelixSetupUtils { private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class); - public static HelixManager setupHelixController(String helixClusterName, String zkPath, String instanceId) { - setupHelixClusterIfNeeded(helixClusterName, zkPath); - return HelixControllerMain - .startHelixController(zkPath, helixClusterName, instanceId, HelixControllerMain.STANDALONE); - } - - private static void setupHelixClusterIfNeeded(String helixClusterName, String zkPath) { - HelixAdmin admin = null; + /** + * Set up Helix cluster with the given default configs as the Helix cluster config. + * + * <ul> + * <li> + * If the cluster doesn't exist, a new cluster will be created with the default configs. + * </li> + * <li> + * If the cluster already exists, it will be updated with the default configs. Default config will be set only if + * the config doesn't already exist. + * </li> + * </ul> + */ + public static void setupHelixClusterWithDefaultConfigs(String zkAddress, String clusterName, + Map<String, String> defaultConfigs) { + HelixAdmin admin = new ZKHelixAdmin.Builder().setZkAddress(zkAddress).build(); try { - admin = new ZKHelixAdmin.Builder().setZkAddress(zkPath).build(); - if (admin.getClusters().contains(helixClusterName)) { - LOGGER.info("Helix cluster: {} already exists", helixClusterName); + if (admin.getClusters().contains(clusterName)) { + LOGGER.info("Helix cluster: {} already exists, updating default configs: {}", clusterName, defaultConfigs); + HelixConfigScope configScope = + new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build(); + List<String> keys = new ArrayList<>(defaultConfigs.keySet()); + Map<String, String> existingConfigs = admin.getConfig(configScope, keys); + Map<String, String> newConfigs = new HashMap<>(); + Map<String, String> ignoredConfigs = new HashMap<>(); + for (Map.Entry<String, String> entry : defaultConfigs.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + String existingValue = existingConfigs.get(key); + if (existingValue == null) { + newConfigs.put(key, value); + } else if (!existingValue.equals(value)) { + ignoredConfigs.put(key, existingValue); + } + } + if (newConfigs.isEmpty()) { + if (ignoredConfigs.isEmpty()) { + LOGGER.info("No config change needed for Helix cluster: {}. All configs are using default values", + clusterName); + } else { + LOGGER.info("No config change needed for Helix cluster: {}. Ignored configs using non-default values: {}", + clusterName, ignoredConfigs); + } + } else { + admin.setConfig(configScope, newConfigs); + if (ignoredConfigs.isEmpty()) { + LOGGER.info("Updated helix cluster: {} with new configs: {}. All other configs are using default values", + clusterName, newConfigs); + } else { + LOGGER.info("Updated helix cluster: {} with new configs: {}. Ignored configs using non-default values: {}", + clusterName, newConfigs, ignoredConfigs); + } + } } else { - LOGGER.info("Creating a new Helix cluster: {}", helixClusterName); - admin.addCluster(helixClusterName, false); - // Enable Auto-Join for the cluster + LOGGER.info("Creating a new Helix cluster: {} with default configs: {}", clusterName, defaultConfigs); + admin.addCluster(clusterName, false); HelixConfigScope configScope = - new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build(); - Map<String, String> configMap = new HashMap<>(); - configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true)); - configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE)); - configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M)); - configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false)); - admin.setConfig(configScope, configMap); - LOGGER.info("New Helix cluster: {} created", helixClusterName); + new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(clusterName).build(); + admin.setConfig(configScope, defaultConfigs); + LOGGER.info("New Helix cluster: {} created with default configs: {}", clusterName, defaultConfigs); } } finally { - if (admin != null) { - admin.close(); - } + admin.close(); } } + public static HelixManager setupHelixController(String helixClusterName, String zkPath, String instanceId) { + return HelixControllerMain.startHelixController(zkPath, helixClusterName, instanceId, + HelixControllerMain.STANDALONE); + } + public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel, boolean enableBatchMessageMode, ControllerConf controllerConf) { ZkClient zkClient = null; diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java index 421d0bc987..a5cffb1a60 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/BaseMinionStarter.java @@ -107,10 +107,9 @@ public abstract class BaseMinionStarter implements ServiceStartable { String zkAddress = _config.getZkAddress(); String helixClusterName = _config.getHelixClusterName(); ServiceStartableUtils.applyClusterConfig(_config, zkAddress, helixClusterName, ServiceRole.MINION); + applyCustomConfigs(_config); - PinotInsecureMode.setPinotInInsecureMode( - Boolean.valueOf(_config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, - CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); + PinotInsecureMode.setPinotInInsecureMode(_config.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, false)); setupHelixSystemProperties(); _hostname = _config.getHostName(); @@ -137,19 +136,8 @@ public abstract class BaseMinionStarter implements ServiceStartable { ContinuousJfrStarter.init(_config); } - private void updateInstanceConfigIfNeeded() { - InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId); - boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); - if (_tlsPort > 0) { - updated |= HelixHelper.updateTlsPort(instanceConfig, _tlsPort); - } - updated |= HelixHelper.addDefaultTags(instanceConfig, - () -> Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE)); - updated |= HelixHelper.removeDisabledPartitions(instanceConfig); - updated |= HelixHelper.updatePinotVersion(instanceConfig); - if (updated) { - HelixHelper.updateInstanceConfig(_helixManager, instanceConfig); - } + /// Can be overridden to apply custom configs to the minion conf. + protected void applyCustomConfigs(MinionConf minionConf) { } private void setupHelixSystemProperties() { @@ -355,6 +343,21 @@ public abstract class BaseMinionStarter implements ServiceStartable { LOGGER.info("Pinot minion started"); } + private void updateInstanceConfigIfNeeded() { + InstanceConfig instanceConfig = HelixHelper.getInstanceConfig(_helixManager, _instanceId); + boolean updated = HelixHelper.updateHostnamePort(instanceConfig, _hostname, _port); + if (_tlsPort > 0) { + updated |= HelixHelper.updateTlsPort(instanceConfig, _tlsPort); + } + updated |= HelixHelper.addDefaultTags(instanceConfig, + () -> Collections.singletonList(CommonConstants.Helix.UNTAGGED_MINION_INSTANCE)); + updated |= HelixHelper.removeDisabledPartitions(instanceConfig); + updated |= HelixHelper.updatePinotVersion(instanceConfig); + if (updated) { + HelixHelper.updateInstanceConfig(_helixManager, instanceConfig); + } + } + /** * Stops the Pinot Minion instance. */ diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index c832600861..5704d44875 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -171,10 +171,10 @@ public abstract class BaseServerStarter implements ServiceStartable { _zkAddress = _serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER); _helixClusterName = _serverConf.getProperty(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); ServiceStartableUtils.applyClusterConfig(_serverConf, _zkAddress, _helixClusterName, ServiceRole.SERVER); + applyCustomConfigs(_serverConf); - PinotInsecureMode.setPinotInInsecureMode(Boolean.parseBoolean( - _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, - CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); + PinotInsecureMode.setPinotInInsecureMode( + _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, false)); String tarCompressionCodecName = _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME); @@ -261,6 +261,10 @@ public abstract class BaseServerStarter implements ServiceStartable { ContinuousJfrStarter.init(_serverConf); } + /// Can be overridden to apply custom configs to the server conf. + protected void applyCustomConfigs(PinotConfiguration serverConf) { + } + /** * Invoke pinot environment provider factory's init method to register the environment provider & * return the instantiated environment provider. diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 1bcbc1126e..5921c9bdba 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -64,6 +64,7 @@ public class CommonConstants { public static final String DATABASE = "database"; public static final String DEFAULT_DATABASE = "default"; public static final String CONFIG_OF_PINOT_INSECURE_MODE = "pinot.insecure.mode"; + @Deprecated public static final String DEFAULT_PINOT_INSECURE_MODE = "false"; public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS = "pinot.executors.fixed.default.numThreads"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org