richardstartin commented on code in PR #8491: URL: https://github.com/apache/pinot/pull/8491#discussion_r849839949
########## pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java: ########## @@ -214,87 +219,178 @@ private void processInstanceConfigChange() { long startTimeMs = System.currentTimeMillis(); List<ZNRecord> instanceConfigZNRecords = - _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT, - CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS); + _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT, Helix.ZkClient.RETRY_COUNT, + Helix.ZkClient.RETRY_INTERVAL_MS); long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis(); - // Calculate new enabled and disabled instances - Set<String> enabledInstances = new HashSet<>(); - List<String> newEnabledInstances = new ArrayList<>(); + // Calculate new enabled and disabled servers + Set<String> enabledServers = new HashSet<>(); + List<String> newEnabledServers = new ArrayList<>(); for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) { - String instance = instanceConfigZNRecord.getId(); - if (isInstanceEnabled(instanceConfigZNRecord)) { - enabledInstances.add(instance); + String instanceId = instanceConfigZNRecord.getId(); + if (isEnabledServer(instanceConfigZNRecord)) { + enabledServers.add(instanceId); // Always refresh the server instance with the latest instance config in case it changes ServerInstance serverInstance = new ServerInstance(new InstanceConfig(instanceConfigZNRecord)); - if (_enabledServerInstanceMap.put(instance, serverInstance) == null) { - newEnabledInstances.add(instance); + if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null) { + newEnabledServers.add(instanceId); + + // NOTE: Remove new enabled server from excluded servers because the server is likely being restarted + if (_excludedServers.remove(instanceId)) { + LOGGER.info("Got excluded server: {} re-enabled, including it into the routing", instanceId); + } } } } - List<String> newDisabledInstances = new ArrayList<>(); + List<String> newDisabledServers = new ArrayList<>(); for (String instance : _enabledServerInstanceMap.keySet()) { - if (!enabledInstances.contains(instance)) { - newDisabledInstances.add(instance); + if (!enabledServers.contains(instance)) { + newDisabledServers.add(instance); } } - List<String> changedInstances = new ArrayList<>(newEnabledInstances.size() + newDisabledInstances.size()); - changedInstances.addAll(newEnabledInstances); - changedInstances.addAll(newDisabledInstances); - long calculateChangedInstancesEndTimeMs = System.currentTimeMillis(); - - // Early terminate if there is no instance changed - if (changedInstances.isEmpty()) { - LOGGER.info( - "Processed instance config change in {}ms (fetch {} instance configs: {}ms, calculate changed instances: " - + "{}ms) without instance change", calculateChangedInstancesEndTimeMs - startTimeMs, - instanceConfigZNRecords.size(), fetchInstanceConfigsEndTimeMs - startTimeMs, - calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs); + + // Calculate the routable servers and the changed routable servers + List<String> changedServers = new ArrayList<>(newEnabledServers.size() + newDisabledServers.size()); + if (_excludedServers.isEmpty()) { + _routableServers = enabledServers; + changedServers.addAll(newEnabledServers); + changedServers.addAll(newDisabledServers); + } else { + enabledServers.removeAll(_excludedServers); + _routableServers = enabledServers; + // NOTE: All new enabled servers are routable + changedServers.addAll(newEnabledServers); + for (String newDisabledServer : newDisabledServers) { + if (_excludedServers.contains(newDisabledServer)) { + changedServers.add(newDisabledServer); + } + } + } + long calculateChangedServersEndTimeMs = System.currentTimeMillis(); + + // Early terminate if there is no changed servers + if (changedServers.isEmpty()) { + LOGGER.info("Processed instance config change in {}ms " + + "(fetch {} instance configs: {}ms, calculate changed servers: {}ms) without instance change", + calculateChangedServersEndTimeMs - startTimeMs, instanceConfigZNRecords.size(), + fetchInstanceConfigsEndTimeMs - startTimeMs, + calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs); return; } // Update routing entry for all tables for (RoutingEntry routingEntry : _routingEntryMap.values()) { try { - routingEntry.onInstancesChange(enabledInstances, changedInstances); + routingEntry.onInstancesChange(_routableServers, changedServers); } catch (Exception e) { LOGGER.error("Caught unexpected exception while updating routing entry on instances change for table: {}", routingEntry.getTableNameWithType(), e); } } long updateRoutingEntriesEndTimeMs = System.currentTimeMillis(); - // Remove new disabled instances from _enabledServerInstanceMap after updating all routing entries to ensure it - // always contains the selected instances - for (String newDisabledInstance : newDisabledInstances) { + // Remove new disabled servers from _enabledServerInstanceMap after updating all routing entries to ensure it + // always contains the selected servers + for (String newDisabledInstance : newDisabledServers) { _enabledServerInstanceMap.remove(newDisabledInstance); } - LOGGER.info( - "Processed instance config change in {}ms (fetch {} instance configs: {}ms, calculate changed instances: " - + "{}ms, update {} routing entries: {}ms), new enabled instances: {}, new disabled instances: {}", + LOGGER.info("Processed instance config change in {}ms " + + "(fetch {} instance configs: {}ms, calculate changed servers: {}ms, update {} routing entries: {}ms), " + + "new enabled servers: {}, new disabled servers: {}, excluded servers: {}", updateRoutingEntriesEndTimeMs - startTimeMs, instanceConfigZNRecords.size(), - fetchInstanceConfigsEndTimeMs - startTimeMs, calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs, - _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - calculateChangedInstancesEndTimeMs, - newEnabledInstances, newDisabledInstances); + fetchInstanceConfigsEndTimeMs - startTimeMs, calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs, + _routingEntryMap.size(), updateRoutingEntriesEndTimeMs - calculateChangedServersEndTimeMs, newEnabledServers, + newDisabledServers, _excludedServers); } - private static boolean isInstanceEnabled(ZNRecord instanceConfigZNRecord) { + private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) { + String instanceId = instanceConfigZNRecord.getId(); + if (!instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { + // NOTE: Some legacy configs might not contain the instance type prefix + if (instanceId.startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE) || instanceId.startsWith( + Helix.PREFIX_OF_BROKER_INSTANCE) || instanceId.startsWith(Helix.PREFIX_OF_MINION_INSTANCE)) { + return false; + } + } if ("false".equalsIgnoreCase( instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name()))) { return false; } - if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS))) { + if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS))) { return false; } //noinspection RedundantIfStatement - if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED))) { + if ("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED))) { return false; } return true; } + /** + * Excludes a server from the routing. + */ + public synchronized void excludeServerFromRouting(String instanceId) { + LOGGER.warn("Excluding server: {} from routing", instanceId); + if (!_excludedServers.add(instanceId)) { + LOGGER.info("Server: {} is already excluded from routing, skipping updating the routing", instanceId); + return; + } + if (!_routableServers.contains(instanceId)) { + LOGGER.info("Server: {} is not enabled, skipping updating the routing", instanceId); + return; + } + + // Update routing entry for all tables + long startTimeMs = System.currentTimeMillis(); + Set<String> routableServers = new HashSet<>(_routableServers); + routableServers.remove(instanceId); + _routableServers = routableServers; + List<String> changedServers = Collections.singletonList(instanceId); + for (RoutingEntry routingEntry : _routingEntryMap.values()) { + try { + routingEntry.onInstancesChange(_routableServers, changedServers); + } catch (Exception e) { + LOGGER.error("Caught unexpected exception while updating routing entry when excluding server: {} for table: {}", + instanceId, routingEntry.getTableNameWithType(), e); + } + } + LOGGER.info("Excluded server: {} from routing in {}ms (updated {} routing entries)", instanceId, + System.currentTimeMillis() - startTimeMs, _routingEntryMap.size()); + } + + /** + * Includes a previous excluded server to the routing. + */ + public synchronized void includeServerToRouting(String instanceId) { + LOGGER.info("Including server: {} to routing", instanceId); + if (!_excludedServers.remove(instanceId)) { + LOGGER.info("Server: {} is not previously excluded, skipping updating the routing", instanceId); + } + if (!_enabledServerInstanceMap.containsKey(instanceId)) { + LOGGER.info("Server: {} is not enabled, skipping updating the routing", instanceId); + return; + } + + // Update routing entry for all tables + long startTimeMs = System.currentTimeMillis(); + Set<String> routableServers = new HashSet<>(_routableServers); + routableServers.add(instanceId); + _routableServers = routableServers; + List<String> changedServers = Collections.singletonList(instanceId); + for (RoutingEntry routingEntry : _routingEntryMap.values()) { + try { + routingEntry.onInstancesChange(_routableServers, changedServers); + } catch (Exception e) { + LOGGER.error("Caught unexpected exception while updating routing entry when including server: {} for table: {}", + instanceId, routingEntry.getTableNameWithType(), e); + } + } + LOGGER.info("Included server: {} to routing in {}ms (updated {} routing entries)", instanceId, Review Comment: The durations are tracked for the purpose of observability, and it's confusing that different clocks are used. Let's create a `Timer` SPI so users can choose and get consistency. No need to fix that here and now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org