This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch hotfix-zk-watcher-2
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/hotfix-zk-watcher-2 by this push: new 4fa09f6163 Reduce ZK access for SendStatsPredicate (#15895) (#15917) 4fa09f6163 is described below commit 4fa09f6163370eda56dea6d9bc014ca290b72e48 Author: Praveen <praveenkchagan...@gmail.com> AuthorDate: Tue May 27 15:14:25 2025 -0700 Reduce ZK access for SendStatsPredicate (#15895) (#15917) Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> --- .../server/starter/helix/BaseServerStarter.java | 14 +- .../server/starter/helix/SendStatsPredicate.java | 141 ++++++++++++++++----- .../pinot/server/worker/WorkerQueryServer.java | 2 +- 3 files changed, 117 insertions(+), 40 deletions(-) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 5704d44875..689ace66f7 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -672,7 +672,7 @@ public abstract class BaseServerStarter implements ServiceStartable { new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, segmentStarTreePreprocessThrottler, segmentDownloadThrottler); - SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf); + SendStatsPredicate sendStatsPredicate = SendStatsPredicate.create(_serverConf, _helixManager); ServerConf serverConf = new ServerConf(_serverConf); _serverInstance = new ServerInstance(serverConf, _helixManager, _accessControlFactory, _segmentOperationsThrottler, sendStatsPredicate); @@ -706,11 +706,13 @@ public abstract class BaseServerStarter implements ServiceStartable { } _clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler); - LOGGER.info("Initializing and registering the SendStatsPredicate"); - try { - 
_helixManager.addInstanceConfigChangeListener(sendStatsPredicate); - } catch (Exception e) { - LOGGER.error("Failed to register SendStatsPredicate as the Helix InstanceConfigChangeListener", e); + if (sendStatsPredicate.needWatchForInstanceConfigChange()) { + LOGGER.info("Initializing and registering the SendStatsPredicate"); + try { + _helixManager.addInstanceConfigChangeListener(sendStatsPredicate); + } catch (Exception e) { + LOGGER.error("Failed to register SendStatsPredicate as the Helix InstanceConfigChangeListener", e); + } } // Start restlet server for admin API endpoint diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java index 833f7d4504..4678cab2dd 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java @@ -22,12 +22,16 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; +import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixManager; import org.apache.helix.NotificationContext; +import org.apache.helix.api.listeners.BatchMode; import org.apache.helix.api.listeners.InstanceConfigChangeListener; +import org.apache.helix.api.listeners.PreFetch; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.version.PinotVersion; +import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; @@ -54,10 +58,13 @@ import org.slf4j.LoggerFactory; public abstract class SendStatsPredicate implements InstanceConfigChangeListener { private static final Logger LOGGER = 
LoggerFactory.getLogger(SendStatsPredicate.class); - public abstract boolean getSendStats(); + public abstract boolean isSendStats(); - public static SendStatsPredicate create(PinotConfiguration configuration) { - String modeStr = configuration.getProperty( + public abstract boolean needWatchForInstanceConfigChange(); + + // NOTE: When this method is called, the helix manager is not yet connected. + public static SendStatsPredicate create(PinotConfiguration serverConf, HelixManager helixManager) { + String modeStr = serverConf.getProperty( CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, CommonConstants.MultiStageQueryRunner.DEFAULT_SEND_STATS_MODE).toUpperCase(Locale.ENGLISH); Mode mode; @@ -67,87 +74,155 @@ public abstract class SendStatsPredicate implements InstanceConfigChangeListener throw new IllegalArgumentException("Invalid value " + modeStr + " for " + CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, e); } - return mode.create(); + return mode.create(helixManager); } public enum Mode { SAFE { @Override - public SendStatsPredicate create() { - return new Safe(); + public SendStatsPredicate create(HelixManager helixManager) { + return new Safe(helixManager); } }, ALWAYS { @Override - public SendStatsPredicate create() { + public SendStatsPredicate create(HelixManager helixManager) { return new SendStatsPredicate() { @Override - public boolean getSendStats() { + public boolean isSendStats() { return true; } + @Override + public boolean needWatchForInstanceConfigChange() { + return false; + } + @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { - // Nothing to do + throw new UnsupportedOperationException("Should not be invoked"); } }; } }, NEVER { @Override - public SendStatsPredicate create() { + public SendStatsPredicate create(HelixManager helixManager) { return new SendStatsPredicate() { @Override - public boolean getSendStats() { + public boolean isSendStats() { + 
return false; + } + + @Override + public boolean needWatchForInstanceConfigChange() { return false; } @Override public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { - // Nothing to do + throw new UnsupportedOperationException("Should not be invoked"); } }; } }; - public abstract SendStatsPredicate create(); + public abstract SendStatsPredicate create(HelixManager helixManager); } + @BatchMode(enabled = false) + @PreFetch(enabled = false) private static class Safe extends SendStatsPredicate { - private final AtomicBoolean _sendStats = new AtomicBoolean(true); + private final HelixManager _helixManager; + private final String _clusterName; + private final Map<String, String> _problematicVersionsById = new HashMap<>(); + + private HelixAdmin _helixAdmin; + private volatile boolean _sendStats = true; + + public Safe(HelixManager helixManager) { + _helixManager = helixManager; + _clusterName = helixManager.getClusterName(); + } + + @Override + public boolean isSendStats() { + return _sendStats; + } @Override - public boolean getSendStats() { - return _sendStats.get(); + public boolean needWatchForInstanceConfigChange() { + return true; } @Override - public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { - Map<String, String> problematicVersionsById = new HashMap<>(); - for (InstanceConfig instanceConfig : instanceConfigs) { - switch (InstanceTypeUtils.getInstanceType(instanceConfig.getInstanceName())) { - case BROKER: - case SERVER: - String otherVersion = instanceConfig.getRecord() - .getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null); - if (isProblematicVersion(otherVersion)) { - problematicVersionsById.put(instanceConfig.getInstanceName(), otherVersion); + public synchronized void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, NotificationContext context) { + if (_helixAdmin == null) { + _helixAdmin = 
_helixManager.getClusterManagmentTool(); + } + NotificationContext.Type type = context.getType(); + if (type != NotificationContext.Type.INIT && type != NotificationContext.Type.CALLBACK) { + LOGGER.warn("Ignoring notification type: {} for instance config change", type); + return; + } + if (type == NotificationContext.Type.INIT || context.getIsChildChange()) { + _problematicVersionsById.clear(); + for (String instance : _helixAdmin.getInstancesInCluster(_clusterName)) { + if (needVersionCheck(instance)) { + InstanceConfig instanceConfig; + try { + instanceConfig = _helixAdmin.getInstanceConfig(_clusterName, instance); + } catch (Exception e) { + LOGGER.warn("Failed to get instance config for instance: {}, continue", instance, e); + continue; } - break; - default: - continue; + String version = getVersion(instanceConfig); + if (isProblematicVersion(version)) { + _problematicVersionsById.put(instance, version); + } + } + } + } else { + String pathChanged = context.getPathChanged(); + String instanceName = pathChanged.substring(pathChanged.lastIndexOf('/') + 1); + if (needVersionCheck(instanceName)) { + InstanceConfig instanceConfig; + try { + instanceConfig = _helixAdmin.getInstanceConfig(_clusterName, instanceName); + String version = getVersion(instanceConfig); + if (isProblematicVersion(version)) { + _problematicVersionsById.put(instanceName, version); + } else { + _problematicVersionsById.remove(instanceName); + } + } catch (Exception e) { + LOGGER.warn("Failed to get instance config for instance: {}, treating it as non-problematic", instanceName, + e); + _problematicVersionsById.remove(instanceName); + } } } - boolean sendStats = problematicVersionsById.isEmpty(); - if (_sendStats.getAndSet(sendStats) != sendStats) { + boolean sendStats = _problematicVersionsById.isEmpty(); + if (_sendStats != sendStats) { + _sendStats = sendStats; if (sendStats) { LOGGER.warn("Send MSE stats is now enabled"); } else { - LOGGER.warn("Send MSE stats is now disabled (problematic 
versions: {})", problematicVersionsById); + LOGGER.warn("Send MSE stats is now disabled (problematic versions: {})", _problematicVersionsById); } } } + private boolean needVersionCheck(String instanceName) { + InstanceType instanceType = InstanceTypeUtils.getInstanceType(instanceName); + return instanceType == InstanceType.BROKER || instanceType == InstanceType.SERVER; + } + + @Nullable + private String getVersion(InstanceConfig instanceConfig) { + return instanceConfig.getRecord().getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null); + } + /// Returns true if the version is problematic /// /// Ideally [PinotVersion] should have a way to extract versions in comparable format, but given it doesn't we diff --git a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java index 9fa5af663d..9a2e976064 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java @@ -41,7 +41,7 @@ public class WorkerQueryServer { _queryServicePort = _configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT, CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT); QueryRunner queryRunner = new QueryRunner(); - queryRunner.init(_configuration, instanceDataManager, tlsConfig, sendStats::getSendStats); + queryRunner.init(_configuration, instanceDataManager, tlsConfig, sendStats::isSendStats); _queryWorkerService = new QueryServer(_queryServicePort, queryRunner, tlsConfig, configuration); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org