This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 58b56c213e0 KAFKA-15853: [1/N] Move static methods, SocketServer and 
Log configs to AbstractKafkaConfig (#21548)
58b56c213e0 is described below

commit 58b56c213e03af9b473a25546f97b007cda51ffa
Author: Christo Lolov <[email protected]>
AuthorDate: Fri Mar 27 02:15:14 2026 +0000

    KAFKA-15853: [1/N] Move static methods, SocketServer and Log configs to 
AbstractKafkaConfig (#21548)
    
    This is a gradual migration of KafkaConfig to Java for easier reviews.
    
    Moved the static methods to AbstractKafkaConfig and the methods used to
    access SocketServer and Log configs.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../main/scala/kafka/network/RequestChannel.scala  |   6 +-
 .../main/scala/kafka/network/SocketServer.scala    |   8 +-
 .../scala/kafka/server/ConfigAdminManager.scala    |   3 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 129 +---------
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   2 +-
 .../kafka/server/config/AbstractKafkaConfig.java   | 277 +++++++++++++++++++++
 6 files changed, 296 insertions(+), 129 deletions(-)

diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index de0a9d93173..25155145b5a 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -22,7 +22,6 @@ import java.util.concurrent._
 import com.fasterxml.jackson.databind.JsonNode
 import com.typesafe.scalalogging.Logger
 import kafka.network
-import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.memory.MemoryPool
@@ -35,6 +34,7 @@ import 
org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.server.common.RequestLocal
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.network.{RequestConvertToJson, Session}
+import org.apache.kafka.server.config.AbstractKafkaConfig
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
@@ -170,7 +170,7 @@ object RequestChannel extends Logging {
           newData.resources().forEach(resource => {
             val resourceType = 
ConfigResource.Type.forId(resource.resourceType())
             resource.configs().forEach(config => {
-              config.setValue(KafkaConfig.loggableValue(resourceType, 
config.name(), config.value()))
+              config.setValue(AbstractKafkaConfig.loggableValue(resourceType, 
config.name(), config.value()))
             })
           })
           new AlterConfigsRequest(newData, alterConfigs.version())
@@ -180,7 +180,7 @@ object RequestChannel extends Logging {
           newData.resources().forEach(resource => {
             val resourceType = 
ConfigResource.Type.forId(resource.resourceType())
             resource.configs().forEach(config => {
-              config.setValue(KafkaConfig.loggableValue(resourceType, 
config.name(), config.value()))
+              config.setValue(AbstractKafkaConfig.loggableValue(resourceType, 
config.name(), config.value()))
             })
           })
           new 
IncrementalAlterConfigsRequest.Builder(newData).build(alterConfigs.version())
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index cb7dc5ad92a..590be14e2dd 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -324,8 +324,8 @@ class SocketServer(
     }
     val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
     if (maxConnectionsPerIpOverrides != 
oldConfig.maxConnectionsPerIpOverrides) {
-      info(s"Updating maxConnectionsPerIpOverrides: 
${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
-      
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
+      info(s"Updating maxConnectionsPerIpOverrides: 
${maxConnectionsPerIpOverrides.asScala.map { case (k, v) => s"$k=$v" 
}.mkString(",")}")
+      
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides.asScala.map
  { case (k, v) => (k, v.intValue()) }.toMap)
     }
     val maxConnections = newConfig.maxConnections
     if (maxConnections != oldConfig.maxConnections) {
@@ -1275,7 +1275,7 @@ private[kafka] class Processor(
 class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) 
extends Logging with AutoCloseable {
 
   @volatile private var defaultMaxConnectionsPerIp: Int = 
config.maxConnectionsPerIp
-  @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.map { case (host, count) => 
(InetAddress.getByName(host), count) }
+  @volatile private var maxConnectionsPerIpOverrides = 
config.maxConnectionsPerIpOverrides.asScala.map { case (host, count) => 
(InetAddress.getByName(host), count.intValue()) }.toMap
   @volatile private var brokerMaxConnections = config.maxConnections
   private val interBrokerListenerName = config.interBrokerListenerName
   private val counts = mutable.Map[InetAddress, Int]()
@@ -1313,7 +1313,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   }
 
   private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: 
Map[String, Int]): Unit = {
-    maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => 
(InetAddress.getByName(host), count) }
+    maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => 
(InetAddress.getByName(host), count) }.toMap
   }
 
   private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = 
{
diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala 
b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
index f86af7d37af..1b021531601 100644
--- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala
+++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala
@@ -36,6 +36,7 @@ import 
org.apache.kafka.common.protocol.Errors.{INVALID_REQUEST, UNKNOWN_SERVER_
 import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.resource.{Resource, ResourceType}
 import org.apache.kafka.metadata.ConfigRepository
+import org.apache.kafka.server.config.AbstractKafkaConfig
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.{Map, Seq}
@@ -400,7 +401,7 @@ object ConfigAdminManager {
    */
   def toLoggableProps(resource: ConfigResource, configProps: Properties): 
Map[String, String] = {
     configProps.asScala.map {
-      case (key, value) => (key, KafkaConfig.loggableValue(resource.`type`, 
key, value))
+      case (key, value) => (key, 
AbstractKafkaConfig.loggableValue(resource.`type`, key, value))
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 1bdfbb5b37c..c5075d5bd99 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -23,35 +23,31 @@ import java.util.Properties
 import kafka.utils.Logging
 import kafka.utils.Implicits._
 import org.apache.kafka.common.{Endpoint, Reconfigurable}
-import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, TopicConfig}
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
-import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
-import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
-import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
-import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfig, 
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, 
DynamicBrokerConfig => JDynamicBrokerConfig}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => 
JDynamicBrokerConfig}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq}
-import scala.jdk.OptionConverters.RichOptional
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
 
 object KafkaConfig {
 
@@ -92,49 +88,11 @@ object KafkaConfig {
 
   def apply(props: java.util.Map[_, _], doLog: Boolean = true): KafkaConfig = 
new KafkaConfig(props, doLog)
 
-  private def typeOf(name: String): Option[ConfigDef.Type] = 
Option(configDef.configKeys.get(name)).map(_.`type`)
-
-  def configType(configName: String): Option[ConfigDef.Type] = {
-    val configType = configTypeExact(configName)
-    if (configType.isDefined) {
-      return configType
-    }
-    typeOf(configName) match {
-      case Some(t) => Some(t)
-      case None =>
-        JDynamicBrokerConfig.brokerConfigSynonyms(configName, 
true).asScala.flatMap(typeOf).headOption
-    }
-  }
-
-  private def configTypeExact(exactName: String): Option[ConfigDef.Type] = {
-    val configType = typeOf(exactName).orNull
-    if (configType != null) {
-      Some(configType)
-    } else {
-      val configKey = DynamicConfig.Broker.configKeys.get(exactName)
-      if (configKey != null) {
-        Some(configKey.`type`)
-      } else {
-        None
-      }
-    }
-  }
+  def configType(configName: String): Option[ConfigDef.Type] =
+    AbstractKafkaConfig.configType(configName).toScala
 
   def maybeSensitive(configType: Option[ConfigDef.Type]): Boolean = {
-    // If we can't determine the config entry type, treat it as a sensitive 
config to be safe
-    configType.isEmpty || configType.contains(ConfigDef.Type.PASSWORD)
-  }
-
-  def loggableValue(resourceType: ConfigResource.Type, name: String, value: 
String): String = {
-    val maybeSensitive = resourceType match {
-      case ConfigResource.Type.BROKER => 
KafkaConfig.maybeSensitive(KafkaConfig.configType(name))
-      case ConfigResource.Type.TOPIC => 
KafkaConfig.maybeSensitive(LogConfig.configType(name).toScala)
-      case ConfigResource.Type.GROUP => 
KafkaConfig.maybeSensitive(GroupConfig.configType(name).toScala)
-      case ConfigResource.Type.BROKER_LOGGER => false
-      case ConfigResource.Type.CLIENT_METRICS => false
-      case _ => true
-    }
-    if (maybeSensitive) Password.HIDDEN else value
+    AbstractKafkaConfig.maybeSensitive(configType.toJava)
   }
 }
 
@@ -233,8 +191,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val serverMaxStartupTimeMs = 
getLong(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG)
 
   def messageMaxBytes = getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)
-  val connectionSetupTimeoutMs = 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG)
-  val connectionSetupTimeoutMaxMs = 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG)
 
   def getNumReplicaAlterLogDirsThreads: Int = {
     val numThreads: Integer = 
Option(getInt(ServerConfigs.NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG)).getOrElse(logDirs.size)
@@ -276,61 +232,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
   }
 
-  /** ********* Socket Server Configuration ***********/
-  val socketSendBufferBytes = 
getInt(SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_CONFIG)
-  val socketReceiveBufferBytes = 
getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG)
-  val socketRequestMaxBytes = 
getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG)
-  val socketListenBacklogSize = 
getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG)
-  def maxConnectionsPerIp = 
getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
-  def maxConnectionsPerIpOverrides: Map[String, Int] =
-    getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, 
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG)).asScala.map
 { case (k, v) => (k, v.toInt)}
-  def maxConnections = getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG)
-  def maxConnectionCreationRate = 
getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG)
-  val connectionsMaxIdleMs = 
getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG)
-  val failedAuthenticationDelayMs = 
getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG)
-  val queuedMaxRequests = 
getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG)
-  val queuedMaxBytes = getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG)
-  def numNetworkThreads = 
getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)
-
   /***************** rack configuration **************/
   val replicaSelectorClassName = 
Option(getString(ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG))
 
-  /** ********* Log Configuration ***********/
-  val autoCreateTopicsEnable = 
getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG)
-  val numPartitions = getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG)
-  def logSegmentBytes = getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG)
-  def logFlushIntervalMessages = 
getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG)
-  def logCleanerThreads = getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP)
-  val logFlushSchedulerIntervalMs = 
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)
-  val logFlushOffsetCheckpointIntervalMs = 
getInt(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).toLong
-  val logFlushStartOffsetCheckpointIntervalMs = 
getInt(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).toLong
-  val logCleanupIntervalMs = 
getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG)
-  def logCleanupPolicy = getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG)
-
-  def logRetentionBytes = getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG)
-  def logCleanerDedupeBufferSize = 
getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP)
-  def logCleanerDeleteRetentionMs = 
getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP)
-  def logCleanerMinCompactionLagMs = 
getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP)
-  def logCleanerMaxCompactionLagMs = 
getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP)
-  def logCleanerMinCleanRatio = 
getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP)
-  def logIndexSizeMaxBytes = 
getInt(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG)
-  def logIndexIntervalBytes = 
getInt(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG)
-  def logDeleteDelayMs = getLong(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG)
-  def logRollTimeMillis: java.lang.Long = 
Option(getLong(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG)).getOrElse(60 * 60 
* 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG))
-  def logRollTimeJitterMillis: java.lang.Long = 
Option(getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG)).getOrElse(60
 * 60 * 1000L * getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG))
-  def logFlushIntervalMs: java.lang.Long = 
Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG))
-  def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG)
-  def logPreAllocateEnable: java.lang.Boolean = 
getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
-  def logInitialTaskDelayMs: java.lang.Long = 
Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)
-
-  def logMessageTimestampType = 
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG))
-
-  def logMessageTimestampBeforeMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG)
-
-  def logMessageTimestampAfterMaxMs: Long = 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG)
-
-  def logDirFailureTimeoutMs: Long = 
getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG)
-
   /** ********* Replication configuration ***********/
   val controllerSocketTimeoutMs: Int = 
getInt(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG)
   val defaultReplicationFactor: Int = 
getInt(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)
@@ -433,21 +337,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     dynamicConfig.removeReconfigurable(reconfigurable)
   }
 
-  def logRetentionTimeMillis: Long = {
-    val millisInMinute = 60L * 1000L
-    val millisInHour = 60L * millisInMinute
-
-    val millis: java.lang.Long =
-      
Option(getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG)).getOrElse(
-        Option(getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG)) 
match {
-          case Some(mins) => millisInMinute * mins
-          case None => 
getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG) * millisInHour
-        })
-
-    if (millis < 0) return -1
-    millis
-  }
-
   def listeners: Seq[Endpoint] =
     
AbstractKafkaConfig.listenerListToEndPoints(getList(SocketServerConfigs.LISTENERS_CONFIG),
 effectiveListenerSecurityProtocolMap).asScala
 
@@ -663,10 +552,10 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       s"${SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG} must be larger or equal 
to ${SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG}")
 
     if (maxConnectionsPerIp == 0)
-      require(maxConnectionsPerIpOverrides.nonEmpty, 
s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only 
if" +
+      require(!maxConnectionsPerIpOverrides.isEmpty, 
s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG} can be set to zero only 
if" +
         s" ${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG} 
property is set.")
 
-    val invalidAddresses = maxConnectionsPerIpOverrides.keys.filterNot(address 
=> Utils.validHostPattern(address))
+    val invalidAddresses = 
maxConnectionsPerIpOverrides.asScala.keys.filterNot(address => 
Utils.validHostPattern(address))
     if (invalidAddresses.nonEmpty)
       throw new 
IllegalArgumentException(s"${SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG}
 contains invalid addresses : ${invalidAddresses.mkString(",")}")
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2a035f77333..90dffba4b48 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1217,7 +1217,7 @@ class KafkaConfigTest {
     assertEquals(1, config.brokerId)
     assertEndpointsEqual(new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, 
"127.0.0.1", 1122),
       config.effectiveAdvertisedBrokerListeners.head)
-    assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), 
config.maxConnectionsPerIpOverrides)
+    assertEquals(util.Map.of("127.0.0.1", 2, "127.0.0.2", 3), 
config.maxConnectionsPerIpOverrides)
     assertEquals(util.List.of("/tmp1", "/tmp2"), config.logDirs)
     assertEquals(12 * 60L * 1000L * 60, config.logRollTimeMillis)
     assertEquals(11 * 60L * 1000L * 60, config.logRollTimeJitterMillis)
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 9baa67c1515..903e24a78eb 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -22,10 +22,14 @@ import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
@@ -50,6 +54,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -350,4 +355,276 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
      * @param reconfigurable the component to unregister
      */
     public abstract void removeReconfigurable(Reconfigurable reconfigurable);
+
+    /**
+     * Determines whether a config entry might be sensitive based on its type.
+     * If the type cannot be determined, the config is treated as sensitive
+     * to be safe.
+     *
+     * @param configType the config entry type, or empty if unknown
+     * @return true if the config might be sensitive
+     */
+    public static boolean maybeSensitive(Optional<ConfigDef.Type> configType) {
+        return configType.isEmpty()
+                || configType.get() == ConfigDef.Type.PASSWORD;
+    }
+
+    /**
+     * Looks up the type for a config key by name directly from
+     * {@link #CONFIG_DEF}.
+     *
+     * @param name the config key name
+     * @return the type if found, or empty
+     */
+    public static Optional<ConfigDef.Type> configDefTypeOf(String name) {
+        return Optional.ofNullable(CONFIG_DEF.configKeys().get(name))
+                .map(key -> key.type);
+    }
+
+    /**
+     * Resolves the {@link ConfigDef.Type} for a given config name.
+     * First tries an exact match, then falls back to checking broker
+     * config synonyms.
+     *
+     * @param configName the config name to look up
+     * @return the type if found, or empty
+     */
+    public static Optional<ConfigDef.Type> configType(String configName) {
+        return configDefTypeOf(configName)
+                .or(() -> 
Optional.ofNullable(DynamicConfig.Broker.configKeys().get(configName))
+                        .map(key -> key.type))
+                .or(() -> DynamicBrokerConfig.brokerConfigSynonyms(configName, 
true)
+                        .stream()
+                        .map(AbstractKafkaConfig::configDefTypeOf)
+                        .flatMap(Optional::stream)
+                        .findFirst());
+    }
+
+    /**
+     * Returns the loggable form of a config value. Sensitive values
+     * are replaced with {@link Password#HIDDEN}.
+     *
+     * @param resourceType the config resource type
+     * @param name         the config name
+     * @param value        the config value
+     * @return the value suitable for logging
+     */
+    public static String loggableValue(ConfigResource.Type resourceType,
+                                       String name,
+                                       String value) {
+        boolean sensitive = switch (resourceType) {
+            case BROKER -> maybeSensitive(configType(name));
+            case TOPIC -> maybeSensitive(LogConfig.configType(name));
+            case GROUP -> maybeSensitive(GroupConfig.configType(name));
+            case BROKER_LOGGER, CLIENT_METRICS -> false;
+            default -> true;
+        };
+        return sensitive ? Password.HIDDEN : value;
+    }
+
+    // ********* Socket Server Configuration **********
+
+    public int socketSendBufferBytes() {
+        return getInt(SocketServerConfigs.SOCKET_SEND_BUFFER_BYTES_CONFIG);
+    }
+
+    public int socketReceiveBufferBytes() {
+        return getInt(SocketServerConfigs.SOCKET_RECEIVE_BUFFER_BYTES_CONFIG);
+    }
+
+    public int socketRequestMaxBytes() {
+        return getInt(SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG);
+    }
+
+    public int socketListenBacklogSize() {
+        return getInt(SocketServerConfigs.SOCKET_LISTEN_BACKLOG_SIZE_CONFIG);
+    }
+
+    public int maxConnectionsPerIp() {
+        return getInt(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG);
+    }
+
+    public Map<String, Integer> maxConnectionsPerIpOverrides() {
+        return 
getMap(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
+                
getString(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG))
+                .entrySet()
+                .stream()
+                .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> 
Integer.parseInt(e.getValue())));
+    }
+
+    public int maxConnections() {
+        return getInt(SocketServerConfigs.MAX_CONNECTIONS_CONFIG);
+    }
+
+    public int maxConnectionCreationRate() {
+        return getInt(SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG);
+    }
+
+    public long connectionsMaxIdleMs() {
+        return getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG);
+    }
+
+    public int failedAuthenticationDelayMs() {
+        return 
getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG);
+    }
+
+    public int queuedMaxRequests() {
+        return getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG);
+    }
+
+    public long queuedMaxBytes() {
+        return getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG);
+    }
+
+    public int numNetworkThreads() {
+        return getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);
+    }
+
+    public long connectionSetupTimeoutMs() {
+        return 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG);
+    }
+
+    public long connectionSetupTimeoutMaxMs() {
+        return 
getLong(ServerConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG);
+    }
+
+    // ********* Log Configuration **********
+
+    public boolean autoCreateTopicsEnable() {
+        return getBoolean(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG);
+    }
+
+    public int numPartitions() {
+        return getInt(ServerLogConfigs.NUM_PARTITIONS_CONFIG);
+    }
+
+    public Integer logSegmentBytes() {
+        return getInt(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG);
+    }
+
+    public Long logFlushIntervalMessages() {
+        return getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG);
+    }
+
+    public int logCleanerThreads() {
+        return getInt(CleanerConfig.LOG_CLEANER_THREADS_PROP);
+    }
+
+    public long logFlushSchedulerIntervalMs() {
+        return 
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+    }
+
+    public long logFlushOffsetCheckpointIntervalMs() {
+        return 
getInt(ServerLogConfigs.LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).longValue();
+    }
+
+    public long logFlushStartOffsetCheckpointIntervalMs() {
+        return 
getInt(ServerLogConfigs.LOG_FLUSH_START_OFFSET_CHECKPOINT_INTERVAL_MS_CONFIG).longValue();
+    }
+
+    public long logCleanupIntervalMs() {
+        return getLong(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG);
+    }
+
+    public List<String> logCleanupPolicy() {
+        return getList(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG);
+    }
+
+    public Long logRetentionBytes() {
+        return getLong(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG);
+    }
+
+    public long logCleanerDedupeBufferSize() {
+        return getLong(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP);
+    }
+
+    public Long logCleanerDeleteRetentionMs() {
+        return getLong(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP);
+    }
+
+    public Long logCleanerMinCompactionLagMs() {
+        return getLong(CleanerConfig.LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP);
+    }
+
+    public Long logCleanerMaxCompactionLagMs() {
+        return getLong(CleanerConfig.LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP);
+    }
+
+    public Double logCleanerMinCleanRatio() {
+        return getDouble(CleanerConfig.LOG_CLEANER_MIN_CLEAN_RATIO_PROP);
+    }
+
+    public Integer logIndexSizeMaxBytes() {
+        return getInt(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG);
+    }
+
+    public Integer logIndexIntervalBytes() {
+        return getInt(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG);
+    }
+
+    public Long logDeleteDelayMs() {
+        return getLong(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG);
+    }
+
+    public Long logRollTimeMillis() {
+        Long millis = getLong(ServerLogConfigs.LOG_ROLL_TIME_MILLIS_CONFIG);
+        if (millis != null) return millis;
+        return 60L * 60L * 1000L * 
getInt(ServerLogConfigs.LOG_ROLL_TIME_HOURS_CONFIG);
+    }
+
+    public Long logRollTimeJitterMillis() {
+        Long millis = 
getLong(ServerLogConfigs.LOG_ROLL_TIME_JITTER_MILLIS_CONFIG);
+        if (millis != null) return millis;
+        return 
TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_ROLL_TIME_JITTER_HOURS_CONFIG));
+    }
+
+    public Long logFlushIntervalMs() {
+        Long millis = getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG);
+        if (millis != null) return millis;
+        return 
getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG);
+    }
+
+    public Integer minInSyncReplicas() {
+        return getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG);
+    }
+
+    public Boolean logPreAllocateEnable() {
+        return getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG);
+    }
+
+    public long logInitialTaskDelayMs() {
+        Long millis = 
getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG);
+        if (millis != null) return millis;
+        return ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;
+    }
+
+    public TimestampType logMessageTimestampType() {
+        return 
TimestampType.forName(getString(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_TYPE_CONFIG));
+    }
+
+    public long logMessageTimestampBeforeMaxMs() {
+        return 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG);
+    }
+
+    public long logMessageTimestampAfterMaxMs() {
+        return 
getLong(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG);
+    }
+
+    public long logDirFailureTimeoutMs() {
+        return getLong(ServerLogConfigs.LOG_DIR_FAILURE_TIMEOUT_MS_CONFIG);
+    }
+
+    public Long logRetentionTimeMillis() {
+        Long millis = 
getLong(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG);
+        if (millis == null) {
+            Integer mins = 
getInt(ServerLogConfigs.LOG_RETENTION_TIME_MINUTES_CONFIG);
+            if (mins != null) {
+                millis = TimeUnit.MINUTES.toMillis(mins);
+            } else {
+                millis = 
TimeUnit.HOURS.toMillis(getInt(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG));
+            }
+        }
+
+        return millis < 0 ? Long.valueOf(-1) : millis;
+    }
 }

Reply via email to