This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch hotfix-zk-watcher-2
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/hotfix-zk-watcher-2 by this push:
     new 4fa09f6163 Reduce ZK access for SendStatsPredicate (#15895) (#15917)
4fa09f6163 is described below

commit 4fa09f6163370eda56dea6d9bc014ca290b72e48
Author: Praveen <praveenkchagan...@gmail.com>
AuthorDate: Tue May 27 15:14:25 2025 -0700

    Reduce ZK access for SendStatsPredicate (#15895) (#15917)
    
    Co-authored-by: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
---
 .../server/starter/helix/BaseServerStarter.java    |  14 +-
 .../server/starter/helix/SendStatsPredicate.java   | 141 ++++++++++++++++-----
 .../pinot/server/worker/WorkerQueryServer.java     |   2 +-
 3 files changed, 117 insertions(+), 40 deletions(-)

diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 5704d44875..689ace66f7 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -672,7 +672,7 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
         new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler, 
segmentStarTreePreprocessThrottler,
             segmentDownloadThrottler);
 
-    SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf);
+    SendStatsPredicate sendStatsPredicate = 
SendStatsPredicate.create(_serverConf, _helixManager);
     ServerConf serverConf = new ServerConf(_serverConf);
     _serverInstance = new ServerInstance(serverConf, _helixManager, 
_accessControlFactory, _segmentOperationsThrottler,
         sendStatsPredicate);
@@ -706,11 +706,13 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     }
     
_clusterConfigChangeHandler.registerClusterConfigChangeListener(_segmentOperationsThrottler);
 
-    LOGGER.info("Initializing and registering the SendStatsPredicate");
-    try {
-      _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
-    } catch (Exception e) {
-      LOGGER.error("Failed to register SendStatsPredicate as the Helix 
InstanceConfigChangeListener", e);
+    if (sendStatsPredicate.needWatchForInstanceConfigChange()) {
+      LOGGER.info("Initializing and registering the SendStatsPredicate");
+      try {
+        _helixManager.addInstanceConfigChangeListener(sendStatsPredicate);
+      } catch (Exception e) {
+        LOGGER.error("Failed to register SendStatsPredicate as the Helix 
InstanceConfigChangeListener", e);
+      }
     }
 
     // Start restlet server for admin API endpoint
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
index 833f7d4504..4678cab2dd 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SendStatsPredicate.java
@@ -22,12 +22,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.version.PinotVersion;
+import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
@@ -54,10 +58,13 @@ import org.slf4j.LoggerFactory;
 public abstract class SendStatsPredicate implements 
InstanceConfigChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SendStatsPredicate.class);
 
-  public abstract boolean getSendStats();
+  public abstract boolean isSendStats();
 
-  public static SendStatsPredicate create(PinotConfiguration configuration) {
-    String modeStr = configuration.getProperty(
+  public abstract boolean needWatchForInstanceConfigChange();
+
+  // NOTE: When this method is called, the helix manager is not yet connected.
+  public static SendStatsPredicate create(PinotConfiguration serverConf, 
HelixManager helixManager) {
+    String modeStr = serverConf.getProperty(
         CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE,
         
CommonConstants.MultiStageQueryRunner.DEFAULT_SEND_STATS_MODE).toUpperCase(Locale.ENGLISH);
     Mode mode;
@@ -67,87 +74,155 @@ public abstract class SendStatsPredicate implements 
InstanceConfigChangeListener
       throw new IllegalArgumentException("Invalid value " + modeStr + " for "
           + CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, e);
     }
-    return mode.create();
+    return mode.create(helixManager);
   }
 
   public enum Mode {
     SAFE {
       @Override
-      public SendStatsPredicate create() {
-        return new Safe();
+      public SendStatsPredicate create(HelixManager helixManager) {
+        return new Safe(helixManager);
       }
     },
     ALWAYS {
       @Override
-      public SendStatsPredicate create() {
+      public SendStatsPredicate create(HelixManager helixManager) {
         return new SendStatsPredicate() {
           @Override
-          public boolean getSendStats() {
+          public boolean isSendStats() {
             return true;
           }
 
+          @Override
+          public boolean needWatchForInstanceConfigChange() {
+            return false;
+          }
+
           @Override
           public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
-            // Nothing to do
+            throw new UnsupportedOperationException("Should not be invoked");
           }
         };
       }
     },
     NEVER {
       @Override
-      public SendStatsPredicate create() {
+      public SendStatsPredicate create(HelixManager helixManager) {
         return new SendStatsPredicate() {
           @Override
-          public boolean getSendStats() {
+          public boolean isSendStats() {
+            return false;
+          }
+
+          @Override
+          public boolean needWatchForInstanceConfigChange() {
             return false;
           }
 
           @Override
           public void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
-            // Nothing to do
+            throw new UnsupportedOperationException("Should not be invoked");
           }
         };
       }
     };
 
-    public abstract SendStatsPredicate create();
+    public abstract SendStatsPredicate create(HelixManager helixManager);
   }
 
+  @BatchMode(enabled = false)
+  @PreFetch(enabled = false)
   private static class Safe extends SendStatsPredicate {
-    private final AtomicBoolean _sendStats = new AtomicBoolean(true);
+    private final HelixManager _helixManager;
+    private final String _clusterName;
+    private final Map<String, String> _problematicVersionsById = new 
HashMap<>();
+
+    private HelixAdmin _helixAdmin;
+    private volatile boolean _sendStats = true;
+
+    public Safe(HelixManager helixManager) {
+      _helixManager = helixManager;
+      _clusterName = helixManager.getClusterName();
+    }
+
+    @Override
+    public boolean isSendStats() {
+      return _sendStats;
+    }
 
     @Override
-    public boolean getSendStats() {
-      return _sendStats.get();
+    public boolean needWatchForInstanceConfigChange() {
+      return true;
     }
 
     @Override
-    public void onInstanceConfigChange(List<InstanceConfig> instanceConfigs, 
NotificationContext context) {
-      Map<String, String> problematicVersionsById = new HashMap<>();
-      for (InstanceConfig instanceConfig : instanceConfigs) {
-        switch 
(InstanceTypeUtils.getInstanceType(instanceConfig.getInstanceName())) {
-          case BROKER:
-          case SERVER:
-            String otherVersion = instanceConfig.getRecord()
-                
.getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY, null);
-            if (isProblematicVersion(otherVersion)) {
-              problematicVersionsById.put(instanceConfig.getInstanceName(), 
otherVersion);
+    public synchronized void onInstanceConfigChange(List<InstanceConfig> 
instanceConfigs, NotificationContext context) {
+      if (_helixAdmin == null) {
+        _helixAdmin = _helixManager.getClusterManagmentTool();
+      }
+      NotificationContext.Type type = context.getType();
+      if (type != NotificationContext.Type.INIT && type != 
NotificationContext.Type.CALLBACK) {
+        LOGGER.warn("Ignoring notification type: {} for instance config 
change", type);
+        return;
+      }
+      if (type == NotificationContext.Type.INIT || context.getIsChildChange()) 
{
+        _problematicVersionsById.clear();
+        for (String instance : 
_helixAdmin.getInstancesInCluster(_clusterName)) {
+          if (needVersionCheck(instance)) {
+            InstanceConfig instanceConfig;
+            try {
+              instanceConfig = _helixAdmin.getInstanceConfig(_clusterName, 
instance);
+            } catch (Exception e) {
+              LOGGER.warn("Failed to get instance config for instance: {}, 
continue", instance, e);
+              continue;
             }
-            break;
-          default:
-            continue;
+            String version = getVersion(instanceConfig);
+            if (isProblematicVersion(version)) {
+              _problematicVersionsById.put(instance, version);
+            }
+          }
+        }
+      } else {
+        String pathChanged = context.getPathChanged();
+        String instanceName = 
pathChanged.substring(pathChanged.lastIndexOf('/') + 1);
+        if (needVersionCheck(instanceName)) {
+          InstanceConfig instanceConfig;
+          try {
+            instanceConfig = _helixAdmin.getInstanceConfig(_clusterName, 
instanceName);
+            String version = getVersion(instanceConfig);
+            if (isProblematicVersion(version)) {
+              _problematicVersionsById.put(instanceName, version);
+            } else {
+              _problematicVersionsById.remove(instanceName);
+            }
+          } catch (Exception e) {
+            LOGGER.warn("Failed to get instance config for instance: {}, 
treating it as non-problematic", instanceName,
+                e);
+            _problematicVersionsById.remove(instanceName);
+          }
         }
       }
-      boolean sendStats = problematicVersionsById.isEmpty();
-      if (_sendStats.getAndSet(sendStats) != sendStats) {
+      boolean sendStats = _problematicVersionsById.isEmpty();
+      if (_sendStats != sendStats) {
+        _sendStats = sendStats;
         if (sendStats) {
           LOGGER.warn("Send MSE stats is now enabled");
         } else {
-          LOGGER.warn("Send MSE stats is now disabled (problematic versions: 
{})", problematicVersionsById);
+          LOGGER.warn("Send MSE stats is now disabled (problematic versions: 
{})", _problematicVersionsById);
         }
       }
     }
 
+    private boolean needVersionCheck(String instanceName) {
+      InstanceType instanceType = 
InstanceTypeUtils.getInstanceType(instanceName);
+      return instanceType == InstanceType.BROKER || instanceType == 
InstanceType.SERVER;
+    }
+
+    @Nullable
+    private String getVersion(InstanceConfig instanceConfig) {
+      return 
instanceConfig.getRecord().getStringField(CommonConstants.Helix.Instance.PINOT_VERSION_KEY,
 null);
+    }
+
     /// Returns true if the version is problematic
     ///
     /// Ideally [PinotVersion] should have a way to extract versions in 
comparable format, but given it doesn't we
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 9fa5af663d..9a2e976064 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -41,7 +41,7 @@ public class WorkerQueryServer {
     _queryServicePort = 
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
         CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
     QueryRunner queryRunner = new QueryRunner();
-    queryRunner.init(_configuration, instanceDataManager, tlsConfig, 
sendStats::getSendStats);
+    queryRunner.init(_configuration, instanceDataManager, tlsConfig, 
sendStats::isSendStats);
     _queryWorkerService = new QueryServer(_queryServicePort, queryRunner, 
tlsConfig, configuration);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to