This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6be48c6e54f KAFKA-20441: Fix handling of cordoned log dirs (#22070)
6be48c6e54f is described below

commit 6be48c6e54f2f2431b58fb4497d29abe07e150d2
Author: Mickael Maison <[email protected]>
AuthorDate: Mon May 4 13:54:46 2026 +0200

    KAFKA-20441: Fix handling of cordoned log dirs (#22070)
    
    This tightens the validation of the cordoned.log.dirs configuration on
    controllers. Controllers only allow uncordoning log directories as they
    don't have access to the full list of log directory paths.
    
    This also delays the initial propagation of cordoned log dirs.
    Previously log dirs where initially sent as part of the broker
    registration, now they are only sent via heartbeat once the broker has
    fully caught up with the metadata. This ensure dynamic changes cannot be
    lost.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, José Armando García
     Sancio <[email protected]>
---
 .../common/message/BrokerHeartbeatRequest.json     |   5 +-
 .../common/message/BrokerRegistrationRequest.json  |   7 +-
 .../common/message/BrokerRegistrationResponse.json |   3 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  12 +-
 .../main/scala/kafka/server/ControllerApis.scala   |  12 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  23 +---
 core/src/main/scala/kafka/server/KafkaConfig.scala |   2 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   2 +-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |   2 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |  84 ++++++--------
 .../unit/kafka/server/ControllerApisTest.scala     |   3 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  19 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |   8 +-
 .../unit/kafka/server/ReplicationQuotasTest.scala  |   1 +
 .../kafka/controller/ClusterControlManager.java    |  21 +---
 .../controller/ConfigurationControlManager.java    |  80 ++++++++++---
 .../org/apache/kafka/controller/Controller.java    |  12 +-
 .../apache/kafka/controller/QuorumController.java  |  16 ++-
 .../controller/ReplicationControlManager.java      |  18 +--
 .../java/org/apache/kafka/image/ClusterDelta.java  |   2 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  23 +++-
 .../metadata/BrokerRegistrationChangeRecord.json   |   5 +-
 .../common/metadata/RegisterBrokerRecord.json      |   5 +-
 .../controller/ClusterControlManagerTest.java      |  36 +++++-
 .../ConfigurationControlManagerTest.java           | 128 +++++++++++++++++----
 .../QuorumControllerIntegrationTestUtils.java      |   2 +-
 .../kafka/controller/QuorumControllerTest.java     |  34 +++---
 .../controller/ReplicationControlManagerTest.java  |  90 ++++++++++-----
 .../image/node/ClusterImageBrokersNodeTest.java    |   2 +-
 .../kafka/metadata/BrokerRegistrationTest.java     |  90 +++++++++------
 .../kafka/server/common/DirectoryEventHandler.java |  11 +-
 .../kafka/server/BrokerLifecycleManager.java       |  79 +++++--------
 .../server/CordonedLogDirsIntegrationTest.java     |  52 ++++++++-
 .../kafka/storage/internals/log/LogManager.java    |   4 +-
 .../apache/kafka/common/test/MockController.java   |   9 +-
 35 files changed, 560 insertions(+), 342 deletions(-)

diff --git 
a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json 
b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
index 41d5fa0a150..63910c2fc4e 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
@@ -35,7 +35,8 @@
       "about": "True if the broker wants to be shut down, false otherwise." },
     { "name": "OfflineLogDirs", "type":  "[]uuid", "versions": "1+", 
"taggedVersions": "1+", "tag": 0,
       "about": "Log directories that failed and went offline." },
-    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "2+", 
"taggedVersions": "2+",
-      "tag": "1", "about": "Log directories that are cordoned." }
+    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions": "2+", 
"taggedVersions": "2+", "tag": 1,
+      "nullableVersions": "2+", "default": "null",
+      "about": "List of log directories that are cordoned. This is null before 
the broker reaches the RECOVERY state." }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json 
b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 53e37f21d5a..04cd1b65a79 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -17,13 +17,12 @@
 // Version 2 adds LogDirs for KIP-858
 // Version 3 adds the PreviousBrokerEpoch for the KIP-966
 // Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in 
the response from being 0.
-// Version 5 adds the CordonedLogDirs flexible field
 {
   "apiKey":62,
   "type": "request",
   "listeners": ["controller"],
   "name": "BrokerRegistrationRequest",
-  "validVersions": "0-5",
+  "validVersions": "0-4",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
@@ -61,8 +60,6 @@
     { "name": "LogDirs", "type":  "[]uuid", "versions":  "2+",
       "about": "Log directories configured in this broker which are 
available.", "ignorable": true },
     { "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+", 
"default": "-1", "ignorable": true,
-      "about": "The epoch before a clean shutdown." },
-    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "5+", 
"taggedVersions": "5+",
-      "tag": "0", "about": "Log directories that are cordoned." }
+      "about": "The epoch before a clean shutdown." }
   ]
 }
diff --git 
a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json 
b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index 956e945df40..308fe3860d2 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -17,12 +17,11 @@
 // Version 2 adds the PreviousBrokerEpoch to the request for the KIP-966
 // Version 3 is the same as version 2 (new field in request).
 // Version 4 is the same as version 2 (new field in request).
-// Version 5 is the same as version 2 (new field in request).
 {
   "apiKey": 62,
   "type": "response",
   "name": "BrokerRegistrationResponse",
-  "validVersions": "0-5",
+  "validVersions": "0-4",
   "flexibleVersions": "0+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8f741727735..1646c8c6a83 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -68,7 +68,6 @@ import java.util
 import java.util.Optional
 import java.util.concurrent.locks.{Condition, ReentrantLock}
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, 
TimeoutException}
-import java.util.stream.Collectors
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters.RichOption
@@ -228,7 +227,7 @@ class BrokerServer(
         config,
         time,
         s"broker-${config.nodeId}-",
-        logManager.directoryIdsSet,
+        logManager.directoryIds,
         () => new Thread(() => shutdown(), "kafka-shutdown-thread").start(),
         () => metadataCache.metadataVersion().isCordonedLogDirsSupported)
 
@@ -343,9 +342,6 @@ class BrokerServer(
 
         override def handleCordoned(directoryIds: util.Set[Uuid]): Unit =
           lifecycleManager.propagateDirectoryCordoned(directoryIds)
-
-        override def handleUncordoned(directoryIds: util.Set[Uuid]): Unit =
-          lifecycleManager.propagateDirectoryUncordoned(directoryIds)
 }
 
       /**
@@ -428,17 +424,13 @@ class BrokerServer(
         s"broker-${config.nodeId}-",
         config.brokerHeartbeatIntervalMs
       )
-      val initialCordonedLogDirs: util.Set[Uuid] = 
config.cordonedLogDirs().stream()
-        .map(dir => logManager.directoryId(dir).get)
-        .collect(Collectors.toSet())
       lifecycleManager.start(
         () => sharedServer.loader.lastAppliedOffset(),
         brokerLifecycleChannelManager,
         clusterId,
         listenerInfo.toBrokerRegistrationRequest,
         featuresRemapped,
-        logManager.readBrokerEpochFromCleanShutdownFiles(),
-        initialCordonedLogDirs
+        logManager.readBrokerEpochFromCleanShutdownFiles()
       )
 
       // The FetchSessionCache is divided into config.numIoThreads shards, 
each responsible
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index d928bbac01b..fbe76012508 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -370,7 +370,8 @@ class ControllerApis(
         authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME, 
logIfDenied = false),
         names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC, 
names)(identity),
         names => authHelper.filterByAuthorized(request.context, 
DESCRIBE_CONFIGS, TOPIC,
-            names, logIfDenied = false)(identity))
+            names, logIfDenied = false)(identity),
+        request.isForwarded)
     future.handle[Unit] { (result, exception) =>
       val response = if (exception != null) {
         createTopicsRequest.getErrorResponse(exception)
@@ -392,7 +393,8 @@ class ControllerApis(
     request: CreateTopicsRequestData,
     hasClusterAuth: Boolean,
     getCreatableTopics: Iterable[String] => Set[String],
-    getDescribableTopics: Iterable[String] => Set[String]
+    getDescribableTopics: Iterable[String] => Set[String],
+    forwarded: Boolean
   ): CompletableFuture[CreateTopicsResponseData] = {
     val topicNames = new util.HashSet[String]()
     val duplicateTopicNames = new util.HashSet[String]()
@@ -422,7 +424,7 @@ class ControllerApis(
         iterator.remove()
       }
     }
-    controller.createTopics(context, effectiveRequest, 
describableTopicNames).thenApply { response =>
+    controller.createTopics(context, effectiveRequest, describableTopicNames, 
forwarded).thenApply { response =>
       duplicateTopicNames.forEach { name =>
         response.topics().add(new CreatableTopicResult().
           setName(name).
@@ -534,7 +536,7 @@ class ControllerApis(
         iterator.remove()
       }
     }
-    controller.legacyAlterConfigs(context, configChanges, 
alterConfigsRequest.data.validateOnly)
+    controller.legacyAlterConfigs(context, configChanges, 
alterConfigsRequest.data.validateOnly, request.isForwarded)
       .handle[Unit] { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
@@ -773,7 +775,7 @@ class ControllerApis(
         iterator.remove()
       }
     }
-    controller.incrementalAlterConfigs(context, configChanges, 
alterConfigsRequest.data.validateOnly)
+    controller.incrementalAlterConfigs(context, configChanges, 
alterConfigsRequest.data.validateOnly, request.isForwarded)
       .handle[Unit] { (controllerResults, exception) =>
         if (exception != null) {
           requestHelper.handleError(request, exception)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index bc70f1955a4..6aff9935818 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,8 +25,7 @@ import kafka.network.DataPlaneAcceptor
 import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.Logging
-import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, SslConfigs}
 import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
 import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -612,23 +611,13 @@ class DynamicLogConfig(logManager: LogManager, 
directoryEventHandler: DirectoryE
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     val newBrokerDefaults = new util.HashMap[String, 
Object](newConfig.extractLogConfigMap)
-
-    
logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs))
-    val newCordoned = new util.HashSet[String](newConfig.cordonedLogDirs)
-    newCordoned.removeAll(oldConfig.cordonedLogDirs)
-
-    val newUncordoned = new util.HashSet[String](oldConfig.cordonedLogDirs)
-    newUncordoned.removeAll(newConfig.cordonedLogDirs)
-    if (!newCordoned.isEmpty) {
-      directoryEventHandler.handleCordoned(newCordoned.stream.map(dir => 
logManager.directoryId(dir).get).collect(Collectors.toSet()))
-    }
-    if (!newUncordoned.isEmpty) {
-      directoryEventHandler.handleUncordoned(newUncordoned.stream.map(dir => 
logManager.directoryId(dir).get).collect(Collectors.toSet()))
-    }
-
     logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
-
     updateLogsConfig(newBrokerDefaults.asScala)
+
+    
logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs))
+    directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream
+      .flatMap[Uuid](dir => logManager.directoryId(dir).stream)
+      .collect(Collectors.toSet[Uuid]))
   }
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c5075d5bd99..05d37ea07fe 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -399,7 +399,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       require(cordonedLogDirs.size == 1, s"When 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} is set to 
${ServerLogConfigs.CORDONED_LOG_DIRS_ALL}, it must not contain other values")
     } else {
       val unknownLogDirs = 
cordonedLogDirs.asScala.filter(!logDirs().contains(_))
-      require(unknownLogDirs.isEmpty, s"All entries in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} or 
${ServerLogConfigs.LOG_DIR_CONFIG}. Missing entries : 
${unknownLogDirs.mkString(", ")}")
+      require(unknownLogDirs.isEmpty, s"All entries in 
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in 
${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}. 
Missing entries : ${unknownLogDirs.mkString(", ")}")
     }
   }
 
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index cadcddd99a5..8759fa38128 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -4323,7 +4323,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
       util.Map.of(controllerNodeResource,
         util.Map.of(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
-          new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), 
false).get()
+          new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false, 
false).get()
     ensureConsistentKRaftMetadata()
 
     waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index ab5dfe0587b..a57ecd588cb 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -1163,7 +1163,7 @@ class LogManagerTest {
     assertEquals(Optional.empty, 
logManager.directoryId(dirs(2).getAbsolutePath))
     assertEquals(Optional.of(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")), 
logManager.directoryId(dirs(3).getAbsolutePath))
     assertTrue(logManager.directoryId(dirs(3).getAbsolutePath).isPresent)
-    assertEquals(2, logManager.directoryIdsSet.size)
+    assertEquals(2, logManager.directoryIds.size)
   }
 
   /**
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d64d9f00b9c..51d51967871 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -38,6 +38,10 @@ import scala.jdk.CollectionConverters._
 @Timeout(value = 12)
 class BrokerLifecycleManagerTest {
   private var manager: BrokerLifecycleManager = null
+  private val logDirs: util.Map[String, Uuid] = util.Map.of(
+    "/var/data",
+    Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")
+  )
 
   @AfterEach
   def tearDown(): Unit = {
@@ -61,18 +65,18 @@ class BrokerLifecycleManagerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"create-and-close-", util.Set.of(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"create-and-close-", logDirs)
     manager.close()
   }
 
   @Test
   def testCreateStartAndClose(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"create-start-and-close-", 
util.Set.of(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"create-start-and-close-", logDirs)
     assertEquals(BrokerState.NOT_RUNNING, manager.state)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     TestUtils.retry(60000) {
       assertEquals(BrokerState.STARTING, manager.state)
     }
@@ -83,12 +87,12 @@ class BrokerLifecycleManagerTest {
   @Test
   def testSuccessfulRegistration(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", 
util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"successful-registration-", logDirs)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
+      Collections.emptyMap(), OptionalLong.of(10L))
     TestUtils.retry(60000) {
       assertEquals(1, context.mockChannelManager.unsentQueue.size)
       assertEquals(10L, 
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
@@ -105,7 +109,7 @@ class BrokerLifecycleManagerTest {
   def testRegistrationTimeout(): Unit = {
     val context = new RegistrationTestContext(configProperties)
     val controllerNode = new Node(3000, "localhost", 8021)
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"registration-timeout-", util.Set.of(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"registration-timeout-", logDirs)
     context.controllerNodeProvider.node.set(controllerNode)
     def newDuplicateRegistrationResponse(): Unit = {
       context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -117,7 +121,7 @@ class BrokerLifecycleManagerTest {
     assertEquals(1, context.mockClient.futureResponses().size)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     // We should send the first registration request and get a failure 
immediately
     TestUtils.retry(60000) {
       context.poll()
@@ -145,7 +149,7 @@ class BrokerLifecycleManagerTest {
   @Test
   def testControlledShutdown(): Unit = {
     val context = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(context.config, context.time, 
"controlled-shutdown-", util.Set.of(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
+    manager = new BrokerLifecycleManager(context.config, context.time, 
"controlled-shutdown-", logDirs)
     val controllerNode = new Node(3000, "localhost", 8021)
     context.controllerNodeProvider.node.set(controllerNode)
     context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -154,7 +158,7 @@ class BrokerLifecycleManagerTest {
       new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
     manager.start(() => context.highestMetadataOffset.get(),
       context.mockChannelManager, context.clusterId, 
context.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     TestUtils.retry(10000) {
       context.poll()
       manager.eventQueue.wakeup()
@@ -226,14 +230,14 @@ class BrokerLifecycleManagerTest {
   @Test
   def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"offline-dirs-sent-in-heartbeat-", 
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"offline-dirs-sent-in-heartbeat-", logDirs)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
     def nextHeartbeatDirs(): Set[String] =
@@ -250,9 +254,9 @@ class BrokerLifecycleManagerTest {
 
   @Test
   def testRegistrationIncludesDirs(): Unit = {
-    val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), 
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
+    val dirs = util.Map.of("/dir1", Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), 
"/dir2", Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
     val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"registration-includes-dirs-", logDirs)
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"registration-includes-dirs-", dirs)
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
@@ -260,23 +264,23 @@ class BrokerLifecycleManagerTest {
 
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     val request = poll(ctx, manager, 
registration).asInstanceOf[BrokerRegistrationRequest]
 
-    assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
+    assertEquals(new util.HashSet(dirs.values()), new 
util.HashSet(request.data.logDirs()))
   }
 
   @Test
   def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", 
util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"jbod-metadata-version-update", logDirs)
 
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
 
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
+      Collections.emptyMap(), OptionalLong.of(10L))
 
     def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx, 
manager, prepareResponse[T](ctx, response))
     def nextHeartbeatRequest() = doPoll[AbstractRequest](new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
@@ -300,33 +304,13 @@ class BrokerLifecycleManagerTest {
   }
 
   @Test
-  def testRegistrationIncludesCordonedDirs(): Unit = {
-    val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"), 
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
-    val ctx = new RegistrationTestContext(configProperties)
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"registration-includes-cordoned-dirs-", logDirs)
-    val controllerNode = new Node(3000, "localhost", 8021)
-    ctx.controllerNodeProvider.node.set(controllerNode)
-
-    val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-
-    manager.start(() => ctx.highestMetadataOffset.get(),
-      ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(),
-      logDirs
-    )
-    val request = poll(ctx, manager, 
registration).asInstanceOf[BrokerRegistrationRequest]
-
-    assertEquals(logDirs, new util.HashSet(request.data.cordonedLogDirs()))
-  }
-
-  @Test
-  def testAlwaysSendsAccumulatedCordonedDirs(): Unit = {
+  def testCordonedLogDirs(): Unit = {
     val ctx = new RegistrationTestContext(configProperties)
     var enabled = false
     def cordonedLogDirsEnabled(): Boolean  = {
       enabled
     }
-    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"cordoned-dirs-sent-in-heartbeat-", 
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")),
+    manager = new BrokerLifecycleManager(ctx.config, ctx.time, 
"cordoned-dirs-sent-in-heartbeat-", logDirs,
       () => {}, () => cordonedLogDirsEnabled())
     val controllerNode = new Node(3000, "localhost", 8021)
     ctx.controllerNodeProvider.node.set(controllerNode)
@@ -334,27 +318,33 @@ class BrokerLifecycleManagerTest {
     val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new 
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
     manager.start(() => ctx.highestMetadataOffset.get(),
       ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
-      Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+      Collections.emptyMap(), OptionalLong.empty())
     poll(ctx, manager, registration)
 
     def nextHeartbeatDirs(): Set[Uuid] =
-      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
-        .data().cordonedLogDirs().asScala.toSet
-    assertEquals(Set(), nextHeartbeatDirs())
+      nextRequest().data().cordonedLogDirs().asScala.toSet
+
+    def nextRequest(): BrokerHeartbeatRequest =
+      poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new 
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().setIsCaughtUp(true))))
+
+    while (!manager.initialCatchUpFuture().isDone) {
+      nextRequest()
+    }
+    assertNull(nextRequest().data().cordonedLogDirs())
 
     val dir1 = Uuid.randomUuid()
     val dir2 = Uuid.randomUuid()
     manager.propagateDirectoryCordoned(util.Set.of(dir1))
-    assertEquals(Set(), nextHeartbeatDirs())
+    assertNull(nextRequest().data().cordonedLogDirs())
 
     enabled = true
     manager.propagateDirectoryCordoned(util.Set.of(dir1))
     assertEquals(Set(dir1), nextHeartbeatDirs())
     manager.propagateDirectoryCordoned(util.Set.of(dir2))
-    assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
-    manager.propagateDirectoryUncordoned(util.Set.of(dir1))
     assertEquals(Set(dir2), nextHeartbeatDirs())
-    manager.propagateDirectoryUncordoned(util.Set.of(dir2))
+    manager.propagateDirectoryCordoned(util.Set.of(dir1, dir2))
+    assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
+    manager.propagateDirectoryCordoned(util.Set.of())
     assertEquals(Set(), nextHeartbeatDirs())
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0e14f2f414a..ed1465faa6f 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -744,7 +744,8 @@ class ControllerApisTest {
     assertEquals(expectedResponse, 
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
       hasClusterAuth = false,
       _ => Set("baz", "indescribable"),
-      _ => Set("baz")).get().topics().asScala.toSet)
+      _ => Set("baz"),
+      forwarded = false).get().topics().asScala.toSet)
   }
 
   @ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 012993c7763..fe631381198 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1064,7 +1064,6 @@ class DynamicBrokerConfigTest {
     assertTrue(ctx.config.cordonedLogDirs.isEmpty)
     val logDirs = ctx.config.logDirs()
     verify(ctx.directoryEventHandler, never()).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // Cordoning 1 new log dir, so 1 new handleCordoned invocation
     val props = new Properties()
@@ -1072,42 +1071,42 @@ class DynamicBrokerConfigTest {
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // When using *, no other entries must be specified, so no new invocations
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
     // Invalid log dir, so no new invocations
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
 
     // * cordons the 2nd log dir, so 1 new handleCordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(logDirs, ctx.config.cordonedLogDirs)
     verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
 
-    // clearing all cordoned log dirs, so 1 new handleUncordoned invocation
+    // same value so no new invocations
+    props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+    ctx.config.dynamicConfig.updateBrokerConfig(0, props)
+    assertEquals(logDirs, ctx.config.cordonedLogDirs)
+    verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+
+    // clearing all cordoned log dirs, so 1 new handleCordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertTrue(ctx.config.cordonedLogDirs.isEmpty)
-    verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+    verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
 
     // * cordons all log dirs, so 1 new handleCordoned invocation
     props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",", 
logDirs))
     ctx.config.dynamicConfig.updateBrokerConfig(0, props)
     assertEquals(logDirs, ctx.config.cordonedLogDirs)
-    verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
-    verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+    verify(ctx.directoryEventHandler, times(4)).handleCordoned(anySet)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 3c0ee3db902..9791d5c9f6e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -4312,7 +4312,7 @@ class ReplicaManagerTest {
 
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId, setupLogDirMetaProperties = true, 
directoryEventHandler = directoryEventHandler)
     try {
-      val directoryIds = 
replicaManager.logManager.directoryIdsSet.asScala.toList
+      val directoryIds = 
replicaManager.logManager.directoryIds.values.asScala.toList
       assertEquals(directoryIds.size, 2)
       val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, 
partitions = List(0), directoryIds = directoryIds)
       val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = 
replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), 
leaderTopicsDelta, FOO_UUID).get
@@ -4360,7 +4360,7 @@ class ReplicaManagerTest {
     try {
 
       // Test applying delta as leader
-      val directoryIds = 
replicaManager.logManager.directoryIdsSet.asScala.toList
+      val directoryIds = 
replicaManager.logManager.directoryIds.values.asScala.toList
       // Make the local replica the leader
       val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = 
List(0), directoryIds = directoryIds)
       val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
@@ -5886,7 +5886,7 @@ class ReplicaManagerTest {
 
     val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), 
localId, setupLogDirMetaProperties = true, directoryEventHandler = 
directoryEventHandler)
     try {
-      val directoryIds = rm.logManager.directoryIdsSet.asScala.toList
+      val directoryIds = rm.logManager.directoryIds.values.asScala.toList
       assertEquals(directoryIds.size, 2)
       val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, 
isStartIdLeader = true, directoryIds = directoryIds)
       val (partition: Partition, _) = 
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, 
FOO_UUID).get
@@ -5919,7 +5919,7 @@ class ReplicaManagerTest {
 
     val rm = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), 
localId, setupLogDirMetaProperties = true, directoryEventHandler = 
directoryEventHandler)
     try {
-      val directoryIds = rm.logManager.directoryIdsSet.asScala.toList
+      val directoryIds = rm.logManager.directoryIds.values.asScala.toList
       assertEquals(directoryIds.size, 2)
       val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, 
isStartIdLeader = true, directoryIds = directoryIds)
       val (partition: Partition, _) = 
rm.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, 
FOO_UUID).get
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 10b2188f0a8..183de9edd13 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,6 +120,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
           Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
             QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
             QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG -> 
entry).asJava).asJava,
+          false,
           false
         ).get()
       }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 0d6da833a7a..b7665eeb8c0 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -432,9 +432,6 @@ public class ClusterControlManager {
         if 
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
             record.setLogDirs(request.logDirs());
         }
-        if 
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
-            record.setCordonedLogDirs(request.cordonedLogDirs());
-        }
         if (!request.incarnationId().equals(prevIncarnationId)) {
             int prevNumRecords = records.size();
             boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
@@ -555,22 +552,6 @@ public class ClusterControlManager {
         return OptionalLong.empty();
     }
 
-    public void updateCordonedLogDirs(int brokerId, List<Uuid> 
cordonedLogDirs) {
-        brokerRegistrations.compute(brokerId,
-                (k, brokerRegistration) -> new BrokerRegistration.Builder().
-                        setId(brokerId).
-                        setEpoch(brokerRegistration.epoch()).
-                        setIncarnationId(brokerRegistration.incarnationId()).
-                        setListeners(brokerRegistration.listeners()).
-                        
setSupportedFeatures(brokerRegistration.supportedFeatures()).
-                        setRack(brokerRegistration.rack()).
-                        setFenced(brokerRegistration.fenced()).
-                        
setInControlledShutdown(brokerRegistration.inControlledShutdown()).
-                        setDirectories(brokerRegistration.directories()).
-                        setCordonedDirectories(cordonedLogDirs).
-                        build());
-    }
-
     public void replay(RegisterBrokerRecord record, long offset) {
         registerBrokerRecordOffsets.put(record.brokerId(), offset);
         int brokerId = record.brokerId();
@@ -663,7 +644,7 @@ public class ClusterControlManager {
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %x", record, 
record.inControlledShutdown())));
         Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
-        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
+        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs());
         replayRegistrationChange(
             record,
             record.brokerId(),
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 705321a235c..3ff214dfc70 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -45,6 +45,7 @@ import org.apache.kafka.timeline.TimelineHashSet;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +55,7 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
@@ -67,6 +69,7 @@ import static 
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_
 
 public class ConfigurationControlManager {
     public static final ConfigResource DEFAULT_NODE = new 
ConfigResource(Type.BROKER, "");
+    private static final Pattern COMMA_WITH_WHITESPACE = 
Pattern.compile("\\s*,\\s*");
 
     private final Logger log;
     private final SnapshotRegistry snapshotRegistry;
@@ -210,7 +213,8 @@ public class ConfigurationControlManager {
      */
     ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
-        boolean newlyCreatedResource
+        boolean newlyCreatedResource,
+        boolean forwarded
     ) {
         List<ApiMessageAndVersion> outputRecords =
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -220,7 +224,8 @@ public class ConfigurationControlManager {
             ApiError apiError = 
incrementalAlterConfigResource(resourceEntry.getKey(),
                 resourceEntry.getValue(),
                 newlyCreatedResource,
-                outputRecords);
+                outputRecords,
+                forwarded);
             outputResults.put(resourceEntry.getKey(), apiError);
         }
         outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
@@ -252,14 +257,16 @@ public class ConfigurationControlManager {
     ControllerResult<ApiError> incrementalAlterConfig(
         ConfigResource configResource,
         Map<String, Entry<OpType, String>> keyToOps,
-        boolean newlyCreatedResource
+        boolean newlyCreatedResource,
+        boolean forwarded
     ) {
         List<ApiMessageAndVersion> outputRecords =
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
         ApiError apiError = incrementalAlterConfigResource(configResource,
             keyToOps,
             newlyCreatedResource,
-            outputRecords);
+            outputRecords,
+            forwarded);
 
         outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
         return ControllerResult.atomicOf(outputRecords, apiError);
@@ -269,7 +276,8 @@ public class ConfigurationControlManager {
         ConfigResource configResource,
         Map<String, Entry<OpType, String>> keysToOps,
         boolean newlyCreatedResource,
-        List<ApiMessageAndVersion> outputRecords
+        List<ApiMessageAndVersion> outputRecords,
+        boolean forwarded
     ) {
         List<ApiMessageAndVersion> newRecords = new ArrayList<>();
         for (Entry<String, Entry<OpType, String>> keysToOpsEntry : 
keysToOps.entrySet()) {
@@ -321,7 +329,7 @@ public class ConfigurationControlManager {
                     setValue(newValue), (short) 0));
             }
         }
-        ApiError error = validateAlterConfig(configResource, newRecords, 
List.of(), newlyCreatedResource);
+        ApiError error = validateAlterConfig(configResource, newRecords, 
List.of(), newlyCreatedResource, forwarded);
         if (error.isFailure()) {
             return error;
         }
@@ -333,7 +341,8 @@ public class ConfigurationControlManager {
         ConfigResource configResource,
         List<ApiMessageAndVersion> recordsExplicitlyAltered,
         List<ApiMessageAndVersion> recordsImplicitlyDeleted,
-        boolean newlyCreatedResource
+        boolean newlyCreatedResource,
+        boolean forwarded
     ) {
         Map<String, String> allConfigs = new HashMap<>();
         Map<String, String> existingConfigsMap = new HashMap<>();
@@ -349,8 +358,10 @@ public class ConfigurationControlManager {
                 return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
             } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
                 return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
-            } else if (isCordonedLogDirsDisallowed(configRecord)) {
-                return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+            } else if (isCordonedLogDirsDisabled(configRecord)) {
+                return DISABLED_CORDONED_LOG_DIRS_ERROR;
+            } else if (isCordonedLogDirsInvalid(configRecord, 
existingConfigsMap.get(CORDONED_LOG_DIRS_CONFIG), forwarded)) {
+                return INVALID_CORDONED_LOG_DIRS_ERROR;
             } else if (configRecord.value() == null) {
                 allConfigs.remove(configRecord.name());
             } else if (configRecord.value().length() > Short.MAX_VALUE) {
@@ -368,8 +379,10 @@ public class ConfigurationControlManager {
                 return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
             } else if (isDisallowedClusterMinIsrTransition(configRecord)) {
                 return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
-            } else if (isCordonedLogDirsDisallowed(configRecord)) {
-                return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+            } else if (isCordonedLogDirsDisabled(configRecord)) {
+                return DISABLED_CORDONED_LOG_DIRS_ERROR;
+            } else if (isCordonedLogDirsInvalid(configRecord, 
existingConfigsMap.get(CORDONED_LOG_DIRS_CONFIG), forwarded)) {
+                return INVALID_CORDONED_LOG_DIRS_ERROR;
             } else {
                 allConfigs.remove(configRecord.name());
             }
@@ -407,10 +420,14 @@ public class ConfigurationControlManager {
         new ApiError(INVALID_CONFIG, "The configuration value cannot be added 
because " +
             "it exceeds the maximum value size of " + Short.MAX_VALUE + " 
bytes.");
 
-    static final ApiError DISALLOWED_CORDONED_LOG_DIRS_ERROR =
+    static final ApiError DISABLED_CORDONED_LOG_DIRS_ERROR =
             new ApiError(INVALID_CONFIG, "The " + CORDONED_LOG_DIRS_CONFIG + " 
configuration value cannot be " +
                     "set because it requires metadata.version >= " + 
MetadataVersion.IBP_4_3_IV0);
 
+    static final ApiError INVALID_CORDONED_LOG_DIRS_ERROR =
+            new ApiError(INVALID_CONFIG, "When updating " + 
CORDONED_LOG_DIRS_CONFIG + " via controllers, " +
+                    " the new value must be a subset of the current 
configuration value.");
+
     boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
         if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
                 configRecord.resourceType() == BROKER.id() &&
@@ -422,7 +439,32 @@ public class ConfigurationControlManager {
         return false;
     }
 
-    boolean isCordonedLogDirsDisallowed(ConfigRecord configRecord) {
+    /**
+     * Return whether the update to cordoned.log.dirs is valid or not
+     *
+     * Updates to cordoned.log.dirs normally go through the concerned broker 
which is able to validate the new value
+     * before forwarding the request to the controller.
+     * However, it's also possible to directly go to controllers, but since 
controllers only have directory ids, they
+     * cannot fully validate updates (cordoned.log.dirs is a list of paths). 
So if the request has not been forwarded
+     * by a broker, controllers can only accept updates that remove entries in 
cordoned.log.dirs.
+     *
+     * @param configRecord The configuration record
+     * @param currentValue The current cordoned.log.dirs value
+     * @param forwarded    True is the request has been forwarded by a broker
+     */
+    boolean isCordonedLogDirsInvalid(ConfigRecord configRecord, String 
currentValue, boolean forwarded) {
+        if (!configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) || 
configRecord.resourceType() != BROKER.id() ||
+                forwarded || configRecord.value() == null || 
configRecord.value().trim().isEmpty()) {
+            return false;
+        }
+        List<String> currentDirs = currentValue == null
+            ? List.of()
+            : Arrays.asList(COMMA_WITH_WHITESPACE.split(currentValue.trim(), 
-1));
+        List<String> newDirs = 
Arrays.asList(COMMA_WITH_WHITESPACE.split(configRecord.value().trim(), -1));
+        return !currentDirs.containsAll(newDirs);
+    }
+
+    boolean isCordonedLogDirsDisabled(ConfigRecord configRecord) {
         if (configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) &&
                 configRecord.resourceType() == BROKER.id()) {
             return 
!featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported();
@@ -449,11 +491,13 @@ public class ConfigurationControlManager {
      *
      * @param newConfigs        The new configurations to install for each 
resource.
      *                          All existing configurations will be 
overwritten.
+     * @param forwarded         True if the request was forwarded.
      * @return                  The result.
      */
     ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
         Map<ConfigResource, Map<String, String>> newConfigs,
-        boolean newlyCreatedResource
+        boolean newlyCreatedResource,
+        boolean forwarded
     ) {
         List<ApiMessageAndVersion> outputRecords =
                 BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -464,7 +508,8 @@ public class ConfigurationControlManager {
                 resourceEntry.getValue(),
                 newlyCreatedResource,
                 outputRecords,
-                outputResults);
+                outputResults,
+                forwarded);
         }
         outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
         return ControllerResult.atomicOf(outputRecords, outputResults);
@@ -474,7 +519,8 @@ public class ConfigurationControlManager {
                                            Map<String, String> newConfigs,
                                            boolean newlyCreatedResource,
                                            List<ApiMessageAndVersion> 
outputRecords,
-                                           Map<ConfigResource, ApiError> 
outputResults) {
+                                           Map<ConfigResource, ApiError> 
outputResults,
+                                           boolean forwarded) {
         List<ApiMessageAndVersion> recordsExplicitlyAltered = new 
ArrayList<>();
         Map<String, String> currentConfigs = configData.get(configResource);
         if (currentConfigs == null) {
@@ -503,7 +549,7 @@ public class ConfigurationControlManager {
                     setValue(null), (short) 0));
             }
         }
-        ApiError error = validateAlterConfig(configResource, 
recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource);
+        ApiError error = validateAlterConfig(configResource, 
recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource, 
forwarded);
         if (error.isFailure()) {
             outputResults.put(configResource, error);
             return;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java 
b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 6845e908e7a..28aa076291b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -136,13 +136,15 @@ public interface Controller extends AclMutator, 
AutoCloseable {
      * @param context       The controller request context.
      * @param request       The CreateTopicsRequest data.
      * @param describable   The topics which we have DESCRIBE permission on.
+     * @param forwarded     True if the request was forwarded.
      *
      * @return              A future yielding the response.
      */
     CompletableFuture<CreateTopicsResponseData> createTopics(
         ControllerRequestContext context,
         CreateTopicsRequestData request,
-        Set<String> describable
+        Set<String> describable,
+        boolean forwarded
     );
 
     /**
@@ -252,13 +254,15 @@ public interface Controller extends AclMutator, 
AutoCloseable {
      * @param context       The controller request context.
      * @param configChanges The changes.
      * @param validateOnly  True if we should validate the changes but not 
apply them.
+     * @param forwarded     True if the request was forwarded.
      *
      * @return              A future yielding a map from config resources to 
error results.
      */
     CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
         ControllerRequestContext context,
         Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, 
String>>> configChanges,
-        boolean validateOnly
+        boolean validateOnly,
+        boolean forwarded
     );
 
     /**
@@ -293,13 +297,15 @@ public interface Controller extends AclMutator, 
AutoCloseable {
      * @param context       The controller request context.
      * @param newConfigs    The new configuration maps to apply.
      * @param validateOnly  True if we should validate the changes but not 
apply them.
+     * @param forwarded     True if the request was forwarded.
      *
      * @return              A future yielding a map from config resources to 
error results.
      */
     CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
         ControllerRequestContext context,
         Map<ConfigResource, Map<String, String>> newConfigs,
-        boolean validateOnly
+        boolean validateOnly,
+        boolean forwarded
     );
 
     /**
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 1000c04623f..35ad95356c1 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1788,13 +1788,14 @@ public final class QuorumController implements 
Controller {
     @Override
     public CompletableFuture<CreateTopicsResponseData> createTopics(
         ControllerRequestContext context,
-        CreateTopicsRequestData request, Set<String> describable
+        CreateTopicsRequestData request, Set<String> describable,
+        boolean forwarded
     ) {
         if (request.topics().isEmpty()) {
             return CompletableFuture.completedFuture(new 
CreateTopicsResponseData());
         }
         return appendWriteEvent("createTopics", context.deadlineNs(),
-            () -> replicationControl.createTopics(context, request, 
describable));
+            () -> replicationControl.createTopics(context, request, 
describable, forwarded));
     }
 
     @Override
@@ -1884,14 +1885,15 @@ public final class QuorumController implements 
Controller {
     public CompletableFuture<Map<ConfigResource, ApiError>> 
incrementalAlterConfigs(
         ControllerRequestContext context,
         Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
-        boolean validateOnly
+        boolean validateOnly,
+        boolean forwarded
     ) {
         if (configChanges.isEmpty()) {
             return CompletableFuture.completedFuture(Map.of());
         }
         return appendWriteEvent("incrementalAlterConfigs", 
context.deadlineNs(), () -> {
             ControllerResult<Map<ConfigResource, ApiError>> result =
-                configurationControl.incrementalAlterConfigs(configChanges, 
false);
+                configurationControl.incrementalAlterConfigs(configChanges, 
false, forwarded);
             if (validateOnly) {
                 return result.withoutRecords();
             } else {
@@ -1929,14 +1931,16 @@ public final class QuorumController implements 
Controller {
     @Override
     public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
         ControllerRequestContext context,
-        Map<ConfigResource, Map<String, String>> newConfigs, boolean 
validateOnly
+        Map<ConfigResource, Map<String, String>> newConfigs,
+        boolean validateOnly,
+        boolean forwarded
     ) {
         if (newConfigs.isEmpty()) {
             return CompletableFuture.completedFuture(Map.of());
         }
         return appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), () 
-> {
             ControllerResult<Map<ConfigResource, ApiError>> result =
-                configurationControl.legacyAlterConfigs(newConfigs, false);
+                configurationControl.legacyAlterConfigs(newConfigs, false, 
forwarded);
             if (validateOnly) {
                 return result.withoutRecords();
             } else {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1a5bc40ed43..ae1854f3ced 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -627,7 +627,8 @@ public class ReplicationControlManager {
     ControllerResult<CreateTopicsResponseData> createTopics(
         ControllerRequestContext context,
         CreateTopicsRequestData request,
-        Set<String> describable
+        Set<String> describable,
+        boolean forwarded
     ) {
         Map<String, ApiError> topicErrors = new HashMap<>();
         List<ApiMessageAndVersion> records = 
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -657,7 +658,7 @@ public class ReplicationControlManager {
             List<ApiMessageAndVersion> configRecords;
             if (keyToOps != null) {
                 ControllerResult<ApiError> configResult =
-                    
configurationControl.incrementalAlterConfig(configResource, keyToOps, true);
+                    
configurationControl.incrementalAlterConfig(configResource, keyToOps, true, 
forwarded);
                 if (configResult.response().isFailure()) {
                     topicErrors.put(topic.name(), configResult.response());
                     continue;
@@ -1545,17 +1546,17 @@ public class ReplicationControlManager {
             List<Uuid> cordonedDirs,
             List<ApiMessageAndVersion> records
     ) {
+        // cordonedDirs is null until a broker has caught up with the latest 
metadata, so just ignore
+        if (cordonedDirs == null) return;
         BrokerRegistration registration = 
clusterControl.registration(brokerId);
-        List<Uuid> newCordonedDirs = 
registration.directoryIntersection(cordonedDirs);
-        if (!newCordonedDirs.isEmpty()) {
+        boolean cordonedDirsChanged = 
registration.cordonedDirChanged(cordonedDirs);
+        if (cordonedDirsChanged) {
             records.add(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord().
                     setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
-                    setCordonedLogDirs(newCordonedDirs),
+                    setCordonedLogDirs(cordonedDirs),
                     (short) 3));
             if (log.isDebugEnabled()) {
-                List<Uuid> newUncordonedDirs = 
registration.directoryDifference(newCordonedDirs);
-                log.debug("Directories {} in broker {} marked cordoned, 
uncordoned directories: {}",
-                        newCordonedDirs, brokerId, newUncordonedDirs);
+                log.debug("Directories {} in broker {} marked cordoned", 
cordonedDirs, brokerId);
             }
         }
     }
@@ -1697,7 +1698,6 @@ public class ReplicationControlManager {
             handleDirectoriesOffline(brokerId, brokerEpoch, 
request.offlineLogDirs(), records);
         }
         if 
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
-            clusterControl.updateCordonedLogDirs(brokerId, 
request.cordonedLogDirs());
             handleDirectoriesCordoned(brokerId, brokerEpoch, 
request.cordonedLogDirs(), records);
         }
         boolean isCaughtUp = request.currentMetadataOffset() >= 
registerBrokerRecordOffset;
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index bfdf9810a0b..1ab6e44423c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -142,7 +142,7 @@ public final class ClusterDelta {
                 () -> new IllegalStateException(String.format("Unable to 
replay %s: unknown " +
                     "value for inControlledShutdown field: %d", record, 
record.inControlledShutdown())));
         Optional<List<Uuid>> directoriesChange = 
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
-        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
+        Optional<List<Uuid>> cordonedDirectoriesChange = 
Optional.ofNullable(record.cordonedLogDirs());
         BrokerRegistration nextRegistration = curRegistration.cloneWith(
             fencingChange.asBoolean(),
             inControlledShutdownChange.asBoolean(),
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 4dcb5ba1b36..0545869901c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -67,7 +68,8 @@ public class BrokerRegistration {
             this.inControlledShutdown = false;
             this.isMigratingZkBroker = false;
             this.directories = List.of();
-            this.cordonedDirectories = List.of();
+            // This defaults to null indicating the broker has not yet sent 
its cordoned log dirs.
+            this.cordonedDirectories = null;
         }
 
         public Builder setId(int id) {
@@ -196,7 +198,7 @@ public class BrokerRegistration {
         directories = new ArrayList<>(directories);
         directories.sort(Uuid::compareTo);
         this.directories = Collections.unmodifiableList(directories);
-        this.cordonedDirectories = 
Collections.unmodifiableList(cordonedDirectories);
+        this.cordonedDirectories = cordonedDirectories == null ? null : 
Collections.unmodifiableList(cordonedDirectories);
     }
 
     public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -282,7 +284,7 @@ public class BrokerRegistration {
     }
 
     public boolean hasUncordonedDirs() {
-        if (directories.isEmpty()) return true;
+        if (directories.isEmpty() || cordonedDirectories == null) return true;
         List<Uuid> dirs = new ArrayList<>(directories);
         dirs.removeAll(cordonedDirectories);
         return !dirs.isEmpty();
@@ -308,6 +310,15 @@ public class BrokerRegistration {
         return results;
     }
 
+    public boolean cordonedDirChanged(List<Uuid> otherDirectories) {
+        // Brokers only start sending their cordoned log dirs once they are 
fully caught up with the metadata
+        // until then cordonedDirectories defaults to null to indicate the 
value is unknown
+        if (cordonedDirectories == null) return true;
+        Set<Uuid> cordonedDirs = Set.copyOf(cordonedDirectories);
+        Set<Uuid> otherDirs = Set.copyOf(otherDirectories);
+        return !cordonedDirs.equals(otherDirs);
+    }
+
     public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
         RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
             setBrokerId(id).
@@ -331,7 +342,7 @@ public class BrokerRegistration {
             options.handleLoss("the online log directories of one or more 
brokers");
         }
 
-        if (cordonedDirectories.isEmpty() || 
options.metadataVersion().isCordonedLogDirsSupported()) {
+        if (cordonedDirectories == null || 
options.metadataVersion().isCordonedLogDirsSupported()) {
             registrationRecord.setCordonedLogDirs(cordonedDirectories);
         } else {
             options.handleLoss("the cordoned log directories of one or more 
brokers");
@@ -376,7 +387,7 @@ public class BrokerRegistration {
             other.inControlledShutdown == inControlledShutdown &&
             other.isMigratingZkBroker == isMigratingZkBroker &&
             other.directories.equals(directories) &&
-            other.cordonedDirectories.equals(cordonedDirectories);
+            Objects.equals(other.cordonedDirectories, cordonedDirectories);
     }
 
     @Override
@@ -416,7 +427,7 @@ public class BrokerRegistration {
         if (newFenced == fenced
                 && newInControlledShutdownChange == inControlledShutdown
                 && newDirectories.equals(directories)
-                && newCordonedDirectories.equals(cordonedDirectories))
+                && Objects.equals(newCordonedDirectories, cordonedDirectories))
             return this;
 
         return new BrokerRegistration(
diff --git 
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
 
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
index 5329e53b2c7..61109f12913 100644
--- 
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
+++ 
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
@@ -33,7 +33,8 @@
      "about": "0 if no change, 1 if the broker is in controlled shutdown." },
    { "name": "LogDirs", "type":  "[]uuid", "versions":  "2+", 
"taggedVersions": "2+", "tag": 2,
      "about": "Log directories configured in this broker which are available." 
},
-   { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "3+", 
"taggedVersions": "3+", "tag": "3",
-     "about": "Log directories that are cordoned." }
+   { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "3+", 
"taggedVersions": "3+", "tag": 3,
+     "nullableVersions": "3+", "default": "null",
+     "about": "Log directories that are cordoned or null if no change." }
   ]
 }
diff --git 
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json 
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index b7db680cbd3..12af50ce17d 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -60,7 +60,8 @@
       "about": "True if the broker is in controlled shutdown." },
     { "name": "LogDirs", "type":  "[]uuid", "versions":  "3+", 
"taggedVersions": "3+", "tag": 0,
       "about": "Log directories configured in this broker which are 
available." },
-    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", 
"taggedVersions": "4+", "tag": "1",
-      "about": "Log directories that are cordoned." }
+    { "name": "CordonedLogDirs", "type":  "[]uuid", "versions":  "4+", 
"taggedVersions": "4+", "tag": 1,
+      "nullableVersions": "4+", "default":  "null",
+      "about": "Log directories that are cordoned. This is initially null 
before the broker starts sending the value via its heartbeats." }
   ]
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 2eb882fdd67..e9d4a0c8188 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -209,11 +209,14 @@ public class ClusterControlManagerTest {
         assertFalse(clusterControl.isUnfenced(0));
         assertFalse(clusterControl.inControlledShutdown(0));
 
+        Uuid dir1 = Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ");
+        Uuid dir2 = Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg");
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
             setBrokerEpoch(100).
             setBrokerId(0).
             setRack(null).
-            setFenced(false);
+            setFenced(false).
+            setLogDirs(List.of(dir1, dir2));
         brokerRecord.endPoints().add(new BrokerEndpoint().
             setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
             setPort((short) 9092).
@@ -241,6 +244,37 @@ public class ClusterControlManagerTest {
 
         assertTrue(clusterControl.isUnfenced(0));
         assertTrue(clusterControl.inControlledShutdown(0));
+
+        // By default cordonedLogDirs is null and means no changes
+        assertEquals(null, 
clusterControl.registration(0).cordonedDirectories());
+        registrationChangeRecord = new BrokerRegistrationChangeRecord().
+            setBrokerId(0).
+            setBrokerEpoch(100);
+        clusterControl.replay(registrationChangeRecord);
+        assertEquals(null, 
clusterControl.registration(0).cordonedDirectories());
+
+        // Set a cordoned log dir
+        registrationChangeRecord = new BrokerRegistrationChangeRecord().
+            setBrokerId(0).
+            setBrokerEpoch(100).
+            setCordonedLogDirs(List.of(dir1));
+        clusterControl.replay(registrationChangeRecord);
+        assertEquals(List.of(dir1), 
clusterControl.registration(0).cordonedDirectories());
+
+        // null cordonedLogDirs so not changes
+        registrationChangeRecord = new BrokerRegistrationChangeRecord().
+            setBrokerId(0).
+            setBrokerEpoch(100);
+        clusterControl.replay(registrationChangeRecord);
+        assertEquals(List.of(dir1), 
clusterControl.registration(0).cordonedDirectories());
+
+        // Clears the cordoned dir
+        registrationChangeRecord = new BrokerRegistrationChangeRecord().
+            setBrokerId(0).
+            setBrokerEpoch(100).
+            setCordonedLogDirs(List.of());
+        clusterControl.replay(registrationChangeRecord);
+        assertEquals(List.of(), 
clusterControl.registration(0).cordonedDirectories());
     }
 
     @Test
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 03173552e90..b1e5f4c8872 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -42,10 +42,13 @@ import 
org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -55,6 +58,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
 import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
@@ -63,8 +67,11 @@ import static 
org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
 import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
 import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static 
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
-import static 
org.apache.kafka.controller.ConfigurationControlManager.DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+import static 
org.apache.kafka.controller.ConfigurationControlManager.DISABLED_CORDONED_LOG_DIRS_ERROR;
+import static 
org.apache.kafka.controller.ConfigurationControlManager.INVALID_CORDONED_LOG_DIRS_ERROR;
 import static 
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
+import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -168,7 +175,8 @@ public class ConfigurationControlManagerTest {
                 entry("baz", entry(SUBTRACT, "abc")),
                 entry("quux", entry(SET, "abc")))),
                 entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))),
-                true);
+                true,
+                false);
 
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
                 new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -185,7 +193,7 @@ public class ConfigurationControlManagerTest {
                 toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
                 entry("abc", entry(DELETE, "xyz"))))),
-                true));
+                true, false));
     }
 
     @Test
@@ -197,7 +205,7 @@ public class ConfigurationControlManagerTest {
         Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps = 
toMap(entry("abc", entry(APPEND, "123")));
 
         ControllerResult<ApiError> result = manager.
-            incrementalAlterConfig(MYTOPIC, keyToOps, true);
+            incrementalAlterConfig(MYTOPIC, keyToOps, true, false);
 
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
                 new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -210,13 +218,13 @@ public class ConfigurationControlManagerTest {
                     new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
                         setName("abc").setValue(null), 
CONFIG_RECORD.highestSupportedVersion())),
                 ApiError.NONE),
-            manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", 
entry(DELETE, "xyz"))), true));
+            manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc", 
entry(DELETE, "xyz"))), true, false));
 
         // The configuration value exceeding the maximum size is not allowed 
to be added.
         String largeValue = new String(new char[Short.MAX_VALUE - APPEND.id() 
- 1]);
         Map<String, Entry<AlterConfigOp.OpType, String>> largeValueOfOps = 
toMap(entry("abc", entry(APPEND, largeValue)));
 
-        ControllerResult<ApiError> invalidConfigValueResult = 
manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true);
+        ControllerResult<ApiError> invalidConfigValueResult = 
manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true, false);
         assertEquals(Errors.INVALID_CONFIG, 
invalidConfigValueResult.response().error());
         assertEquals("The configuration value cannot be added because it 
exceeds the maximum value size of " + Short.MAX_VALUE + " bytes.",
                 invalidConfigValueResult.response().message());
@@ -230,7 +238,7 @@ public class ConfigurationControlManagerTest {
             build();
 
         ControllerResult<Map<ConfigResource, ApiError>> result = manager.
-            incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(APPEND, "123,456,789"))))), true);
+            incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(APPEND, "123,456,789"))))), true, false);
 
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
                 new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -241,7 +249,7 @@ public class ConfigurationControlManagerTest {
 
         // It's ok for the appended value to be already present
         result = manager
-            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(APPEND, "123,456"))))), true);
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(APPEND, "123,456"))))), true, false);
         assertEquals(
             ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC, 
ApiError.NONE))),
             result
@@ -249,7 +257,7 @@ public class ConfigurationControlManagerTest {
         RecordTestUtils.replayAll(manager, result.records());
 
         result = manager
-            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(SUBTRACT, "123,456"))))), true);
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(SUBTRACT, "123,456"))))), true, false);
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
                 new 
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
                     setName("abc").setValue("789"), 
CONFIG_RECORD.highestSupportedVersion())),
@@ -259,7 +267,7 @@ public class ConfigurationControlManagerTest {
 
         // It's ok for the deleted value not to be present
         result = manager
-            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(SUBTRACT, "123456"))))), true);
+            .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc", 
entry(SUBTRACT, "123456"))))), true, false);
         assertEquals(
             ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC, 
ApiError.NONE))),
             result
@@ -282,6 +290,7 @@ public class ConfigurationControlManagerTest {
             incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
                 entry("quux", entry(SET, "1")))),
                 entry(existingTopic, toMap(entry("def", entry(SET, 
"newVal"))))),
+                false,
                 false);
 
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
@@ -366,7 +375,8 @@ public class ConfigurationControlManagerTest {
                         entry("quux", entry(SET, "456")),
                         entry("broker.config.to.remove", entry(DELETE, null))
                 ))),
-                true));
+                true,
+                false));
     }
 
     private static class CheckForNullValuesPolicy implements AlterConfigPolicy 
{
@@ -408,7 +418,7 @@ public class ConfigurationControlManagerTest {
                 expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.legacyAlterConfigs(
                 toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", 
"901")))),
-                true));
+                true, false));
         for (ApiMessageAndVersion message : expectedRecords1) {
             manager.replay((ConfigRecord) message.message());
         }
@@ -422,7 +432,7 @@ public class ConfigurationControlManagerTest {
                 CONFIG_RECORD.highestSupportedVersion())),
             toMap(entry(MYTOPIC, ApiError.NONE))),
             manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", 
"901")))),
-                true));
+                true, false));
     }
 
     @ParameterizedTest
@@ -438,7 +448,7 @@ public class ConfigurationControlManagerTest {
         Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
             toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, 
"3")));
         ConfigResource brokerConfigResource = new 
ConfigResource(ConfigResource.Type.BROKER, "1");
-        ControllerResult<ApiError> result = 
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true);
+        ControllerResult<ApiError> result = 
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true, false);
         assertEquals(Set.of(), manager.brokersWithConfigs());
 
         assertEquals(ControllerResult.atomicOf(List.of(new 
ApiMessageAndVersion(
@@ -503,7 +513,7 @@ public class ConfigurationControlManagerTest {
         result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, "1"),
             toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
                 removal ? entry(DELETE, null) : entry(SET, "3"))),
-            true);
+            true, false);
         assertEquals(Errors.INVALID_CONFIG, result.response().error());
         assertEquals("Broker-level min.insync.replicas cannot be altered while 
ELR is enabled.",
             result.response().message());
@@ -512,7 +522,7 @@ public class ConfigurationControlManagerTest {
         result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, ""),
             toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
                 removal ? entry(DELETE, null) : entry(SET, "3"))),
-            true);
+            true, false);
         if (removal) {
             assertEquals(Errors.INVALID_CONFIG, result.response().error());
             assertEquals("Cluster-level min.insync.replicas cannot be removed 
while ELR is enabled.",
@@ -574,20 +584,28 @@ public class ConfigurationControlManagerTest {
                 build();
 
         ControllerResult<ApiError> result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, "1"),
-                toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
entry(SET, "*"))),
-                true);
+                toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
entry(SET, ""))),
+                true, false);
+        assertEquals(enabled ? ApiError.NONE : 
DISABLED_CORDONED_LOG_DIRS_ERROR, result.response());
 
-        assertEquals(enabled ? ApiError.NONE : 
DISALLOWED_CORDONED_LOG_DIRS_ERROR, result.response());
+        result = manager.incrementalAlterConfig(new 
ConfigResource(ConfigResource.Type.BROKER, "1"),
+                toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, 
entry(SET, "*"))),
+                true, false);
+        assertEquals(enabled ? INVALID_CORDONED_LOG_DIRS_ERROR : 
DISABLED_CORDONED_LOG_DIRS_ERROR, result.response());
     }
 
-    private FeatureControlManager createFeatureControlManager() {
+    private FeatureControlManager createFeatureControlManager(short level) {
         FeatureControlManager featureControlManager = new 
FeatureControlManager.Builder().build();
         featureControlManager.replay(new FeatureLevelRecord().
-            setName(MetadataVersion.FEATURE_NAME).
-            setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+                setName(MetadataVersion.FEATURE_NAME).
+                setFeatureLevel(level));
         return featureControlManager;
     }
 
+    private FeatureControlManager createFeatureControlManager() {
+        return 
createFeatureControlManager(MetadataVersion.LATEST_PRODUCTION.featureLevel());
+    }
+
     @Test
     public void testValidateAlterConfigWithInvalidExistingConfigs() {
         Set<String> validConfigs = Set.of("abc", "def");
@@ -613,6 +631,7 @@ public class ConfigurationControlManagerTest {
         ControllerResult<ApiError> result = manager.incrementalAlterConfig(
             MYTOPIC,
             toMap(entry("def", entry(SET, "newValue"))),
+            false,
             false);
 
         assertEquals(ApiError.NONE, result.response());
@@ -646,4 +665,69 @@ public class ConfigurationControlManagerTest {
         assertTrue(configs.containsKey("def"));
         assertFalse(configs.containsKey("invalid.config"), "Invalid config 
should not be in configData");
     }
+
+    @ParameterizedTest
+    @MethodSource("arguments")
+    public void testIsCordonedLogDirsDisabled(boolean expected, short level) {
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+                setKafkaConfigSchema(SCHEMA).
+                setFeatureControl(createFeatureControlManager(level)).
+                build();
+
+        ConfigRecord cordonedConfig = new ConfigRecord().
+                setResourceType(BROKER.id()).setResourceName("0").
+                setName(CORDONED_LOG_DIRS_CONFIG);
+        ConfigRecord otherConfigConfig = new ConfigRecord().
+                setResourceType(BROKER.id()).setResourceName("0").
+                setName(LOG_DIRS_CONFIG);
+
+        assertEquals(expected, 
manager.isCordonedLogDirsDisabled(cordonedConfig));
+        assertFalse(manager.isCordonedLogDirsDisabled(otherConfigConfig));
+    }
+
+    public static Stream<Arguments> arguments() {
+        return Stream.of(
+                Arguments.of(false, 
MetadataVersion.latestProduction().featureLevel()),
+                Arguments.of(true, MetadataVersion.IBP_4_2_IV1.featureLevel())
+        );
+    }
+
+    @Test
+    public void testIsCordonedLogDirsInvalid() {
+        ConfigurationControlManager manager = new 
ConfigurationControlManager.Builder().
+                setKafkaConfigSchema(SCHEMA).
+                setFeatureControl(createFeatureControlManager()).
+                build();
+
+        ConfigRecord cr = new ConfigRecord().
+                setResourceType(BROKER.id()).setResourceName("0").
+                setName(CORDONED_LOG_DIRS_CONFIG);
+
+        // If the new value is null or empty string, the update is always 
allowed
+        for (String value : Arrays.asList("", "   ", null)) {
+            cr.setValue(value);
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, null, false));
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, null, true));
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, "some/value", 
false));
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, "some/value", 
true));
+        }
+
+        // If the new value is equal or a subset of the current value, the 
update is always allowed
+        cr.setValue("dir1");
+        assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1", false));
+        assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1", true));
+        for (String value : Arrays.asList("dir1", "dir2", "dir1,dir2", 
"dir2,dir1")) {
+            cr.setValue(value);
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1,dir2", 
false));
+            assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1,dir2", 
true));
+        }
+
+        // If the new value is different, the update is only allowed if the 
request is forwarded
+        assertTrue(manager.isCordonedLogDirsInvalid(cr, "dir2", false));
+        assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir2", true));
+        assertTrue(manager.isCordonedLogDirsInvalid(cr, "", false));
+        assertFalse(manager.isCordonedLogDirsInvalid(cr, "", true));
+        assertTrue(manager.isCordonedLogDirsInvalid(cr, null, false));
+        assertFalse(manager.isCordonedLogDirsInvalid(cr, null, true));
+    }
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index 77d58e8d270..06ebcdedbf6 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -201,7 +201,7 @@ public class QuorumControllerIntegrationTestUtils {
                     setReplicationFactor((short) replicationFactor));
         }
         CreateTopicsResponseData response =
-            controller.createTopics(ANONYMOUS_CONTEXT, request, 
describable).get();
+            controller.createTopics(ANONYMOUS_CONTEXT, request, describable, 
false).get();
         for (int i = 0; i < numTopics; i++) {
             CreatableTopicResult result = response.topics().find(prefix + i);
             if (result.errorCode() != Errors.TOPIC_ALREADY_EXISTS.code()) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 865066686f3..b1ca959b4bf 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -200,14 +200,14 @@ public class QuorumControllerTest {
     private void testConfigurationOperations(QuorumController controller) 
throws Throwable {
         assertEquals(Map.of(BROKER0, ApiError.NONE),
             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
-                BROKER0, Map.of("baz", entry(SET, "123"))), true).get());
+                BROKER0, Map.of("baz", entry(SET, "123"))), true, 
false).get());
         assertEquals(Map.of(BROKER0,
             new ResultOrError<>(Map.of())),
             controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
                 BROKER0, List.of())).get());
         assertEquals(Map.of(BROKER0, ApiError.NONE),
             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
-                BROKER0, Map.of("baz", entry(SET, "123"))), false).get());
+                BROKER0, Map.of("baz", entry(SET, "123"))), false, 
false).get());
         assertEquals(Map.of(BROKER0, new ResultOrError<>(Map.of("baz", 
"123"))),
             controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
                 BROKER0, List.of())).get());
@@ -245,7 +245,7 @@ public class QuorumControllerTest {
         clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(1L));
         CompletableFuture<Map<ConfigResource, ApiError>> future1 =
             controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
-                BROKER0, Map.of("baz", entry(SET, "123"))), false);
+                BROKER0, Map.of("baz", entry(SET, "123"))), false, false);
         assertFalse(future1.isDone());
         assertEquals(Map.of(BROKER0,
             new ResultOrError<>(Map.of())),
@@ -302,7 +302,7 @@ public class QuorumControllerTest {
                         setReplicationFactor(replicationFactor))));
             CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
                 ANONYMOUS_CONTEXT, createTopicsRequestData,
-                Set.of("foo")).get();
+                Set.of("foo"), false).get();
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
 
@@ -423,7 +423,7 @@ public class QuorumControllerTest {
                         setReplicationFactor(replicationFactor))));
             CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
                 ANONYMOUS_CONTEXT, createTopicsRequestData,
-                Set.of("foo")).get();
+                Set.of("foo"), false).get();
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
             ConfigRecord configRecord = new ConfigRecord()
@@ -562,7 +562,7 @@ public class QuorumControllerTest {
                         setReplicationFactor(replicationFactor))));
             CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
                 ANONYMOUS_CONTEXT, createTopicsRequestData,
-                Set.of("foo")).get();
+                Set.of("foo"), false).get();
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
 
@@ -659,7 +659,7 @@ public class QuorumControllerTest {
                 )));
             CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
                 ANONYMOUS_CONTEXT, createTopicsRequestData,
-                Set.of("foo", "bar")).get();
+                Set.of("foo", "bar"), false).get();
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
             Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
@@ -708,7 +708,7 @@ public class QuorumControllerTest {
             // First, decrease the min ISR config to 1. This should clear the 
ELR fields.
             ControllerResult<Map<ConfigResource, ApiError>> result = 
active.configurationControl().incrementalAlterConfigs(toMap(
                     entry(new ConfigResource(TOPIC, "foo"), 
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
-                true);
+                true, false);
             assertEquals(2, result.records().size(), 
result.records().toString());
             RecordTestUtils.replayAll(active.configurationControl(), 
List.of(result.records().get(0)));
             RecordTestUtils.replayAll(active.replicationControl(), 
List.of(result.records().get(1)));
@@ -724,7 +724,7 @@ public class QuorumControllerTest {
 
             result = 
active.configurationControl().incrementalAlterConfigs(toMap(
                     entry(new ConfigResource(BROKER, ""), 
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
-                true);
+                true, false);
             assertEquals(2, result.records().size(), 
result.records().toString());
             RecordTestUtils.replayAll(active.configurationControl(), 
List.of(result.records().get(0)));
             RecordTestUtils.replayAll(active.replicationControl(), 
List.of(result.records().get(1)));
@@ -783,7 +783,7 @@ public class QuorumControllerTest {
                     new 
CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
                         setReplicationFactor(replicationFactor))));
             CreateTopicsResponseData createTopicsResponseData = 
active.createTopics(
-                ANONYMOUS_CONTEXT, createTopicsRequestData, 
Set.of("foo")).get();
+                ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo"), 
false).get();
             assertEquals(Errors.NONE, 
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
             Uuid topicIdFoo = 
createTopicsResponseData.topics().find("foo").topicId();
 
@@ -1009,18 +1009,18 @@ public class QuorumControllerTest {
                             setReplicationFactor((short) 1))));
             assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), 
active.createTopics(
                 ANONYMOUS_CONTEXT,
-                createTopicsRequestData, Set.of("foo")).get().
+                createTopicsRequestData, Set.of("foo"), false).get().
                     topics().find("foo").errorCode());
             assertEquals("Unable to replicate the partition 1 time(s): All 
brokers " +
                 "are currently fenced, or have all their log directories 
cordoned.", active.createTopics(ANONYMOUS_CONTEXT,
-                    createTopicsRequestData, Set.of("foo")).
+                    createTopicsRequestData, Set.of("foo"), false).
                         get().topics().find("foo").errorMessage());
             assertEquals(new BrokerHeartbeatReply(true, false, false, false),
                 active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new 
BrokerHeartbeatRequestData().
                         setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).
                         setCurrentMetadataOffset(100000L)).get());
             assertEquals(Errors.NONE.code(), 
active.createTopics(ANONYMOUS_CONTEXT,
-                createTopicsRequestData, Set.of("foo")).
+                createTopicsRequestData, Set.of("foo"), false).
                     get().topics().find("foo").errorCode());
             CompletableFuture<TopicIdPartition> topicPartitionFuture = 
active.appendReadEvent(
                 "debugGetPartition", OptionalLong.empty(), () -> {
@@ -1124,7 +1124,7 @@ public class QuorumControllerTest {
                                     new CreatableReplicaAssignment().
                                         setPartitionIndex(1).
                                         setBrokerIds(List.of(1, 2, 0)))))))),
-                Set.of("foo")).get();
+                Set.of("foo"), false).get();
             fooId = fooData.topics().find("foo").topicId();
             active.allocateProducerIds(ANONYMOUS_CONTEXT,
                 new 
AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
@@ -1280,7 +1280,7 @@ public class QuorumControllerTest {
                 controller.createTopics(context0, new 
CreateTopicsRequestData().setTimeoutMs(0).
                     setTopics(new CreatableTopicCollection(Set.of(
                         new CreatableTopic().setName("foo")))),
-                    Set.of());
+                    Set.of(), false);
             CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
                 controller.deleteTopics(context0, List.of(Uuid.ZERO_UUID));
             CompletableFuture<Map<String, ResultOrError<Uuid>>> 
findTopicIdsFuture =
@@ -1337,7 +1337,7 @@ public class QuorumControllerTest {
             CountDownLatch countDownLatch = pause(controller);
             CompletableFuture<CreateTopicsResponseData> createFuture =
                 controller.createTopics(ANONYMOUS_CONTEXT, new 
CreateTopicsRequestData().
-                    setTimeoutMs(120000), Set.of());
+                    setTimeoutMs(120000), Set.of(), false);
             CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
                 controller.deleteTopics(ANONYMOUS_CONTEXT, List.of());
             CompletableFuture<Map<String, ResultOrError<Uuid>>> 
findTopicIdsFuture =
@@ -1379,7 +1379,7 @@ public class QuorumControllerTest {
                     new CreatableTopic().setName("foo").
                         setReplicationFactor((short) 3).
                         setNumPartitions(1)))),
-                Set.of("foo")).get();
+                Set.of("foo"), false).get();
             ConfigResourceExistenceChecker checker =
                 active.new ConfigResourceExistenceChecker();
             // A ConfigResource with type=BROKER and name=(empty string) 
represents
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 19e4545cf06..cb7a74b334c 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -293,7 +293,7 @@ public class ReplicationControlManagerTest {
             request.topics().add(topic);
             ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
             ControllerResult<CreateTopicsResponseData> result =
-                replicationControl.createTopics(requestContext, request, 
Set.of(name));
+                replicationControl.createTopics(requestContext, request, 
Set.of(name), false);
             CreatableTopicResult topicResult = 
result.response().topics().find(name);
             assertNotNull(topicResult);
             assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -332,7 +332,7 @@ public class ReplicationControlManagerTest {
             request.topics().add(topic);
             ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
             ControllerResult<CreateTopicsResponseData> result =
-                replicationControl.createTopics(requestContext, request, 
Set.of(name));
+                replicationControl.createTopics(requestContext, request, 
Set.of(name), false);
             CreatableTopicResult topicResult = 
result.response().topics().find(name);
             assertNotNull(topicResult);
             assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -598,7 +598,7 @@ public class ReplicationControlManagerTest {
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         PolicyViolationException error = assertThrows(
                 PolicyViolationException.class,
-                () -> replicationControl.createTopics(requestContext, request, 
Set.of("foo", "bar", "baz")));
+                () -> replicationControl.createTopics(requestContext, request, 
Set.of("foo", "bar", "baz"), false));
         assertEquals(error.getMessage(), "Excessively large number of 
partitions per request.");
     }
 
@@ -631,7 +631,7 @@ public class ReplicationControlManagerTest {
 
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -644,7 +644,7 @@ public class ReplicationControlManagerTest {
         ctx.inControlledShutdownBrokers(0);
 
         ControllerResult<CreateTopicsResponseData> result2 =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse2 = new 
CreateTopicsResponseData();
         expectedResponse2.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -656,7 +656,7 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1, 2);
 
         ControllerResult<CreateTopicsResponseData> result3 =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse3 = new 
CreateTopicsResponseData();
         expectedResponse3.topics().add(new 
CreatableTopicResult().setName("foo").
             setNumPartitions(1).setReplicationFactor((short) 3).
@@ -674,7 +674,7 @@ public class ReplicationControlManagerTest {
             replicationControl.getPartition(
                 ((TopicRecord) result3.records().get(0).message()).topicId(), 
0));
         ControllerResult<CreateTopicsResponseData> result4 =
-                replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+                replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse4 = new 
CreateTopicsResponseData();
         expectedResponse4.topics().add(new 
CreatableTopicResult().setName("foo").
                 setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
@@ -694,7 +694,7 @@ public class ReplicationControlManagerTest {
         ControllerRequestContext requestContext =
             
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
             setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
@@ -717,7 +717,7 @@ public class ReplicationControlManagerTest {
 
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
 
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -770,7 +770,7 @@ public class ReplicationControlManagerTest {
 
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result1 =
-            replicationControl.createTopics(requestContext, request1, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request1, 
Set.of("foo"), false);
         assertEquals((short) 0, 
result1.response().topics().find("foo").errorCode());
 
         List<ApiMessageAndVersion> records1 = result1.records();
@@ -803,7 +803,7 @@ public class ReplicationControlManagerTest {
             .setConfigs(invalidConfigs));
 
         ControllerResult<CreateTopicsResponseData> result2 =
-            replicationControl.createTopics(requestContext, request2, 
Set.of("bar"));
+            replicationControl.createTopics(requestContext, request2, 
Set.of("bar"), false);
         assertEquals(Errors.INVALID_CONFIG.code(), 
result2.response().topics().find("bar").errorCode());
         assertEquals(
             "Null value not supported for topic configs: foo",
@@ -816,7 +816,7 @@ public class ReplicationControlManagerTest {
             .setConfigs(validConfigs));
 
         ControllerResult<CreateTopicsResponseData> result3 =
-            replicationControl.createTopics(requestContext, request3, 
Set.of("baz"));
+            replicationControl.createTopics(requestContext, request3, 
Set.of("baz"), false);
         assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result3.response().topics().find("baz").errorCode());
         assertEquals(List.of(), result3.records());
 
@@ -835,7 +835,7 @@ public class ReplicationControlManagerTest {
         request4Topics.add(batchedTopic1);
         request4Topics.add(batchedTopic2);
         ControllerResult<CreateTopicsResponseData> result4 =
-            replicationControl.createTopics(requestContext, request4, 
request4Topics);
+            replicationControl.createTopics(requestContext, request4, 
request4Topics, false);
 
         assertEquals(Errors.NONE.code(), 
result4.response().topics().find(batchedTopic1).errorCode());
         assertEquals(INVALID_REPLICATION_FACTOR.code(), 
result4.response().topics().find(batchedTopic2).errorCode());
@@ -867,7 +867,7 @@ public class ReplicationControlManagerTest {
             
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS) :
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            ctx.replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            ctx.replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         assertEquals(0, result.records().size());
         CreatableTopicResult topicResult = 
result.response().topics().find("foo");
         if (mutationQuotaExceeded) {
@@ -887,7 +887,7 @@ public class ReplicationControlManagerTest {
             setNumPartitions(1).setReplicationFactor((short) 4));
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            ctx.replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            ctx.replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         assertEquals(0, result.records().size());
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -1493,7 +1493,7 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1);
         ControllerRequestContext createTopicsRequestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
-            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"));
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"), false);
         CreateTopicsResponseData expectedResponse = new 
CreateTopicsResponseData();
         Uuid topicId = createResult.response().topics().find("foo").topicId();
         expectedResponse.topics().add(new 
CreatableTopicResult().setName("foo").
@@ -1562,7 +1562,7 @@ public class ReplicationControlManagerTest {
         ControllerRequestContext createTopicsRequestContext =
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
-            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"));
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"), false);
         CreatableTopicResult createdTopic = 
createResult.response().topics().find("foo");
         assertEquals(NONE.code(), createdTopic.errorCode());
         ctx.replay(createResult.records());
@@ -1596,7 +1596,7 @@ public class ReplicationControlManagerTest {
         ctx.unfenceBrokers(0, 1, 3);
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createTopicResult = 
replicationControl.
-            createTopics(requestContext, request, Set.of("foo", "bar", "quux", 
"foo2"));
+            createTopics(requestContext, request, Set.of("foo", "bar", "quux", 
"foo2"), false);
         ctx.replay(createTopicResult.records());
         List<CreatePartitionsTopic> topics = new ArrayList<>();
         topics.add(new CreatePartitionsTopic().
@@ -1680,7 +1680,7 @@ public class ReplicationControlManagerTest {
         ControllerRequestContext createTopicsRequestContext =
             anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createResult =
-            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"));
+            replicationControl.createTopics(createTopicsRequestContext, 
request, Set.of("foo"), false);
         CreatableTopicResult createdTopic = 
createResult.response().topics().find("foo");
         assertEquals(NONE.code(), createdTopic.errorCode());
         ctx.replay(createResult.records());
@@ -1720,7 +1720,7 @@ public class ReplicationControlManagerTest {
         ControllerRequestContext requestContext =
                 anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> createTopicResult = 
replicationControl.
-            createTopics(requestContext, request, Set.of("foo"));
+            createTopics(requestContext, request, Set.of("foo"), false);
         ctx.replay(createTopicResult.records());
 
         ctx.registerBrokers(0, 1);
@@ -1757,7 +1757,7 @@ public class ReplicationControlManagerTest {
 
         ControllerRequestContext requestContext = 
anonymousContextFor(ApiKeys.CREATE_TOPICS);
         ControllerResult<CreateTopicsResponseData> result =
-            replicationControl.createTopics(requestContext, request, 
Set.of("foo"));
+            replicationControl.createTopics(requestContext, request, 
Set.of("foo"), false);
         ctx.replay(result.records());
 
         List<CreatePartitionsTopic> topics = List.of(new 
CreatePartitionsTopic().
@@ -2890,19 +2890,19 @@ public class ReplicationControlManagerTest {
                 Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""),
                     Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
                         new 
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
-                true).records());
+                true, false).records());
         } else if (uncleanConfig.equals("dynamic_node")) {
             ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
                 Map.of(new ConfigResource(ConfigResource.Type.BROKER, "0"),
                     Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
                         new 
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
-                true).records());
+                true, false).records());
         } else if (uncleanConfig.equals("dynamic_topic")) {
             ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
                 Map.of(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
                     Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
                         new 
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
-                true).records());
+                true, false).records());
         }
         ControllerResult<Boolean> balanceResult = 
replication.maybeElectUncleanLeaders();
         assertFalse(balanceResult.response());
@@ -3419,8 +3419,15 @@ public class ReplicationControlManagerTest {
         Uuid dir2b1 = Uuid.fromString("yh3acnzGSeurSTj8aIhOjw");
         ctx.registerBrokersWithDirs(b1, List.of(dir1b1, dir2b1));
         ctx.unfenceBrokers(b1);
-        assertEquals(List.of(), 
ctx.clusterControl.registration(b1).cordonedDirectories());
+        assertNull(ctx.clusterControl.registration(b1).cordonedDirectories());
+
+        // If cordonedDirs is null, it's a no-op
         List<ApiMessageAndVersion> records = new ArrayList<>();
+        ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), null, records);
+        assertTrue(records.isEmpty());
+
+        // Cordon dir1b1, this will emit a BrokerRegistrationChangeRecord
+        records = new ArrayList<>();
         ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), List.of(dir1b1), records);
         assertEquals(
                 List.of(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord()
@@ -3430,6 +3437,35 @@ public class ReplicationControlManagerTest {
         );
         ctx.replay(records);
         assertEquals(List.of(dir1b1), 
ctx.clusterControl.registration(b1).cordonedDirectories());
+
+        // Cordon dir1b1 again, this time no records are emitted
+        records = new ArrayList<>();
+        ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), List.of(dir1b1), records);
+        assertTrue(records.isEmpty());
+
+        // Cordon dir2b1, this will emit a BrokerRegistrationChangeRecord
+        records = new ArrayList<>();
+        ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), List.of(dir2b1, dir1b1), records);
+        assertEquals(
+                List.of(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord()
+                        .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+                        .setCordonedLogDirs(List.of(dir2b1, dir1b1)), (short) 
3)),
+                filter(records, BrokerRegistrationChangeRecord.class)
+        );
+        ctx.replay(records);
+        assertEquals(List.of(dir2b1, dir1b1), 
ctx.clusterControl.registration(b1).cordonedDirectories());
+
+        // Uncordon all directories, this will emit a 
BrokerRegistrationChangeRecord
+        records = new ArrayList<>();
+        ctx.replicationControl.handleDirectoriesCordoned(b1, 
defaultBrokerEpoch(b1), List.of(), records);
+        assertEquals(
+                List.of(new ApiMessageAndVersion(new 
BrokerRegistrationChangeRecord()
+                        .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+                        .setCordonedLogDirs(List.of()), (short) 3)),
+                filter(records, BrokerRegistrationChangeRecord.class)
+        );
+        ctx.replay(records);
+        assertEquals(List.of(), 
ctx.clusterControl.registration(b1).cordonedDirectories());
     }
 
     @ParameterizedTest
@@ -3459,13 +3495,13 @@ public class ReplicationControlManagerTest {
             ctx.replay(ctx.configurationControl.legacyAlterConfigs(
                 Map.of(configResource,
                     Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
-                false).records());
+                false, false).records());
         } else {
             ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
                 Map.of(configResource,
                     Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
                         new 
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
-                false).records());
+                false, false).records());
         }
         assertArrayEquals(new int[]{}, 
ctx.replicationControl.getPartition(fooId, 0).elr);
         if (clusterLevel) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
index ab342b26087..1fed0e0dbad 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
@@ -70,7 +70,7 @@ public class ClusterImageBrokersNodeTest {
             "inControlledShutdown=false, " +
             "isMigratingZkBroker=false, " +
             "directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g], " +
-            "cordonedDirectories=[])",
+            "cordonedDirectories=null)",
             child.stringify());
     }
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 577338203ec..c661658c909 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Stream;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
@@ -87,6 +88,17 @@ public class BrokerRegistrationTest {
             setIsMigratingZkBroker(true).
             setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
             
setCordonedDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+            build(),
+        new BrokerRegistration.Builder().
+            setId(4).
+            setEpoch(0).
+            setIncarnationId(Uuid.fromString("Xkq84F5bTsSEwHqceVxcOQ")).
+            setListeners(List.of(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9094))).
+            setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1, 
(short) 2))).
+            setRack(Optional.empty()).
+            setFenced(true).
+            setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+            setCordonedDirectories(null).
             build());
 
     @Test
@@ -95,6 +107,7 @@ public class BrokerRegistrationTest {
         assertEquals(1, REGISTRATIONS.get(1).id());
         assertEquals(2, REGISTRATIONS.get(2).id());
         assertEquals(3, REGISTRATIONS.get(3).id());
+        assertEquals(4, REGISTRATIONS.get(4).id());
     }
 
     @Test
@@ -106,10 +119,13 @@ public class BrokerRegistrationTest {
         assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(0));
         assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(1));
         assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(2));
+        assertNotEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(0));
+        assertNotEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(3));
         assertEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(0));
         assertEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(1));
         assertEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(2));
         assertEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(3));
+        assertEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(4));
     }
 
     @Test
@@ -119,14 +135,14 @@ public class BrokerRegistrationTest {
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
             "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
isMigratingZkBroker=false, " +
-            "directories=[], cordonedDirectories=[])",
+            "directories=[], cordonedDirectories=null)",
             REGISTRATIONS.get(1).toString());
         assertEquals("BrokerRegistration(id=2, epoch=0, " +
             "incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
             "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
             "host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo: 
2-3}, " +
             "rack=Optional[myrack], fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=false, " +
-            "directories=[], cordonedDirectories=[])",
+            "directories=[], cordonedDirectories=null)",
             REGISTRATIONS.get(2).toString());
         assertEquals("BrokerRegistration(id=3, epoch=0, " +
             "incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
@@ -135,6 +151,13 @@ public class BrokerRegistrationTest {
             "rack=Optional.empty, fenced=false, inControlledShutdown=true, 
isMigratingZkBroker=true, " +
             "directories=[r4HpEsMuST6nQ4rznIEJVA], 
cordonedDirectories=[r4HpEsMuST6nQ4rznIEJVA])",
             REGISTRATIONS.get(3).toString());
+        assertEquals("BrokerRegistration(id=4, epoch=0, " +
+            "incarnationId=Xkq84F5bTsSEwHqceVxcOQ, listeners=[Endpoint(" +
+            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+            "host='localhost', port=9094)], supportedFeatures={foo: 1-2}, " +
+            "rack=Optional.empty, fenced=true, inControlledShutdown=false, 
isMigratingZkBroker=false, " +
+            "directories=[r4HpEsMuST6nQ4rznIEJVA], cordonedDirectories=null)",
+            REGISTRATIONS.get(4).toString());
     }
 
     @Test
@@ -143,6 +166,7 @@ public class BrokerRegistrationTest {
         testRoundTrip(REGISTRATIONS.get(1));
         testRoundTrip(REGISTRATIONS.get(2));
         testRoundTrip(REGISTRATIONS.get(3));
+        testRoundTrip(REGISTRATIONS.get(4));
     }
 
     private void testRoundTrip(BrokerRegistration registration) {
@@ -228,36 +252,36 @@ public class BrokerRegistrationTest {
 
     @Test
     void testHasUncordonedDirs() {
-        BrokerRegistration registration = new BrokerRegistration.Builder().
-                setId(0).
-                setEpoch(0).
-                setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
-                setListeners(List.of(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
-                setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1, 
(short) 2))).
-                setRack(Optional.empty()).
-                setFenced(false).
-                setInControlledShutdown(false).
-                setDirectories(List.of(
-                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
-                )).
-                build();
-        assertTrue(registration.hasUncordonedDirs());
-        registration = new BrokerRegistration.Builder().
-                setId(0).
-                setEpoch(0).
-                setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
-                setListeners(List.of(new Endpoint("INTERNAL", 
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
-                setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1, 
(short) 2))).
-                setRack(Optional.empty()).
-                setFenced(false).
-                setInControlledShutdown(false).
-                setDirectories(List.of(
-                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
-                )).
-                setCordonedDirectories(List.of(
-                        Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
-                )).
-                build();
-        assertFalse(registration.hasUncordonedDirs());
+        assertTrue(REGISTRATIONS.get(0).hasUncordonedDirs());
+        assertFalse(REGISTRATIONS.get(3).hasUncordonedDirs());
+        assertTrue(REGISTRATIONS.get(4).hasUncordonedDirs());
+    }
+
+    @Test
+    void testCordonedDirChanged() {
+        assertTrue(REGISTRATIONS.get(0).cordonedDirChanged(List.of()));
+        
assertTrue(REGISTRATIONS.get(0).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+        
assertFalse(REGISTRATIONS.get(3).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+        assertTrue(REGISTRATIONS.get(3).cordonedDirChanged(List.of()));
+        assertTrue(REGISTRATIONS.get(4).cordonedDirChanged(List.of()));
+        
assertTrue(REGISTRATIONS.get(4).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+    }
+
+    @Test
+    void testCordonedDirectories() {
+        assertNull(REGISTRATIONS.get(0).cordonedDirectories());
+        assertEquals(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA")), 
REGISTRATIONS.get(3).cordonedDirectories());
+        assertNull(REGISTRATIONS.get(4).cordonedDirectories());
+    }
+
+    @Test
+    void testCordonedLogDirsRoundTrip() {
+        ImageWriterOptions options = new 
ImageWriterOptions.Builder(MetadataVersion.IBP_4_2_IV1).build();
+        ApiMessageAndVersion record = REGISTRATIONS.get(4).toRecord(options);
+        assertNull(((RegisterBrokerRecord) 
record.message()).cordonedLogDirs());
+
+        options = new 
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build();
+        record = REGISTRATIONS.get(4).toRecord(options);
+        assertEquals(REGISTRATIONS.get(4).cordonedDirectories(), 
((RegisterBrokerRecord) record.message()).cordonedLogDirs());
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
index 987d27bd5d2..c6e02d0373a 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -30,7 +30,6 @@ public interface DirectoryEventHandler {
         @Override public void handleAssignment(TopicIdPartition partition, 
Uuid directoryId, String reason, Runnable callback) {}
         @Override public void handleFailure(Uuid directoryId) {}
         @Override public void handleCordoned(Set<Uuid> directoryIds) {}
-        @Override public void handleUncordoned(Set<Uuid> directoryIds) {}
     };
 
     /**
@@ -49,14 +48,8 @@ public interface DirectoryEventHandler {
     void handleFailure(Uuid directoryId);
 
     /**
-     * Handle the transition of an online log directory to the cordoned state.
-     * @param directoryIds  The directory IDs to cordon
+     * Handle the update of the cordoned.log.dirs configuration.
+     * @param directoryIds  The directory IDs of the cordoned log dirs
      */
     void handleCordoned(Set<Uuid> directoryIds);
-
-    /**
-     * Handle the transition of a cordoned log directory to the online state.
-     * @param directoryIds  The directory IDs to uncordon
-     */
-    void handleUncordoned(Set<Uuid> directoryIds);
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java 
b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
index 98e182cfb5c..3c8bc8807cd 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -48,6 +48,7 @@ import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -55,16 +56,17 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 /**
  * The broker lifecycle manager owns the broker state.
  *
- * Its inputs are messages passed in from other parts of the broker and from 
the
+ * <p>Its inputs are messages passed in from other parts of the broker and 
from the
  * controller: requests to start up, or shut down, for example. Its output are 
the broker
  * state and various futures that can be used to wait for broker state 
transitions to
  * occur.
  *
- * The lifecycle manager handles registering the broker with the controller, 
as described
+ * <p>The lifecycle manager handles registering the broker with the 
controller, as described
  * in KIP-631. After registration is complete, it handles sending periodic 
broker
- * heartbeats and processing the responses.
+ * heartbeats and processing the responses. Once the broker has caught up with 
the cluster metadata, it starts
+ * sending the Uuid of its cordoned log directories in its heartbeats.
  *
- * This code uses an event queue paradigm. Modifications get translated into 
events, which
+ * <p>This code uses an event queue paradigm. Modifications get translated 
into events, which
  * are placed on the queue to be processed sequentially. As described in the 
JavaDoc for
  * each variable, most mutable state can be accessed only from that event 
queue thread.
  * In some cases we expose a volatile variable which can be read from any 
thread, but only
@@ -76,7 +78,7 @@ public class BrokerLifecycleManager {
     private final KafkaEventQueue eventQueue;
     private final AbstractKafkaConfig config;
     private final Time time;
-    private final Set<Uuid> logDirs;
+    private final Map<String, Uuid> logDirs;
     private final Runnable shutdownHook;
     private final Supplier<Boolean> cordonedLogDirsSupported;
 
@@ -148,10 +150,10 @@ public class BrokerLifecycleManager {
     private final Map<Uuid, Boolean> offlineDirs = new HashMap<>();
 
     /**
-     * Map of cordoned log directories. The value is true if the directory is 
cordoned.
+     * Set of cordoned log directories. The is null at startup until the 
broker has caught up with the metadata
      * This variable can only be read or written from the event queue thread.
      */
-    private final Map<Uuid, Boolean> cordonedLogDirs = new HashMap<>();
+    private Set<Uuid> cordonedLogDirs;
 
     /**
      * True if we sent an event queue to the active controller requesting 
controlled
@@ -217,7 +219,7 @@ public class BrokerLifecycleManager {
             AbstractKafkaConfig config,
             Time time,
             String threadNamePrefix,
-            Set<Uuid> logDirs) {
+            Map<String, Uuid> logDirs) {
         this(config, time, threadNamePrefix, logDirs, () -> { }, () -> false);
     }
 
@@ -225,7 +227,7 @@ public class BrokerLifecycleManager {
             AbstractKafkaConfig config,
             Time time,
             String threadNamePrefix,
-            Set<Uuid> logDirs,
+            Map<String, Uuid> logDirs,
             Runnable shutdownHook,
             Supplier<Boolean> cordonedLogDirsSupported) {
         this.config = config;
@@ -260,14 +262,8 @@ public class BrokerLifecycleManager {
                String clusterId,
                ListenerCollection advertisedListeners,
                Map<String, VersionRange> supportedFeatures,
-               OptionalLong previousBrokerEpoch,
-               Set<Uuid> cordonedLogDirs) {
+               OptionalLong previousBrokerEpoch) {
         this.previousBrokerEpoch = previousBrokerEpoch;
-        if (!cordonedLogDirs.isEmpty()) {
-            // At this point we don't have fresh metadata yet so we don't know 
if the cordoned log dirs feature is supported.
-            // Queue an event, it will be ignored by the controller handling 
the broker registration if the feature is disabled.
-            eventQueue.append(new CordonedDirEvent(cordonedLogDirs));
-        }
         eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
                 channelManager, clusterId, advertisedListeners, 
supportedFeatures));
     }
@@ -300,16 +296,6 @@ public class BrokerLifecycleManager {
         }
     }
 
-    /**
-     * Propagate directory uncordoned to the controller.
-     * @param directories The IDs for the directories that is uncordoned.
-     */
-    public void propagateDirectoryUncordoned(Set<Uuid> directories) {
-        if (cordonedLogDirsSupported.get()) {
-            eventQueue.append(new UncordonedDirEvent(directories));
-        }
-    }
-
     public void resendBrokerRegistration() {
         eventQueue.append(new ResendBrokerRegistrationEvent());
     }
@@ -442,28 +428,7 @@ public class BrokerLifecycleManager {
 
         @Override
         public void run() {
-            for (Uuid dir : dirs) {
-                cordonedLogDirs.put(dir, true);
-            }
-            if (registered) {
-                scheduleNextCommunicationImmediately();
-            }
-        }
-    }
-
-    private class UncordonedDirEvent implements EventQueue.Event {
-
-        private final Set<Uuid> dirs;
-
-        UncordonedDirEvent(Set<Uuid> dirs) {
-            this.dirs = dirs;
-        }
-
-        @Override
-        public void run() {
-            for (Uuid dir : dirs) {
-                cordonedLogDirs.put(dir, false);
-            }
+            cordonedLogDirs = dirs;
             if (registered) {
                 scheduleNextCommunicationImmediately();
             }
@@ -515,7 +480,7 @@ public class BrokerLifecycleManager {
                         .setMinSupportedVersion(range.min())
                         .setMaxSupportedVersion(range.max()))
         );
-        List<Uuid> sortedLogDirs = new ArrayList<>(logDirs);
+        List<Uuid> sortedLogDirs = new ArrayList<>(logDirs.values());
         sortedLogDirs.sort(Uuid::compareTo);
         BrokerRegistrationRequestData data = new 
BrokerRegistrationRequestData()
             .setBrokerId(nodeId)
@@ -526,8 +491,7 @@ public class BrokerLifecycleManager {
             .setListeners(advertisedListeners)
             .setRack(rack.orElse(null))
             .setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
-            .setLogDirs(sortedLogDirs)
-            
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
+            .setLogDirs(sortedLogDirs);
         if (logger.isDebugEnabled()) {
             logger.debug("Sending broker registration {}", data);
         }
@@ -605,8 +569,10 @@ public class BrokerLifecycleManager {
             .setCurrentMetadataOffset(metadataOffset)
             .setWantFence(!readyToUnfence)
             .setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
-            .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()))
-            
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
+            .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+        if (initialCatchUpFuture.isDone() && 
!initialCatchUpFuture.isCompletedExceptionally() && 
cordonedLogDirsSupported.get()) {
+            data.setCordonedLogDirs(List.copyOf(cordonedLogDirs));
+        }
         if (logger.isTraceEnabled()) {
             logger.trace("Sending broker heartbeat {}", data);
         }
@@ -650,6 +616,7 @@ public class BrokerLifecycleManager {
             this.currentOfflineDirs = currentOfflineDirs;
         }
 
+        @SuppressWarnings({"CyclomaticComplexity"})
         @Override
         public void run() {
             communicationInFlight = false;
@@ -681,6 +648,12 @@ public class BrokerLifecycleManager {
                                 logger.info("The broker has caught up. 
Transitioning from STARTING to RECOVERY.");
                                 state = BrokerState.RECOVERY;
                                 initialCatchUpFuture.complete(null);
+                                // Now that the broker has caught up with the 
latest metadata, the configuration should
+                                // be up to date, so we can retrieve the 
cordoned log dirs to include them in the
+                                // next heartbeat request
+                                cordonedLogDirs = 
config.cordonedLogDirs().stream()
+                                    .flatMap(logDir -> 
Optional.ofNullable(logDirs.get(logDir)).stream())
+                                    .collect(Collectors.toSet());
                             } else {
                                 logger.debug("The broker is STARTING. Still 
waiting to catch up with cluster metadata.");
                             }
diff --git 
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
index def1cbbe191..558c7324d4e 100644
--- 
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
@@ -35,10 +35,12 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.test.TestUtils;
 
@@ -233,6 +235,55 @@ public class CordonedLogDirsIntegrationTest {
         }
     }
 
+    @ClusterTest()
+    public void testCordonUnknownLogDirs() {
+        try (Admin admin = clusterInstance.admin()) {
+            Throwable t = assertThrows(ExecutionException.class,
+                    () -> 
admin.incrementalAlterConfigs(cordonedDirsConfig("/unknown/log/dir", 
BROKER_0)).all().get());
+            // ConfigAdminManager.validateBrokerConfigChange throws 
InvalidRequestException instead of InvalidConfigurationException
+            assertInstanceOf(InvalidRequestException.class, t.getCause());
+        }
+    }
+
+    @ClusterTest(
+            types = Type.KRAFT,
+            brokers = 2,
+            controllers = 1
+    )
+    public void testUpdateCordonedDirsViaController() throws Exception {
+        // Make sure we don't try to shut down the controller
+        int brokerId = clusterInstance.brokerIds().stream().filter(id -> 
!clusterInstance.controllerIds().contains(id)).findFirst().get();
+        ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, 
String.valueOf(brokerId));
+        List<String> logDirs = 
clusterInstance.brokers().get(brokerId).config().logDirs();
+        String logDirsStr = String.join(",", logDirs);
+        try (Admin controllerAdmin = clusterInstance.admin(Map.of(), true);
+             Admin admin = clusterInstance.admin()) {
+            // We can't set cordoned log dirs via the controller
+            Throwable t = assertThrows(ExecutionException.class,
+                    () -> 
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr, 
cr)).all().get());
+            assertInstanceOf(InvalidConfigurationException.class, 
t.getCause());
+
+            // We can set cordoned log dirs via the broker
+            admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr, 
cr)).all().get();
+
+            // Shutdown the broker
+            clusterInstance.brokers().get(brokerId).shutdown();
+            clusterInstance.brokers().get(brokerId).awaitShutdown();
+
+            // We can clear a cordoned log dir via the controller
+            
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirs.get(0), 
cr)).all().get();
+            controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig("", 
cr)).all().get();
+
+            // Restart the broker
+            clusterInstance.brokers().get(brokerId).startup();
+
+            // We can set cordoned log dirs via the broker
+            admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr, 
cr)).all().get();
+            // We can keep cordoned log dirs via the controller
+            
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr, 
cr)).all().get();
+        }
+    }
+
     @ClusterTest(
             brokers = 2,
             controllers = 1
@@ -253,7 +304,6 @@ public class CordonedLogDirsIntegrationTest {
                 Map<Integer, Map<String, LogDirDescription>> 
logDescriptionsPerBroker = 
admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
                 for (Map.Entry<Integer, Map<String, LogDirDescription>> entry 
: logDescriptionsPerBroker.entrySet()) {
                     for (LogDirDescription logDirDescription : 
entry.getValue().values()) {
-                        
assertFalse(logDirDescription.replicaInfos().isEmpty());
                         found += logDirDescription.replicaInfos().size();
                         if (entry.getKey() == brokerId) {
                             logDirDescription.replicaInfos().forEach((tp, 
replicaInfo) ->
diff --git 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
index 548c2112e2a..131cd0dcef0 100644
--- 
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -282,8 +282,8 @@ public class LogManager {
         return cleaner;
     }
 
-    public Set<Uuid> directoryIdsSet() {
-        return Set.copyOf(directoryIds.values());
+    public Map<String, Uuid> directoryIds() {
+        return Map.copyOf(directoryIds);
     }
 
     private Set<File> offlineLogDirs() {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
index 371228ca8c9..cb883971b1b 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
@@ -176,7 +176,8 @@ public class MockController implements Controller {
     public synchronized CompletableFuture<CreateTopicsResponseData> 
createTopics(
         ControllerRequestContext context,
         CreateTopicsRequestData request,
-        Set<String> describable
+        Set<String> describable,
+        boolean forwarded
     ) {
         CreateTopicsResponseData response = new CreateTopicsResponseData();
         for (CreatableTopic topic : request.topics()) {
@@ -355,7 +356,8 @@ public class MockController implements Controller {
     public CompletableFuture<Map<ConfigResource, ApiError>> 
incrementalAlterConfigs(
         ControllerRequestContext context,
         Map<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>> 
configChanges,
-        boolean validateOnly
+        boolean validateOnly,
+        boolean forwarded
     ) {
         Map<ConfigResource, ApiError> results = new HashMap<>();
         for (Entry<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, 
String>>> entry :
@@ -417,7 +419,8 @@ public class MockController implements Controller {
     public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
         ControllerRequestContext context,
         Map<ConfigResource, Map<String, String>> newConfigs,
-        boolean validateOnly
+        boolean validateOnly,
+        boolean forwarded
     ) {
         Map<ConfigResource, ApiError> results = new HashMap<>();
         if (!validateOnly) {


Reply via email to