richardstartin commented on code in PR #8491:
URL: https://github.com/apache/pinot/pull/8491#discussion_r849839949


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -214,87 +219,178 @@ private void processInstanceConfigChange() {
     long startTimeMs = System.currentTimeMillis();
 
     List<ZNRecord> instanceConfigZNRecords =
-        _zkDataAccessor.getChildren(_instanceConfigsPath, null, 
AccessOption.PERSISTENT,
-            CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+        _zkDataAccessor.getChildren(_instanceConfigsPath, null, 
AccessOption.PERSISTENT, Helix.ZkClient.RETRY_COUNT,
+            Helix.ZkClient.RETRY_INTERVAL_MS);
     long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
 
-    // Calculate new enabled and disabled instances
-    Set<String> enabledInstances = new HashSet<>();
-    List<String> newEnabledInstances = new ArrayList<>();
+    // Calculate new enabled and disabled servers
+    Set<String> enabledServers = new HashSet<>();
+    List<String> newEnabledServers = new ArrayList<>();
     for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) {
-      String instance = instanceConfigZNRecord.getId();
-      if (isInstanceEnabled(instanceConfigZNRecord)) {
-        enabledInstances.add(instance);
+      String instanceId = instanceConfigZNRecord.getId();
+      if (isEnabledServer(instanceConfigZNRecord)) {
+        enabledServers.add(instanceId);
 
         // Always refresh the server instance with the latest instance config 
in case it changes
         ServerInstance serverInstance = new ServerInstance(new 
InstanceConfig(instanceConfigZNRecord));
-        if (_enabledServerInstanceMap.put(instance, serverInstance) == null) {
-          newEnabledInstances.add(instance);
+        if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) 
{
+          newEnabledServers.add(instanceId);
+
+          // NOTE: Remove new enabled server from excluded servers because the 
server is likely being restarted
+          if (_excludedServers.remove(instanceId)) {
+            LOGGER.info("Got excluded server: {} re-enabled, including it into 
the routing", instanceId);
+          }
         }
       }
     }
-    List<String> newDisabledInstances = new ArrayList<>();
+    List<String> newDisabledServers = new ArrayList<>();
     for (String instance : _enabledServerInstanceMap.keySet()) {
-      if (!enabledInstances.contains(instance)) {
-        newDisabledInstances.add(instance);
+      if (!enabledServers.contains(instance)) {
+        newDisabledServers.add(instance);
       }
     }
-    List<String> changedInstances = new ArrayList<>(newEnabledInstances.size() 
+ newDisabledInstances.size());
-    changedInstances.addAll(newEnabledInstances);
-    changedInstances.addAll(newDisabledInstances);
-    long calculateChangedInstancesEndTimeMs = System.currentTimeMillis();
-
-    // Early terminate if there is no instance changed
-    if (changedInstances.isEmpty()) {
-      LOGGER.info(
-          "Processed instance config change in {}ms (fetch {} instance 
configs: {}ms, calculate changed instances: "
-              + "{}ms) without instance change", 
calculateChangedInstancesEndTimeMs - startTimeMs,
-          instanceConfigZNRecords.size(), fetchInstanceConfigsEndTimeMs - 
startTimeMs,
-          calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs);
+
+    // Calculate the routable servers and the changed routable servers
+    List<String> changedServers = new ArrayList<>(newEnabledServers.size() + 
newDisabledServers.size());
+    if (_excludedServers.isEmpty()) {
+      _routableServers = enabledServers;
+      changedServers.addAll(newEnabledServers);
+      changedServers.addAll(newDisabledServers);
+    } else {
+      enabledServers.removeAll(_excludedServers);
+      _routableServers = enabledServers;
+      // NOTE: All new enabled servers are routable
+      changedServers.addAll(newEnabledServers);
+      for (String newDisabledServer : newDisabledServers) {
+        if (_excludedServers.contains(newDisabledServer)) {
+          changedServers.add(newDisabledServer);
+        }
+      }
+    }
+    long calculateChangedServersEndTimeMs = System.currentTimeMillis();
+
+    // Early terminate if there is no changed servers
+    if (changedServers.isEmpty()) {
+      LOGGER.info("Processed instance config change in {}ms "
+              + "(fetch {} instance configs: {}ms, calculate changed servers: 
{}ms) without instance change",
+          calculateChangedServersEndTimeMs - startTimeMs, 
instanceConfigZNRecords.size(),
+          fetchInstanceConfigsEndTimeMs - startTimeMs,
+          calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs);
       return;
     }
 
     // Update routing entry for all tables
     for (RoutingEntry routingEntry : _routingEntryMap.values()) {
       try {
-        routingEntry.onInstancesChange(enabledInstances, changedInstances);
+        routingEntry.onInstancesChange(_routableServers, changedServers);
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
on instances change for table: {}",
             routingEntry.getTableNameWithType(), e);
       }
     }
     long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
 
-    // Remove new disabled instances from _enabledServerInstanceMap after 
updating all routing entries to ensure it
-    // always contains the selected instances
-    for (String newDisabledInstance : newDisabledInstances) {
+    // Remove new disabled servers from _enabledServerInstanceMap after 
updating all routing entries to ensure it
+    // always contains the selected servers
+    for (String newDisabledInstance : newDisabledServers) {
       _enabledServerInstanceMap.remove(newDisabledInstance);
     }
 
-    LOGGER.info(
-        "Processed instance config change in {}ms (fetch {} instance configs: 
{}ms, calculate changed instances: "
-            + "{}ms, update {} routing entries: {}ms), new enabled instances: 
{}, new disabled instances: {}",
+    LOGGER.info("Processed instance config change in {}ms "
+            + "(fetch {} instance configs: {}ms, calculate changed servers: 
{}ms, update {} routing entries: {}ms), "
+            + "new enabled servers: {}, new disabled servers: {}, excluded 
servers: {}",
         updateRoutingEntriesEndTimeMs - startTimeMs, 
instanceConfigZNRecords.size(),
-        fetchInstanceConfigsEndTimeMs - startTimeMs, 
calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs,
-        _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - 
calculateChangedInstancesEndTimeMs,
-        newEnabledInstances, newDisabledInstances);
+        fetchInstanceConfigsEndTimeMs - startTimeMs, 
calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs,
+        _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - 
calculateChangedServersEndTimeMs, newEnabledServers,
+        newDisabledServers, _excludedServers);
   }
 
-  private static boolean isInstanceEnabled(ZNRecord instanceConfigZNRecord) {
+  private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) {
+    String instanceId = instanceConfigZNRecord.getId();
+    if (!instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+      // NOTE: Some legacy configs might not contain the instance type prefix
+      if (instanceId.startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE) || 
instanceId.startsWith(
+          Helix.PREFIX_OF_BROKER_INSTANCE) || 
instanceId.startsWith(Helix.PREFIX_OF_MINION_INSTANCE)) {
+        return false;
+      }
+    }
     if ("false".equalsIgnoreCase(
         
instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name())))
 {
       return false;
     }
-    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS)))
 {
+    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS)))
 {
       return false;
     }
     //noinspection RedundantIfStatement
-    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)))
 {
+    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED)))
 {
       return false;
     }
     return true;
   }
 
+  /**
+   * Excludes a server from the routing.
+   */
+  public synchronized void excludeServerFromRouting(String instanceId) {
+    LOGGER.warn("Excluding server: {} from routing", instanceId);
+    if (!_excludedServers.add(instanceId)) {
+      LOGGER.info("Server: {} is already excluded from routing, skipping 
updating the routing", instanceId);
+      return;
+    }
+    if (!_routableServers.contains(instanceId)) {
+      LOGGER.info("Server: {} is not enabled, skipping updating the routing", 
instanceId);
+      return;
+    }
+
+    // Update routing entry for all tables
+    long startTimeMs = System.currentTimeMillis();
+    Set<String> routableServers = new HashSet<>(_routableServers);
+    routableServers.remove(instanceId);
+    _routableServers = routableServers;
+    List<String> changedServers = Collections.singletonList(instanceId);
+    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      try {
+        routingEntry.onInstancesChange(_routableServers, changedServers);
+      } catch (Exception e) {
+        LOGGER.error("Caught unexpected exception while updating routing entry 
when excluding server: {} for table: {}",
+            instanceId, routingEntry.getTableNameWithType(), e);
+      }
+    }
+    LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing 
entries)", instanceId,
+        System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
+  }
+
+  /**
+   * Includes a previous excluded server to the routing.
+   */
+  public synchronized void includeServerToRouting(String instanceId) {
+    LOGGER.info("Including server: {} to routing", instanceId);
+    if (!_excludedServers.remove(instanceId)) {
+      LOGGER.info("Server: {} is not previously excluded, skipping updating 
the routing", instanceId);
+    }
+    if (!_enabledServerInstanceMap.containsKey(instanceId)) {
+      LOGGER.info("Server: {} is not enabled, skipping updating the routing", 
instanceId);
+      return;
+    }
+
+    // Update routing entry for all tables
+    long startTimeMs = System.currentTimeMillis();
+    Set<String> routableServers = new HashSet<>(_routableServers);
+    routableServers.add(instanceId);
+    _routableServers = routableServers;
+    List<String> changedServers = Collections.singletonList(instanceId);
+    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      try {
+        routingEntry.onInstancesChange(_routableServers, changedServers);
+      } catch (Exception e) {
+        LOGGER.error("Caught unexpected exception while updating routing entry 
when including server: {} for table: {}",
+            instanceId, routingEntry.getTableNameWithType(), e);
+      }
+    }
+    LOGGER.info("Included server: {} to routing in {}ms (updated {} routing 
entries)", instanceId,

Review Comment:
   These durations are tracked for observability, and it's confusing that 
different clocks are used to measure them. Let's introduce a `Timer` SPI so users 
can choose the clock implementation and get consistent measurements. No need to fix that in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to