This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58b56c213e0 KAFKA-15853: [1/N] Move static methods, SocketServer and
Log configs to AbstractKafkaConfig (#21548)
58b56c213e0 is described below
commit 58b56c213e03af9b473a25546f97b007cda51ffa
Author: Christo Lolov <[email protected]>
AuthorDate: Fri Mar 27 02:15:14 2026 +0000
KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to
AbstractKafkaConfig (#21548)
This is a gradual migration of KafkaConfig to Java for easier reviews.
Moved the static methods to AbstractKafkaConfig and the methods used to
access SocketServer and Log configs.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../main/scala/kafka/network/RequestChannel.scala | 6 +-
.../main/scala/kafka/network/SocketServer.scala | 8 +-
.../scala/kafka/server/ConfigAdminManager.scala | 3 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 129 +---------
.../scala/unit/kafka/server/KafkaConfigTest.scala | 2 +-
.../kafka/server/config/AbstractKafkaConfig.java | 277 +++++++++++++++++++++
6 files changed, 296 insertions(+), 129 deletions(-)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index de0a9d93173..25155145b5a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -22,7 +22,6 @@ import java.util.concurrent._
import com.fasterxml.jackson.databind.JsonNode
import com.typesafe.scalalogging.Logger
import kafka.network
-import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.memory.MemoryPool
@@ -35,6 +34,7 @@ import
org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.{RequestConvertToJson, Session}
+import org.apache.kafka.server.config.AbstractKafkaConfig
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -170,7 +170,7 @@ object RequestChannel extends Logging {
newData.resources().forEach(resource => {
val resourceType =
ConfigResource.Type.forId(resource.resourceType())
resource.configs().forEach(config => {
- config.setValue(KafkaConfig.loggableValue(resourceType,
config.name(), config.value()))
+ config.setValue(AbstractKafkaConfig.loggableValue(resourceType,
config.name(), config.value()))
})
})
new AlterConfigsRequest(newData, alterConfigs.version())
@@ -180,7 +180,7 @@ object RequestChannel extends Logging {
newData.resources().forEach(resource => {
val resourceType =
ConfigResource.Type.forId(resource.resourceType())
resource.configs().forEach(config => {
- config.setValue(KafkaConfig.loggableValue(resourceType,
config.name(), config.value()))
+ config.setValue(AbstractKafkaConfig.loggableValue(resourceType,
config.name(), config.value()))
})
})
new
IncrementalAlterConfigsRequest.Builder(newData).build(alterConfigs.version())
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index cb7dc5ad92a..590be14e2dd 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -324,8 +324,8 @@ class SocketServer(
}
val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
if (maxConnectionsPerIpOverrides !=
oldConfig.maxConnectionsPerIpOverrides) {
- info(s"Updating maxConnectionsPerIpOverrides:
${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
-
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+ info(s"Updating maxConnectionsPerIpOverrides:
${maxConnectionsPerIpOverrides.asScala.map { case (k, v) => s"$k=$v"
}.mkString(",")}")
+
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides.asScala.map
{ case (k, v) => (k, v.intValue()) }.toMap)
}
val maxConnections = newConfig.maxConnections
if (maxConnections != oldConfig.maxConnections) {
@@ -1275,7 +1275,7 @@ private[kafka] class Processor(
class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics)
extends Logging with AutoCloseable {
@volatile private var defaultMaxConnectionsPerIp: Int =
config.maxConnectionsPerIp
- @volatile private var maxConnectionsPerIpOverrides =
config.maxConnectionsPerIpOverrides.map { case (host, count) =>
(InetAddress.getByName(host), count) }
+ @volatile private var maxConnectionsPerIpOverrides =
config.maxConnectionsPerIpOverrides.asScala.map { case (host, count) =>
(InetAddress.getByName(host), count.intValue()) }.toMap
@volatile private var brokerMaxConnections = config.maxConnections
private val interBrokerListenerName = config.interBrokerListenerName
private val counts = mutable.Map[InetAddress, Int]()
@@ -1313,7 +1313,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
}
private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas:
Map[String, Int]): Unit = {
- maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) =>
(InetAddress.getByName(host), count) }
+ maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) =>
(InetAddress.getByName(host), count) }.toMap
}
private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit =
{
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index f86af7d37af..1b021531601 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -36,6 +36,7 @@ import
org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_
import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.apache.kafka.metadata.ConfigRepository
+import org.apache.kafka.server.config.AbstractKafkaConfig
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.{Map, Seq}
@@ -400,7 +401,7 @@ object ConfigAdminManager {
*/
def toLoggableProps(resource: ConfigResource, configProps: Properties):
Map[String, String] = {
configProps.asScala.map {
- case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`,
key, value))
+ case (key, value) => (key,
AbstractKafkaConfig.loggableValue(resource.`type`, key, value))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1bdfbb5b37c..c5075d5bd99 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -23,35 +23,31 @@ import java.util.Properties
import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.{Endpoint, Reconfigurable}
-import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
-import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfig,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs,
DynamicBrokerConfig => JDynamicBrokerConfig}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig =>
JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq}
-import scala.jdk.OptionConverters.RichOptional
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
object KafkaConfig {
@@ -92,49 +88,11 @@ object KafkaConfig {
def apply(props: java.util.Map[_, _], doLog: Boolean = true): KafkaConfig =
new KafkaConfig(props, doLog)
- private def typeOf(name: String): Option[ConfigDef.Type] =
Option(configDef.configKeys.get(name)).map(_.`type`)
-
- def configType(configName: String): Option[ConfigDef.Type] = {
- val configType = configTypeExact(configName)
- if (configType.isDefined) {
- return configType
- }
- typeOf(configName) match {
- case Some(t) => Some(t)
- case None =>
- JDynamicBrokerConfig.brokerConfigSynonyms(configName,
true).asScala.flatMap(typeOf).headOption
- }
- }
-
- private def configTypeExact(exactName: String): Option[ConfigDef.Type] = {
- val configType = typeOf(exactName).orNull
- if (configType != null) {
- Some(configType)
- } else {
- val configKey = DynamicConfig.Broker.configKeys.get(exactName)
- if (configKey != null) {
- Some(configKey.`type`)
- } else {
- None
- }
- }
- }
+ def configType(configName: String): Option[ConfigDef.Type] =
+ AbstractKafkaConfig.configType(configName).toScala
def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
- // If we can't determine the config entry type, treat it as a sensitive
config to be safe
- configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
- }
-
- def loggableValue(resourceType: ConfigResource.Type, name: String, value:
String): String = {
- val maybeSensitive = resourceType match {
- case ConfigResource.Type.BROKER =>
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
- case ConfigResource.Type.TOPIC =>
KafkaConfig.maybeSensitive(LogConfig.configType(name).toScala)
- case ConfigResource.Type.GROUP =>
KafkaConfig.maybeSensitive(GroupConfig.configType(name).toScala)
- case ConfigResource.Type.BROKER_LOGGER => false
- case ConfigResource.Type.CLIENT_METRICS => false
- case _ => true
- }
- if (maybeSensitive) Password.HIDDEN else value
+ AbstractKafkaConfig.maybeSensitive(configType.toJava)
}
}
@@ -233,8 +191,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val serverMaxStartupTimeMs =
getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
- val connectionSetupTimeoutMs =
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
- val connectionSetupTimeoutMaxMs =
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
def getNumReplicaAlterLogDirsThreads: Int = {
val numThreads: Integer =
Option(getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG)).getOrElse(logDirs.size)
@@ -276,61 +232,9 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
}
- /** ********* Socket Server Configuration ***********/
- val socketSendBufferBytes =
getInt(SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_CONFIG)
- val socketReceiveBufferBytes =
getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
- val socketRequestMaxBytes =
getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG)
- val socketListenBacklogSize =
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
- def maxConnectionsPerIp =
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
- def maxConnectionsPerIpOverrides: Map[String, Int] =
- getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).asScala.map
{ case (k, v) => (k, v.toInt)}
- def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
- def maxConnectionCreationRate =
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
- val connectionsMaxIdleMs =
getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG)
- val failedAuthenticationDelayMs =
getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG)
- val queuedMaxRequests =
getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG)
- val queuedMaxBytes = getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG)
- def numNetworkThreads =
getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
-
/***************** rack configuration **************/
val replicaSelectorClassName =
Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
- /** ********* Log Configuration ***********/
- val autoCreateTopicsEnable =
getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG)
- val numPartitions = getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG)
- def logSegmentBytes = getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG)
- def logFlushIntervalMessages =
getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG)
- def logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
- val logFlushSchedulerIntervalMs =
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)
- val logFlushOffsetCheckpointIntervalMs =
getInt(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).toLong
- val logFlushStartOffsetCheckpointIntervalMs =
getInt(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).toLong
- val logCleanupIntervalMs =
getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
- def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)
-
- def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
- def logCleanerDedupeBufferSize =
getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
- def logCleanerDeleteRetentionMs =
getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
- def logCleanerMinCompactionLagMs =
getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
- def logCleanerMaxCompactionLagMs =
getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
- def logCleanerMinCleanRatio =
getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
- def logIndexSizeMaxBytes =
getInt(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG)
- def logIndexIntervalBytes =
getInt(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG)
- def logDeleteDelayMs = getLong(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG)
- def logRollTimeMillis: java.lang.Long =
Option(getLong(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG)).getOrElse(60 * 60
* 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG))
- def logRollTimeJitterMillis: java.lang.Long =
Option(getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG)).getOrElse(60
* 60 * 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG))
- def logFlushIntervalMs: java.lang.Long =
Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG))
- def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG)
- def logPreAllocateEnable: java.lang.Boolean =
getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
- def logInitialTaskDelayMs: java.lang.Long =
Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)
-
- def logMessageTimestampType =
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG))
-
- def logMessageTimestampBeforeMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
-
- def logMessageTimestampAfterMaxMs: Long =
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
-
- def logDirFailureTimeoutMs: Long =
getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG)
-
/** ********* Replication configuration ***********/
val controllerSocketTimeoutMs: Int =
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
val defaultReplicationFactor: Int =
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
@@ -433,21 +337,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
dynamicConfig.removeReconfigurable(reconfigurable)
}
- def logRetentionTimeMillis: Long = {
- val millisInMinute = 60L * 1000L
- val millisInHour = 60L * millisInMinute
-
- val millis: java.lang.Long =
-
Option(getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG)).getOrElse(
- Option(getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG))
match {
- case Some(mins) => millisInMinute * mins
- case None =>
getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG) * millisInHour
- })
-
- if (millis < 0) return -1
- millis
- }
-
def listeners: Seq[Endpoint] =
AbstractKafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap).asScala
@@ -663,10 +552,10 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
s"${SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG} must be larger or equal
to ${SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG}")
if (maxConnectionsPerIp == 0)
- require(maxConnectionsPerIpOverrides.nonEmpty,
s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only
if" +
+ require(!maxConnectionsPerIpOverrides.isEmpty,
s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only
if" +
s" ${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG}
property is set.")
- val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address
=> Utils.validHostPattern(address))
+ val invalidAddresses =
maxConnectionsPerIpOverrides.asScala.keys.filterNot(address =>
Utils.validHostPattern(address))
if (invalidAddresses.nonEmpty)
throw new
IllegalArgumentException(s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG}
contains invalid addresses : ${invalidAddresses.mkString(",")}")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2a035f77333..90dffba4b48 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1217,7 +1217,7 @@ class KafkaConfigTest {
assertEquals(1, config.brokerId)
assertEndpointsEqual(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT,
"127.0.0.1", 1122),
config.effectiveAdvertisedBrokerListeners.head)
- assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3),
config.maxConnectionsPerIpOverrides)
+ assertEquals(util.Map.of("127.0.0.1", 2, "127.0.0.2", 3),
config.maxConnectionsPerIpOverrides)
assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 9baa67c1515..903e24a78eb 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -22,10 +22,14 @@ import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
@@ -50,6 +54,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -350,4 +355,276 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
* @param reconfigurable the component to unregister
*/
public abstract void removeReconfigurable(Reconfigurable reconfigurable);
+
+ /**
+ * Determines whether a config entry might be sensitive based on its type.
+ * If the type cannot be determined, the config is treated as sensitive
+ * to be safe.
+ *
+ * @param configType the config entry type, or empty if unknown
+ * @return true if the config might be sensitive
+ */
+ public static boolean maybeSensitive(Optional<ConfigDef.Type> configType) {
+ return configType.isEmpty()
+ || configType.get() == ConfigDef.Type.PASSWORD;
+ }
+
+ /**
+ * Looks up the type for a config key by name directly from
+ * {@link #CONFIG_DEF}.
+ *
+ * @param name the config key name
+ * @return the type if found, or empty
+ */
+ public static Optional<ConfigDef.Type> configDefTypeOf(String name) {
+ return Optional.ofNullable(CONFIG_DEF.configKeys().get(name))
+ .map(key -> key.type);
+ }
+
+ /**
+ * Resolves the {@link ConfigDef.Type} for a given config name.
+ * First tries an exact match, then falls back to checking broker
+ * config synonyms.
+ *
+ * @param configName the config name to look up
+ * @return the type if found, or empty
+ */
+ public static Optional<ConfigDef.Type> configType(String configName) {
+ return configDefTypeOf(configName)
+ .or(() ->
Optional.ofNullable(DynamicConfig.Broker.configKeys().get(configName))
+ .map(key -> key.type))
+ .or(() -> DynamicBrokerConfig.brokerConfigSynonyms(configName,
true)
+ .stream()
+ .map(AbstractKafkaConfig::configDefTypeOf)
+ .flatMap(Optional::stream)
+ .findFirst());
+ }
+
+ /**
+ * Returns the loggable form of a config value. Sensitive values
+ * are replaced with {@link Password#HIDDEN}.
+ *
+ * @param resourceType the config resource type
+ * @param name the config name
+ * @param value the config value
+ * @return the value suitable for logging
+ */
+ public static String loggableValue(ConfigResource.Type resourceType,
+ String name,
+ String value) {
+ boolean sensitive = switch (resourceType) {
+ case BROKER -> maybeSensitive(configType(name));
+ case TOPIC -> maybeSensitive(LogConfig.configType(name));
+ case GROUP -> maybeSensitive(GroupConfig.configType(name));
+ case BROKER_LOGGER, CLIENT_METRICS -> false;
+ default -> true;
+ };
+ return sensitive ? Password.HIDDEN : value;
+ }
+
+ // ********* Socket Server Configuration **********
+
+ public int socketSendBufferBytes() {
+ return getInt(SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_CONFIG);
+ }
+
+ public int socketReceiveBufferBytes() {
+ return getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG);
+ }
+
+ public int socketRequestMaxBytes() {
+ return getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG);
+ }
+
+ public int socketListenBacklogSize() {
+ return getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG);
+ }
+
+ public int maxConnectionsPerIp() {
+ return getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG);
+ }
+
+ public Map<String, Integer> maxConnectionsPerIpOverrides() {
+ return
getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
+
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG))
+ .entrySet()
+ .stream()
+ .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e ->
Integer.parseInt(e.getValue())));
+ }
+
+ public int maxConnections() {
+ return getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG);
+ }
+
+ public int maxConnectionCreationRate() {
+ return getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+ }
+
+ public long connectionsMaxIdleMs() {
+ return getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG);
+ }
+
+ public int failedAuthenticationDelayMs() {
+ return
getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG);
+ }
+
+ public int queuedMaxRequests() {
+ return getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG);
+ }
+
+ public long queuedMaxBytes() {
+ return getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG);
+ }
+
+ public int numNetworkThreads() {
+ return getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+ }
+
+ public long connectionSetupTimeoutMs() {
+ return
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG);
+ }
+
+ public long connectionSetupTimeoutMaxMs() {
+ return
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG);
+ }
+
+ // ********* Log Configuration **********
+
+ public boolean autoCreateTopicsEnable() {
+ return getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+ }
+
+ public int numPartitions() {
+ return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG);
+ }
+
+ public Integer logSegmentBytes() {
+ return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG);
+ }
+
+ public Long logFlushIntervalMessages() {
+ return getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG);
+ }
+
+ public int logCleanerThreads() {
+ return getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP);
+ }
+
+ public long logFlushSchedulerIntervalMs() {
+ return
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+ }
+
+ public long logFlushOffsetCheckpointIntervalMs() {
+ return
getInt(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).longValue();
+ }
+
+ public long logFlushStartOffsetCheckpointIntervalMs() {
+ return
getInt(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).longValue();
+ }
+
+ public long logCleanupIntervalMs() {
+ return getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG);
+ }
+
+ public List<String> logCleanupPolicy() {
+ return getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG);
+ }
+
+ public Long logRetentionBytes() {
+ return getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG);
+ }
+
+ public long logCleanerDedupeBufferSize() {
+ return getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP);
+ }
+
+ public Long logCleanerDeleteRetentionMs() {
+ return getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP);
+ }
+
+ public Long logCleanerMinCompactionLagMs() {
+ return getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP);
+ }
+
+ public Long logCleanerMaxCompactionLagMs() {
+ return getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP);
+ }
+
+ public Double logCleanerMinCleanRatio() {
+ return getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP);
+ }
+
+ public Integer logIndexSizeMaxBytes() {
+ return getInt(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG);
+ }
+
+ public Integer logIndexIntervalBytes() {
+ return getInt(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG);
+ }
+
+ public Long logDeleteDelayMs() {
+ return getLong(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG);
+ }
+
+ public Long logRollTimeMillis() {
+ Long millis = getLong(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG);
+ if (millis != null) return millis;
+ return 60L * 60L * 1000L *
getInt(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+ }
+
+ public Long logRollTimeJitterMillis() {
+ Long millis =
getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG);
+ if (millis != null) return millis;
+ return
TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG));
+ }
+
+ public Long logFlushIntervalMs() {
+ Long millis = getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG);
+ if (millis != null) return millis;
+ return
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+ }
+
+ public Integer minInSyncReplicas() {
+ return getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG);
+ }
+
+ public Boolean logPreAllocateEnable() {
+ return getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG);
+ }
+
+ public long logInitialTaskDelayMs() {
+ Long millis =
getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG);
+ if (millis != null) return millis;
+ return ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;
+ }
+
+ public TimestampType logMessageTimestampType() {
+ return
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG));
+ }
+
+ public long logMessageTimestampBeforeMaxMs() {
+ return
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
+ }
+
+ public long logMessageTimestampAfterMaxMs() {
+ return
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
+ }
+
+ public long logDirFailureTimeoutMs() {
+ return getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG);
+ }
+
+ public Long logRetentionTimeMillis() {
+ Long millis =
getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG);
+ if (millis == null) {
+ Integer mins =
getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG);
+ if (mins != null) {
+ millis = TimeUnit.MINUTES.toMillis(mins);
+ } else {
+ millis =
TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG));
+ }
+ }
+
+ return millis < 0 ? Long.valueOf(-1) : millis;
+ }
}