Jackie-Jiang commented on code in PR #8491:
URL: https://github.com/apache/pinot/pull/8491#discussion_r848983278


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -214,87 +219,178 @@ private void processInstanceConfigChange() {
     long startTimeMs = System.currentTimeMillis();
 
     List<ZNRecord> instanceConfigZNRecords =
-        _zkDataAccessor.getChildren(_instanceConfigsPath, null, 
AccessOption.PERSISTENT,
-            CommonConstants.Helix.ZkClient.RETRY_COUNT, 
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+        _zkDataAccessor.getChildren(_instanceConfigsPath, null, 
AccessOption.PERSISTENT, Helix.ZkClient.RETRY_COUNT,
+            Helix.ZkClient.RETRY_INTERVAL_MS);
     long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
 
-    // Calculate new enabled and disabled instances
-    Set<String> enabledInstances = new HashSet<>();
-    List<String> newEnabledInstances = new ArrayList<>();
+    // Calculate new enabled and disabled servers
+    Set<String> enabledServers = new HashSet<>();
+    List<String> newEnabledServers = new ArrayList<>();
     for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) {
-      String instance = instanceConfigZNRecord.getId();
-      if (isInstanceEnabled(instanceConfigZNRecord)) {
-        enabledInstances.add(instance);
+      String instanceId = instanceConfigZNRecord.getId();
+      if (isEnabledServer(instanceConfigZNRecord)) {
+        enabledServers.add(instanceId);
 
         // Always refresh the server instance with the latest instance config 
in case it changes
         ServerInstance serverInstance = new ServerInstance(new 
InstanceConfig(instanceConfigZNRecord));
-        if (_enabledServerInstanceMap.put(instance, serverInstance) == null) {
-          newEnabledInstances.add(instance);
+        if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) 
{
+          newEnabledServers.add(instanceId);
+
+          // NOTE: Remove new enabled server from excluded servers because the 
server is likely being restarted
+          if (_excludedServers.remove(instanceId)) {
+            LOGGER.info("Got excluded server: {} re-enabled, including it into 
the routing", instanceId);
+          }
         }
       }
     }
-    List<String> newDisabledInstances = new ArrayList<>();
+    List<String> newDisabledServers = new ArrayList<>();
     for (String instance : _enabledServerInstanceMap.keySet()) {
-      if (!enabledInstances.contains(instance)) {
-        newDisabledInstances.add(instance);
+      if (!enabledServers.contains(instance)) {
+        newDisabledServers.add(instance);
       }
     }
-    List<String> changedInstances = new ArrayList<>(newEnabledInstances.size() 
+ newDisabledInstances.size());
-    changedInstances.addAll(newEnabledInstances);
-    changedInstances.addAll(newDisabledInstances);
-    long calculateChangedInstancesEndTimeMs = System.currentTimeMillis();
-
-    // Early terminate if there is no instance changed
-    if (changedInstances.isEmpty()) {
-      LOGGER.info(
-          "Processed instance config change in {}ms (fetch {} instance 
configs: {}ms, calculate changed instances: "
-              + "{}ms) without instance change", 
calculateChangedInstancesEndTimeMs - startTimeMs,
-          instanceConfigZNRecords.size(), fetchInstanceConfigsEndTimeMs - 
startTimeMs,
-          calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs);
+
+    // Calculate the routable servers and the changed routable servers
+    List<String> changedServers = new ArrayList<>(newEnabledServers.size() + 
newDisabledServers.size());
+    if (_excludedServers.isEmpty()) {
+      _routableServers = enabledServers;
+      changedServers.addAll(newEnabledServers);
+      changedServers.addAll(newDisabledServers);
+    } else {
+      enabledServers.removeAll(_excludedServers);
+      _routableServers = enabledServers;
+      // NOTE: All new enabled servers are routable
+      changedServers.addAll(newEnabledServers);
+      for (String newDisabledServer : newDisabledServers) {
+        if (_excludedServers.contains(newDisabledServer)) {
+          changedServers.add(newDisabledServer);
+        }
+      }
+    }
+    long calculateChangedServersEndTimeMs = System.currentTimeMillis();
+
+    // Early terminate if there is no changed servers
+    if (changedServers.isEmpty()) {
+      LOGGER.info("Processed instance config change in {}ms "
+              + "(fetch {} instance configs: {}ms, calculate changed servers: 
{}ms) without instance change",
+          calculateChangedServersEndTimeMs - startTimeMs, 
instanceConfigZNRecords.size(),
+          fetchInstanceConfigsEndTimeMs - startTimeMs,
+          calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs);
       return;
     }
 
     // Update routing entry for all tables
     for (RoutingEntry routingEntry : _routingEntryMap.values()) {
       try {
-        routingEntry.onInstancesChange(enabledInstances, changedInstances);
+        routingEntry.onInstancesChange(_routableServers, changedServers);
       } catch (Exception e) {
         LOGGER.error("Caught unexpected exception while updating routing entry 
on instances change for table: {}",
             routingEntry.getTableNameWithType(), e);
       }
     }
     long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
 
-    // Remove new disabled instances from _enabledServerInstanceMap after 
updating all routing entries to ensure it
-    // always contains the selected instances
-    for (String newDisabledInstance : newDisabledInstances) {
+    // Remove new disabled servers from _enabledServerInstanceMap after 
updating all routing entries to ensure it
+    // always contains the selected servers
+    for (String newDisabledInstance : newDisabledServers) {
       _enabledServerInstanceMap.remove(newDisabledInstance);
     }
 
-    LOGGER.info(
-        "Processed instance config change in {}ms (fetch {} instance configs: 
{}ms, calculate changed instances: "
-            + "{}ms, update {} routing entries: {}ms), new enabled instances: 
{}, new disabled instances: {}",
+    LOGGER.info("Processed instance config change in {}ms "
+            + "(fetch {} instance configs: {}ms, calculate changed servers: 
{}ms, update {} routing entries: {}ms), "
+            + "new enabled servers: {}, new disabled servers: {}, excluded 
servers: {}",
         updateRoutingEntriesEndTimeMs - startTimeMs, 
instanceConfigZNRecords.size(),
-        fetchInstanceConfigsEndTimeMs - startTimeMs, 
calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs,
-        _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - 
calculateChangedInstancesEndTimeMs,
-        newEnabledInstances, newDisabledInstances);
+        fetchInstanceConfigsEndTimeMs - startTimeMs, 
calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs,
+        _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - 
calculateChangedServersEndTimeMs, newEnabledServers,
+        newDisabledServers, _excludedServers);
   }
 
-  private static boolean isInstanceEnabled(ZNRecord instanceConfigZNRecord) {
+  private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) {
+    String instanceId = instanceConfigZNRecord.getId();
+    if (!instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+      // NOTE: Some legacy configs might not contain the instance type prefix
+      if (instanceId.startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE) || 
instanceId.startsWith(
+          Helix.PREFIX_OF_BROKER_INSTANCE) || 
instanceId.startsWith(Helix.PREFIX_OF_MINION_INSTANCE)) {
+        return false;
+      }
+    }
     if ("false".equalsIgnoreCase(
         
instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name())))
 {
       return false;
     }
-    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS)))
 {
+    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS)))
 {
       return false;
     }
     //noinspection RedundantIfStatement
-    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)))
 {
+    if 
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED)))
 {
       return false;
     }
     return true;
   }
 
+  /**
+   * Excludes a server from the routing.
+   */
+  public synchronized void excludeServerFromRouting(String instanceId) {
+    LOGGER.warn("Excluding server: {} from routing", instanceId);
+    if (!_excludedServers.add(instanceId)) {
+      LOGGER.info("Server: {} is already excluded from routing, skipping 
updating the routing", instanceId);
+      return;
+    }
+    if (!_routableServers.contains(instanceId)) {
+      LOGGER.info("Server: {} is not enabled, skipping updating the routing", 
instanceId);
+      return;
+    }
+
+    // Update routing entry for all tables
+    long startTimeMs = System.currentTimeMillis();
+    Set<String> routableServers = new HashSet<>(_routableServers);
+    routableServers.remove(instanceId);
+    _routableServers = routableServers;
+    List<String> changedServers = Collections.singletonList(instanceId);
+    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      try {
+        routingEntry.onInstancesChange(_routableServers, changedServers);
+      } catch (Exception e) {
+        LOGGER.error("Caught unexpected exception while updating routing entry 
when excluding server: {} for table: {}",
+            instanceId, routingEntry.getTableNameWithType(), e);
+      }
+    }
+    LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing 
entries)", instanceId,
+        System.currentTimeMillis() - startTimeMs, _routingEntryMap.size());
+  }
+
+  /**
+   * Includes a previous excluded server to the routing.
+   */
+  public synchronized void includeServerToRouting(String instanceId) {
+    LOGGER.info("Including server: {} to routing", instanceId);
+    if (!_excludedServers.remove(instanceId)) {
+      LOGGER.info("Server: {} is not previously excluded, skipping updating 
the routing", instanceId);
+    }
+    if (!_enabledServerInstanceMap.containsKey(instanceId)) {
+      LOGGER.info("Server: {} is not enabled, skipping updating the routing", 
instanceId);
+      return;
+    }
+
+    // Update routing entry for all tables
+    long startTimeMs = System.currentTimeMillis();
+    Set<String> routableServers = new HashSet<>(_routableServers);
+    routableServers.add(instanceId);
+    _routableServers = routableServers;
+    List<String> changedServers = Collections.singletonList(instanceId);
+    for (RoutingEntry routingEntry : _routingEntryMap.values()) {
+      try {
+        routingEntry.onInstancesChange(_routableServers, changedServers);
+      } catch (Exception e) {
+        LOGGER.error("Caught unexpected exception while updating routing entry 
when including server: {} for table: {}",
+            instanceId, routingEntry.getTableNameWithType(), e);
+      }
+    }
+    LOGGER.info("Included server: {} to routing in {}ms (updated {} routing 
entries)", instanceId,

Review Comment:
   My personal preference is to use nanoTime to measure the duration, but there 
is ongoing discussion about which one is preferable from a performance 
perspective, and there is no conclusion yet. Since we already use millis in 
this class, I want to keep it consistent. Why would it cause incomparable 
durations? All the timings we take here are in millis.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to