mimaison commented on code in PR #21302: URL: https://github.com/apache/kafka/pull/21302#discussion_r2712126084
########## server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.coordinator.share.ShareCoordinatorConfig; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.network.SocketServer; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.server.DynamicThreadPool; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.storage.internals.log.LogCleaner; + +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DynamicBrokerConfig { + + public static final Set<String> DYNAMIC_SECURITY_CONFIGS = SslConfigs.RECONFIGURABLE_CONFIGS; + + private static final Set<String> DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS = Set.of( + TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, + TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG); + + private static final Set<String> CLUSTER_LEVEL_LISTENER_CONFIGS = Set.of( + SocketServerConfigs.MAX_CONNECTIONS_CONFIG, + SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG, + SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG); + + private static final Set<String> PER_BROKER_CONFIGS = Stream.of( + DYNAMIC_SECURITY_CONFIGS, + DynamicListenerConfig.RECONFIGURABLE_CONFIGS) + .flatMap(Collection::stream) + .filter(c -> !CLUSTER_LEVEL_LISTENER_CONFIGS.contains(c)) + .collect(Collectors.toUnmodifiableSet()); + + public static final Set<String> ALL_DYNAMIC_CONFIGS = Stream.of( + DYNAMIC_SECURITY_CONFIGS, + LogCleaner.RECONFIGURABLE_CONFIGS, + DynamicLogConfig.RECONFIGURABLE_CONFIGS, + DynamicThreadPool.RECONFIGURABLE_CONFIGS, + List.of(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG), + DynamicListenerConfig.RECONFIGURABLE_CONFIGS, + SocketServer.RECONFIGURABLE_CONFIGS, + DYNAMIC_PRODUCER_STATE_MANAGER_CONFIGS, + DynamicRemoteLogConfig.RECONFIGURABLE_CONFIGS, + DynamicReplicationConfig.RECONFIGURABLE_CONFIGS, + List.of(AbstractConfig.CONFIG_PROVIDERS_CONFIG), + GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS, + ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS) + .flatMap(Collection::stream) + .collect(Collectors.toUnmodifiableSet()); + + private static final Set<String> LISTENER_MECHANISM_CONFIGS = Set.of( + SaslConfigs.SASL_JAAS_CONFIG, + SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + SaslConfigs.SASL_LOGIN_CLASS, + BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG, + BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG); + + private static final Pattern LISTENER_CONFIG_REGEX = Pattern.compile("listener\\.name\\.[^.]*\\.(.*)"); + + public static List<String> brokerConfigSynonyms(String name, boolean matchListenerOverride) { + List<String> logRollConfigs = List.of(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG, ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG); + List<String> logRollJitterConfigs = List.of(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG, ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG); + List<String> logRetentionConfigs = List.of(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG, ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG); + List<String> logFlushConfigs = List.of(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG); + if (logRollConfigs.contains(name)) { + return logRollConfigs; + } else if (logRollJitterConfigs.contains(name)) { + return logRollJitterConfigs; + } else if (ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG.equals(name)) { // KafkaLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG is used as default + return logFlushConfigs; + } else if (logRetentionConfigs.contains(name)) { + return logRetentionConfigs; + } else if (matchListenerOverride) { + Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name); + if (matcher.matches()) { + String baseName = matcher.group(1); + // `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName> + // and other listener configs are specified as listenerPrefix.<configName> + // Add <configName> as a synonym in both cases. + Optional<String> mechanismConfig = LISTENER_MECHANISM_CONFIGS.stream().filter(baseName::endsWith).findFirst(); + return List.of(name, mechanismConfig.orElse(baseName)); + } + } + return List.of(name); + } + + private static void checkInvalidProps(Set<String> invalidPropNames, String errorMessage) { + if (!invalidPropNames.isEmpty()) { + throw new ConfigException(errorMessage + ": " + invalidPropNames); + } + } + + public static void validateConfigs(Properties props, boolean perBrokerConfig) { + checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically"); + checkInvalidProps(securityConfigsWithoutListenerPrefix(props), + "These security configs can be dynamically updated only per-listener using the listener prefix"); + validateConfigTypes(props); + if (!perBrokerConfig) { + checkInvalidProps(perBrokerConfigs(props), + "Cannot update these configs at default cluster level, broker id must be specified"); + } + } + + public static Set<String> securityConfigsWithoutListenerPrefix(Properties props) { + return DYNAMIC_SECURITY_CONFIGS.stream().filter(props::containsKey).collect(Collectors.toSet()); + } + + public static void validateConfigTypes(Properties props) { + Properties baseProps = new Properties(); + props.forEach((name, value) -> { + Matcher matcher = LISTENER_CONFIG_REGEX.matcher((String) name); + if (matcher.matches()) { + String baseName = matcher.group(1); + baseProps.put(baseName, value); + } else { + baseProps.put(name, value); + } + }); + DynamicConfig.Broker.validate(baseProps); + } + + public static Set<String> perBrokerConfigs(Properties props) { + Set<String> configNames = props.stringPropertyNames(); + Set<String> perBrokerConfigs = new HashSet<>(); + for (String name : configNames) { + if (PER_BROKER_CONFIGS.contains(name)) { + perBrokerConfigs.add(name); + } else { + Matcher matcher = LISTENER_CONFIG_REGEX.matcher(name); + if (matcher.matches()) { + String baseName = matcher.group(1); + if (!CLUSTER_LEVEL_LISTENER_CONFIGS.contains(baseName)) { + perBrokerConfigs.add(name); + } + } + } + } + return perBrokerConfigs; + } + + public static Set<String> nonDynamicConfigs(Properties props) { + Set<String> nonDynamicConfigs = new HashSet<>(props.stringPropertyNames()); + nonDynamicConfigs.retainAll(DynamicConfig.Broker.nonDynamicProps()); + return nonDynamicConfigs; + } + + public static Properties resolveVariableConfigs(Properties propsOriginal) { + Properties props = new Properties(); + AbstractConfig config = new AbstractConfig(new ConfigDef(), propsOriginal, Utils.castToStringObjectMap(propsOriginal), false); + config.originals().forEach((key, value) -> { + if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) { + props.put(key, value); + } + }); + return props; + } + + public static Map<String, String> dynamicConfigUpdateModes() { + return ALL_DYNAMIC_CONFIGS.stream().collect(Collectors.toMap( + Function.identity(), + name -> PER_BROKER_CONFIGS.contains(name) ? "per-broker" : "cluster-wide" + ) + ); + } + + public static class DynamicLogConfig { + /** + * The broker configurations pertaining to logs that are reconfigurable. This set contains + * the names you would use when setting a static or dynamic broker configuration (not topic + * configuration). + */ + public static final Set<String> RECONFIGURABLE_CONFIGS = Set.copyOf( + ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values()); + } + + public static class DynamicListenerConfig { + /** + * The set of configurations which the DynamicListenerConfig object listens for. Many of + * these are also monitored by other objects such as ChannelBuilders and SocketServers. + */ + public static final Set<String> RECONFIGURABLE_CONFIGS; + + static { Review Comment: Good idea, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
