This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new cf9f8ad376a KAFKA-20441: Fix handling of cordoned log dirs (#22070)
cf9f8ad376a is described below
commit cf9f8ad376a9088d4d9bb9be217d28554e1be5b6
Author: Mickael Maison <[email protected]>
AuthorDate: Mon May 4 13:54:46 2026 +0200
KAFKA-20441: Fix handling of cordoned log dirs (#22070)
This tightens the validation of the cordoned.log.dirs configuration on
controllers. Controllers only allow uncordoning log directories as they
don't have access to the full list of log directory paths.
This also delays the initial propagation of cordoned log dirs.
Previously, cordoned log dirs were initially sent as part of the broker
registration; now they are only sent via heartbeat once the broker has
fully caught up with the metadata. This ensures dynamic changes cannot be
lost.
Reviewers: Chia-Ping Tsai <[email protected]>, José Armando García
Sancio <[email protected]>
---
.../common/message/BrokerHeartbeatRequest.json | 5 +-
.../common/message/BrokerRegistrationRequest.json | 7 +-
.../common/message/BrokerRegistrationResponse.json | 3 +-
core/src/main/scala/kafka/log/LogManager.scala | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 12 +-
.../main/scala/kafka/server/ControllerApis.scala | 12 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 22 ++--
core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +-
.../test/scala/unit/kafka/log/LogManagerTest.scala | 2 +-
.../kafka/server/BrokerLifecycleManagerTest.scala | 84 ++++++--------
.../unit/kafka/server/ControllerApisTest.scala | 3 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 19 ++-
.../unit/kafka/server/ReplicationQuotasTest.scala | 1 +
.../kafka/controller/ClusterControlManager.java | 21 +---
.../controller/ConfigurationControlManager.java | 80 ++++++++++---
.../org/apache/kafka/controller/Controller.java | 12 +-
.../apache/kafka/controller/QuorumController.java | 16 ++-
.../controller/ReplicationControlManager.java | 18 +--
.../java/org/apache/kafka/image/ClusterDelta.java | 2 +-
.../apache/kafka/metadata/BrokerRegistration.java | 23 +++-
.../metadata/BrokerRegistrationChangeRecord.json | 5 +-
.../common/metadata/RegisterBrokerRecord.json | 5 +-
.../controller/ClusterControlManagerTest.java | 36 +++++-
.../ConfigurationControlManagerTest.java | 128 +++++++++++++++++----
.../QuorumControllerIntegrationTestUtils.java | 2 +-
.../kafka/controller/QuorumControllerTest.java | 34 +++---
.../controller/ReplicationControlManagerTest.java | 90 ++++++++++-----
.../image/node/ClusterImageBrokersNodeTest.java | 2 +-
.../kafka/metadata/BrokerRegistrationTest.java | 90 +++++++++------
.../kafka/server/common/DirectoryEventHandler.java | 11 +-
.../kafka/server/BrokerLifecycleManager.java | 79 +++++--------
.../server/CordonedLogDirsIntegrationTest.java | 52 ++++++++-
.../kafka/storage/internals/log/LogManager.java | 1 -
.../apache/kafka/common/test/MockController.java | 9 +-
35 files changed, 557 insertions(+), 335 deletions(-)
diff --git
a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
index 41d5fa0a150..63910c2fc4e 100644
--- a/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/BrokerHeartbeatRequest.json
@@ -35,7 +35,8 @@
"about": "True if the broker wants to be shut down, false otherwise." },
{ "name": "OfflineLogDirs", "type": "[]uuid", "versions": "1+",
"taggedVersions": "1+", "tag": 0,
"about": "Log directories that failed and went offline." },
- { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "2+",
"taggedVersions": "2+",
- "tag": "1", "about": "Log directories that are cordoned." }
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "2+",
"taggedVersions": "2+", "tag": 1,
+ "nullableVersions": "2+", "default": "null",
+ "about": "List of log directories that are cordoned. This is null before
the broker reaches the RECOVERY state." }
]
}
diff --git
a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
index 53e37f21d5a..04cd1b65a79 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationRequest.json
@@ -17,13 +17,12 @@
// Version 2 adds LogDirs for KIP-858
// Version 3 adds the PreviousBrokerEpoch for the KIP-966
// Version 4 fixes KAFKA-17011, which blocked SupportedFeatures.MinVersion in
the response from being 0.
-// Version 5 adds the CordonedLogDirs flexible field
{
"apiKey":62,
"type": "request",
"listeners": ["controller"],
"name": "BrokerRegistrationRequest",
- "validVersions": "0-5",
+ "validVersions": "0-4",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+", "entityType":
"brokerId",
@@ -61,8 +60,6 @@
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
"about": "Log directories configured in this broker which are
available.", "ignorable": true },
{ "name": "PreviousBrokerEpoch", "type": "int64", "versions": "3+",
"default": "-1", "ignorable": true,
- "about": "The epoch before a clean shutdown." },
- { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "5+",
"taggedVersions": "5+",
- "tag": "0", "about": "Log directories that are cordoned." }
+ "about": "The epoch before a clean shutdown." }
]
}
diff --git
a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
index 956e945df40..308fe3860d2 100644
--- a/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
+++ b/clients/src/main/resources/common/message/BrokerRegistrationResponse.json
@@ -17,12 +17,11 @@
// Version 2 adds the PreviousBrokerEpoch to the request for the KIP-966
// Version 3 is the same as version 2 (new field in request).
// Version 4 is the same as version 2 (new field in request).
-// Version 5 is the same as version 2 (new field in request).
{
"apiKey": 62,
"type": "response",
"name": "BrokerRegistrationResponse",
- "validVersions": "0-5",
+ "validVersions": "0-4",
"flexibleVersions": "0+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
diff --git a/core/src/main/scala/kafka/log/LogManager.scala
b/core/src/main/scala/kafka/log/LogManager.scala
index 85c56672aaa..1c2a31262af 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -126,7 +126,7 @@ class LogManager(logDirs: Seq[File],
// visible for testing
private[log] val dirLocks = lockLogDirs(liveLogDirs)
- private val directoryIds: mutable.Map[String, Uuid] =
loadDirectoryIds(liveLogDirs)
+ val directoryIds: mutable.Map[String, Uuid] = loadDirectoryIds(liveLogDirs)
def directoryIdsSet: Predef.Set[Uuid] = directoryIds.values.toSet
def updateCordonedLogDirs(newCordonedLogDirs: Set[String]): Unit = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7aef286e10..1c46c95f2b9 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -65,7 +65,6 @@ import java.util
import java.util.Optional
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit,
TimeoutException}
-import java.util.stream.Collectors
import scala.collection.Map
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters.RichOption
@@ -223,7 +222,7 @@ class BrokerServer(
config,
time,
s"broker-${config.nodeId}-",
- logManager.directoryIdsSet.asJava,
+ util.Map.copyOf(logManager.directoryIds.asJava),
() => new Thread(() => shutdown(), "kafka-shutdown-thread").start(),
() => metadataCache.metadataVersion().isCordonedLogDirsSupported)
@@ -338,9 +337,6 @@ class BrokerServer(
override def handleCordoned(directoryIds: util.Set[Uuid]): Unit =
lifecycleManager.propagateDirectoryCordoned(directoryIds)
-
- override def handleUncordoned(directoryIds: util.Set[Uuid]): Unit =
- lifecycleManager.propagateDirectoryUncordoned(directoryIds)
}
/**
@@ -420,17 +416,13 @@ class BrokerServer(
s"broker-${config.nodeId}-",
config.brokerHeartbeatIntervalMs
)
- val initialCordonedLogDirs: util.Set[Uuid] =
config.cordonedLogDirs().stream()
- .map(dir => logManager.directoryId(dir).get)
- .collect(Collectors.toSet())
lifecycleManager.start(
() => sharedServer.loader.lastAppliedOffset(),
brokerLifecycleChannelManager,
clusterId,
listenerInfo.toBrokerRegistrationRequest,
featuresRemapped,
- logManager.readBrokerEpochFromCleanShutdownFiles(),
- initialCordonedLogDirs
+ logManager.readBrokerEpochFromCleanShutdownFiles()
)
// The FetchSessionCache is divided into config.numIoThreads shards,
each responsible
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index d928bbac01b..fbe76012508 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -370,7 +370,8 @@ class ControllerApis(
authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME,
logIfDenied = false),
names => authHelper.filterByAuthorized(request.context, CREATE, TOPIC,
names)(identity),
names => authHelper.filterByAuthorized(request.context,
DESCRIBE_CONFIGS, TOPIC,
- names, logIfDenied = false)(identity))
+ names, logIfDenied = false)(identity),
+ request.isForwarded)
future.handle[Unit] { (result, exception) =>
val response = if (exception != null) {
createTopicsRequest.getErrorResponse(exception)
@@ -392,7 +393,8 @@ class ControllerApis(
request: CreateTopicsRequestData,
hasClusterAuth: Boolean,
getCreatableTopics: Iterable[String] => Set[String],
- getDescribableTopics: Iterable[String] => Set[String]
+ getDescribableTopics: Iterable[String] => Set[String],
+ forwarded: Boolean
): CompletableFuture[CreateTopicsResponseData] = {
val topicNames = new util.HashSet[String]()
val duplicateTopicNames = new util.HashSet[String]()
@@ -422,7 +424,7 @@ class ControllerApis(
iterator.remove()
}
}
- controller.createTopics(context, effectiveRequest,
describableTopicNames).thenApply { response =>
+ controller.createTopics(context, effectiveRequest, describableTopicNames,
forwarded).thenApply { response =>
duplicateTopicNames.forEach { name =>
response.topics().add(new CreatableTopicResult().
setName(name).
@@ -534,7 +536,7 @@ class ControllerApis(
iterator.remove()
}
}
- controller.legacyAlterConfigs(context, configChanges,
alterConfigsRequest.data.validateOnly)
+ controller.legacyAlterConfigs(context, configChanges,
alterConfigsRequest.data.validateOnly, request.isForwarded)
.handle[Unit] { (controllerResults, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
@@ -773,7 +775,7 @@ class ControllerApis(
iterator.remove()
}
}
- controller.incrementalAlterConfigs(context, configChanges,
alterConfigsRequest.data.validateOnly)
+ controller.incrementalAlterConfigs(context, configChanges,
alterConfigsRequest.data.validateOnly, request.isForwarded)
.handle[Unit] { (controllerResults, exception) =>
if (exception != null) {
requestHelper.handleError(request, exception)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 0fd64ef350d..5a2405b4a60 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -26,8 +26,7 @@ import kafka.network.DataPlaneAcceptor
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
import kafka.utils.Logging
-import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.Endpoint
+import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, SslConfigs}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -48,9 +47,11 @@ import org.apache.kafka.server.util.LockUtils.{inReadLock,
inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.LogConfig
+import java.util.stream.Collectors
import scala.util.Using
import scala.collection._
import scala.jdk.CollectionConverters._
+import scala.jdk.OptionConverters.RichOption
/**
* Dynamic broker configurations may be defined at two levels:
@@ -610,20 +611,13 @@ class DynamicLogConfig(logManager: LogManager,
directoryEventHandler: DirectoryE
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
val newBrokerDefaults = new util.HashMap[String,
Object](newConfig.extractLogConfigMap)
-
- logManager.updateCordonedLogDirs(newConfig.cordonedLogDirs.asScala.toSet)
- val newCordoned: Set[String] = newConfig.cordonedLogDirs.asScala.toSet --
oldConfig.cordonedLogDirs.asScala.toSet
- val newUncordoned: Set[String] = oldConfig.cordonedLogDirs.asScala.toSet
-- newConfig.cordonedLogDirs.asScala.toSet
- if (newCordoned.nonEmpty) {
- directoryEventHandler.handleCordoned(newCordoned.map(dir =>
logManager.directoryId(dir).get).toSet.asJava)
- }
- if (newUncordoned.nonEmpty) {
- directoryEventHandler.handleUncordoned(newUncordoned.map(dir =>
logManager.directoryId(dir).get).toSet.asJava)
- }
-
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
-
updateLogsConfig(newBrokerDefaults.asScala)
+
+ logManager.updateCordonedLogDirs(newConfig.cordonedLogDirs.asScala.toSet)
+ directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream
+ .flatMap[Uuid](dir => logManager.directoryId(dir).toJava.stream)
+ .collect(Collectors.toSet[Uuid]))
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c4707b68ff2..c4268d0f104 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -510,7 +510,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
require(cordonedLogDirs.size == 1, s"When
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} is set to
${ServerLogConfigs.CORDONED_LOG_DIRS_ALL}, it must not contain other values")
} else {
val unknownLogDirs =
cordonedLogDirs.asScala.filter(!logDirs().contains(_))
- require(unknownLogDirs.isEmpty, s"All entries in
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} or
${ServerLogConfigs.LOG_DIR_CONFIG}. Missing entries :
${unknownLogDirs.mkString(", ")}")
+ require(unknownLogDirs.isEmpty, s"All entries in
${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG} must be present in
${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.
Missing entries : ${unknownLogDirs.mkString(", ")}")
}
}
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a9c23a0bc08..2f88a71942e 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -4323,7 +4323,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
controllerServer.controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT,
util.Map.of(controllerNodeResource,
util.Map.of(CleanerConfig.LOG_CLEANER_DELETE_RETENTION_MS_PROP,
- new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))),
false).get()
+ new SimpleImmutableEntry(AlterConfigOp.OpType.SET, "34"))), false,
false).get()
ensureConsistentKRaftMetadata()
waitUntilTrue(() => brokers.forall(_.config.originals.getOrDefault(
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index e4b26070728..1d7c195c373 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -1164,7 +1164,7 @@ class LogManagerTest {
assertEquals(None, logManager.directoryId(dirs(2).getAbsolutePath))
assertEquals(Some(Uuid.fromString("kQfNPJ2FTHq_6Qlyyv6Jqg")),
logManager.directoryId(dirs(3).getAbsolutePath))
assertTrue(logManager.directoryId(dirs(3).getAbsolutePath).isDefined)
- assertEquals(2, logManager.directoryIdsSet.size)
+ assertEquals(2, logManager.directoryIds.size)
}
/**
diff --git
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index d64d9f00b9c..51d51967871 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -38,6 +38,10 @@ import scala.jdk.CollectionConverters._
@Timeout(value = 12)
class BrokerLifecycleManagerTest {
private var manager: BrokerLifecycleManager = null
+ private val logDirs: util.Map[String, Uuid] = util.Map.of(
+ "/var/data",
+ Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")
+ )
@AfterEach
def tearDown(): Unit = {
@@ -61,18 +65,18 @@ class BrokerLifecycleManagerTest {
@Test
def testCreateAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(context.config, context.time,
"create-and-close-", util.Set.of(Uuid.fromString("oFoTeS9QT0aAyCyH41v45A")))
+ manager = new BrokerLifecycleManager(context.config, context.time,
"create-and-close-", logDirs)
manager.close()
}
@Test
def testCreateStartAndClose(): Unit = {
val context = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(context.config, context.time,
"create-start-and-close-",
util.Set.of(Uuid.fromString("uiUADXZWTPixVvp6UWFWnw")))
+ manager = new BrokerLifecycleManager(context.config, context.time,
"create-start-and-close-", logDirs)
assertEquals(BrokerState.NOT_RUNNING, manager.state)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
TestUtils.retry(60000) {
assertEquals(BrokerState.STARTING, manager.state)
}
@@ -83,12 +87,12 @@ class BrokerLifecycleManagerTest {
@Test
def testSuccessfulRegistration(): Unit = {
val context = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(context.config, context.time,
"successful-registration-",
util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+ manager = new BrokerLifecycleManager(context.config, context.time,
"successful-registration-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
+ Collections.emptyMap(), OptionalLong.of(10L))
TestUtils.retry(60000) {
assertEquals(1, context.mockChannelManager.unsentQueue.size)
assertEquals(10L,
context.mockChannelManager.unsentQueue.getFirst.request.build().asInstanceOf[BrokerRegistrationRequest].data().previousBrokerEpoch())
@@ -105,7 +109,7 @@ class BrokerLifecycleManagerTest {
def testRegistrationTimeout(): Unit = {
val context = new RegistrationTestContext(configProperties)
val controllerNode = new Node(3000, "localhost", 8021)
- manager = new BrokerLifecycleManager(context.config, context.time,
"registration-timeout-", util.Set.of(Uuid.fromString("9XBOAtr4T0Wbx2sbiWh6xg")))
+ manager = new BrokerLifecycleManager(context.config, context.time,
"registration-timeout-", logDirs)
context.controllerNodeProvider.node.set(controllerNode)
def newDuplicateRegistrationResponse(): Unit = {
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -117,7 +121,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1, context.mockClient.futureResponses().size)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
// We should send the first registration request and get a failure
immediately
TestUtils.retry(60000) {
context.poll()
@@ -145,7 +149,7 @@ class BrokerLifecycleManagerTest {
@Test
def testControlledShutdown(): Unit = {
val context = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(context.config, context.time,
"controlled-shutdown-", util.Set.of(Uuid.fromString("B4RtUz1ySGip3A7ZFYB2dg")))
+ manager = new BrokerLifecycleManager(context.config, context.time,
"controlled-shutdown-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
context.controllerNodeProvider.node.set(controllerNode)
context.mockClient.prepareResponseFrom(new BrokerRegistrationResponse(
@@ -154,7 +158,7 @@ class BrokerLifecycleManagerTest {
new BrokerHeartbeatResponseData().setIsCaughtUp(true)), controllerNode)
manager.start(() => context.highestMetadataOffset.get(),
context.mockChannelManager, context.clusterId,
context.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
TestUtils.retry(10000) {
context.poll()
manager.eventQueue.wakeup()
@@ -226,14 +230,14 @@ class BrokerLifecycleManagerTest {
@Test
def testAlwaysSendsAccumulatedOfflineDirs(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"offline-dirs-sent-in-heartbeat-",
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")))
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"offline-dirs-sent-in-heartbeat-", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
poll(ctx, manager, registration)
def nextHeartbeatDirs(): Set[String] =
@@ -250,9 +254,9 @@ class BrokerLifecycleManagerTest {
@Test
def testRegistrationIncludesDirs(): Unit = {
- val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"),
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
+ val dirs = util.Map.of("/dir1", Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"),
"/dir2", Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
val ctx = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"registration-includes-dirs-", logDirs)
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"registration-includes-dirs-", dirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
@@ -260,23 +264,23 @@ class BrokerLifecycleManagerTest {
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
val request = poll(ctx, manager,
registration).asInstanceOf[BrokerRegistrationRequest]
- assertEquals(logDirs, new util.HashSet(request.data.logDirs()))
+ assertEquals(new util.HashSet(dirs.values()), new
util.HashSet(request.data.logDirs()))
}
@Test
def testKraftJBODMetadataVersionUpdateEvent(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"jbod-metadata-version-update",
util.Set.of(Uuid.fromString("gCpDJgRlS2CBCpxoP2VMsQ")))
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"jbod-metadata-version-update", logDirs)
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.of(10L), util.Set.of())
+ Collections.emptyMap(), OptionalLong.of(10L))
def doPoll[T<:AbstractRequest](response: AbstractResponse) = poll(ctx,
manager, prepareResponse[T](ctx, response))
def nextHeartbeatRequest() = doPoll[AbstractRequest](new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))
@@ -300,33 +304,13 @@ class BrokerLifecycleManagerTest {
}
@Test
- def testRegistrationIncludesCordonedDirs(): Unit = {
- val logDirs = util.Set.of(Uuid.fromString("ad5FLIeCTnaQdai5vOjeng"),
Uuid.fromString("ybdzUKmYSLK6oiIpI6CPlw"))
- val ctx = new RegistrationTestContext(configProperties)
- manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"registration-includes-cordoned-dirs-", logDirs)
- val controllerNode = new Node(3000, "localhost", 8021)
- ctx.controllerNodeProvider.node.set(controllerNode)
-
- val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
-
- manager.start(() => ctx.highestMetadataOffset.get(),
- ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(),
- logDirs
- )
- val request = poll(ctx, manager,
registration).asInstanceOf[BrokerRegistrationRequest]
-
- assertEquals(logDirs, new util.HashSet(request.data.cordonedLogDirs()))
- }
-
- @Test
- def testAlwaysSendsAccumulatedCordonedDirs(): Unit = {
+ def testCordonedLogDirs(): Unit = {
val ctx = new RegistrationTestContext(configProperties)
var enabled = false
def cordonedLogDirsEnabled(): Boolean = {
enabled
}
- manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"cordoned-dirs-sent-in-heartbeat-",
util.Set.of(Uuid.fromString("0IbF1sjhSGG6FNvnrPbqQg")),
+ manager = new BrokerLifecycleManager(ctx.config, ctx.time,
"cordoned-dirs-sent-in-heartbeat-", logDirs,
() => {}, () => cordonedLogDirsEnabled())
val controllerNode = new Node(3000, "localhost", 8021)
ctx.controllerNodeProvider.node.set(controllerNode)
@@ -334,27 +318,33 @@ class BrokerLifecycleManagerTest {
val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new
BrokerRegistrationResponseData().setBrokerEpoch(1000)))
manager.start(() => ctx.highestMetadataOffset.get(),
ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners,
- Collections.emptyMap(), OptionalLong.empty(), util.Set.of())
+ Collections.emptyMap(), OptionalLong.empty())
poll(ctx, manager, registration)
def nextHeartbeatDirs(): Set[Uuid] =
- poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())))
- .data().cordonedLogDirs().asScala.toSet
- assertEquals(Set(), nextHeartbeatDirs())
+ nextRequest().data().cordonedLogDirs().asScala.toSet
+
+ def nextRequest(): BrokerHeartbeatRequest =
+ poll(ctx, manager, prepareResponse[BrokerHeartbeatRequest](ctx, new
BrokerHeartbeatResponse(new BrokerHeartbeatResponseData().setIsCaughtUp(true))))
+
+ while (!manager.initialCatchUpFuture().isDone) {
+ nextRequest()
+ }
+ assertNull(nextRequest().data().cordonedLogDirs())
val dir1 = Uuid.randomUuid()
val dir2 = Uuid.randomUuid()
manager.propagateDirectoryCordoned(util.Set.of(dir1))
- assertEquals(Set(), nextHeartbeatDirs())
+ assertNull(nextRequest().data().cordonedLogDirs())
enabled = true
manager.propagateDirectoryCordoned(util.Set.of(dir1))
assertEquals(Set(dir1), nextHeartbeatDirs())
manager.propagateDirectoryCordoned(util.Set.of(dir2))
- assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
- manager.propagateDirectoryUncordoned(util.Set.of(dir1))
assertEquals(Set(dir2), nextHeartbeatDirs())
- manager.propagateDirectoryUncordoned(util.Set.of(dir2))
+ manager.propagateDirectoryCordoned(util.Set.of(dir1, dir2))
+ assertEquals(Set(dir1, dir2), nextHeartbeatDirs())
+ manager.propagateDirectoryCordoned(util.Set.of())
assertEquals(Set(), nextHeartbeatDirs())
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index d3e3d548029..be5b3a837bc 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -744,7 +744,8 @@ class ControllerApisTest {
assertEquals(expectedResponse,
controllerApis.createTopics(ANONYMOUS_CONTEXT, request,
hasClusterAuth = false,
_ => Set("baz", "indescribable"),
- _ => Set("baz")).get().topics().asScala.toSet)
+ _ => Set("baz"),
+ forwarded = false).get().topics().asScala.toSet)
}
@ParameterizedTest(name = "testCreateTopicsMutationQuota with throttle: {0}")
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index c2fb3f935e3..e56c6fbef50 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1065,7 +1065,6 @@ class DynamicBrokerConfigTest {
assertTrue(ctx.config.cordonedLogDirs.isEmpty)
val logDirs = ctx.config.logDirs()
verify(ctx.directoryEventHandler, never()).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
// Cordoning 1 new log dir, so 1 new handleCordoned invocation
val props = new Properties()
@@ -1073,42 +1072,42 @@ class DynamicBrokerConfigTest {
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
// When using *, no other entries must be specified, so no new invocations
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*,/invalid/log/dir")
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
// Invalid log dir, so no new invocations
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "/invalid/log/dir")
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(util.List.of(logDirs.get(0)), ctx.config.cordonedLogDirs)
verify(ctx.directoryEventHandler, times(1)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, times(0)).handleUncordoned(anySet)
// * cordons the 2nd log dir, so 1 new handleCordoned invocation
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(logDirs, ctx.config.cordonedLogDirs)
verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, never()).handleUncordoned(anySet)
- // clearing all cordoned log dirs, so 1 new handleUncordoned invocation
+ // same value so no new invocations
+ props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "*")
+ ctx.config.dynamicConfig.updateBrokerConfig(0, props)
+ assertEquals(logDirs, ctx.config.cordonedLogDirs)
+ verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
+
+ // clearing all cordoned log dirs, so 1 new handleCordoned invocation
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, "")
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertTrue(ctx.config.cordonedLogDirs.isEmpty)
- verify(ctx.directoryEventHandler, times(2)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+ verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
// explicitly listing every log dir cordons all of them, so 1 new handleCordoned invocation
props.put(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, String.join(",",
logDirs))
ctx.config.dynamicConfig.updateBrokerConfig(0, props)
assertEquals(logDirs, ctx.config.cordonedLogDirs)
- verify(ctx.directoryEventHandler, times(3)).handleCordoned(anySet)
- verify(ctx.directoryEventHandler, times(1)).handleUncordoned(anySet)
+ verify(ctx.directoryEventHandler, times(4)).handleCordoned(anySet)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
index 42a5e8accc9..676dc6f21da 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicationQuotasTest.scala
@@ -120,6 +120,7 @@ class ReplicationQuotasTest extends QuorumTestHarness {
Map(new ConfigResource(BROKER, String.valueOf(brokerId)) -> Map(
QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG -> entry,
QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG ->
entry).asJava).asJava,
+ false,
false
).get()
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 4fc14209e34..38d4c75b4f5 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -432,9 +432,6 @@ public class ClusterControlManager {
if
(featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) {
record.setLogDirs(request.logDirs());
}
- if
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
- record.setCordonedLogDirs(request.cordonedLogDirs());
- }
if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
@@ -555,22 +552,6 @@ public class ClusterControlManager {
return OptionalLong.empty();
}
- public void updateCordonedLogDirs(int brokerId, List<Uuid>
cordonedLogDirs) {
- brokerRegistrations.compute(brokerId,
- (k, brokerRegistration) -> new BrokerRegistration.Builder().
- setId(brokerId).
- setEpoch(brokerRegistration.epoch()).
- setIncarnationId(brokerRegistration.incarnationId()).
- setListeners(brokerRegistration.listeners()).
-
setSupportedFeatures(brokerRegistration.supportedFeatures()).
- setRack(brokerRegistration.rack()).
- setFenced(brokerRegistration.fenced()).
-
setInControlledShutdown(brokerRegistration.inControlledShutdown()).
- setDirectories(brokerRegistration.directories()).
- setCordonedDirectories(cordonedLogDirs).
- build());
- }
-
public void replay(RegisterBrokerRecord record, long offset) {
registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
@@ -663,7 +644,7 @@ public class ClusterControlManager {
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
"value for inControlledShutdown field: %x", record,
record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange =
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
- Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
+ Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs());
replayRegistrationChange(
record,
record.brokerId(),
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index 013a04c3bac..89793b70103 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -45,6 +45,7 @@ import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -54,6 +55,7 @@ import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
+import java.util.regex.Pattern;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
@@ -67,6 +69,7 @@ import static
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_
public class ConfigurationControlManager {
public static final ConfigResource DEFAULT_NODE = new
ConfigResource(Type.BROKER, "");
+ private static final Pattern COMMA_WITH_WHITESPACE =
Pattern.compile("\\s*,\\s*");
private final Logger log;
private final SnapshotRegistry snapshotRegistry;
@@ -210,7 +213,8 @@ public class ConfigurationControlManager {
*/
ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
- boolean newlyCreatedResource
+ boolean newlyCreatedResource,
+ boolean forwarded
) {
List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -220,7 +224,8 @@ public class ConfigurationControlManager {
ApiError apiError =
incrementalAlterConfigResource(resourceEntry.getKey(),
resourceEntry.getValue(),
newlyCreatedResource,
- outputRecords);
+ outputRecords,
+ forwarded);
outputResults.put(resourceEntry.getKey(), apiError);
}
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
@@ -252,14 +257,16 @@ public class ConfigurationControlManager {
ControllerResult<ApiError> incrementalAlterConfig(
ConfigResource configResource,
Map<String, Entry<OpType, String>> keyToOps,
- boolean newlyCreatedResource
+ boolean newlyCreatedResource,
+ boolean forwarded
) {
List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
ApiError apiError = incrementalAlterConfigResource(configResource,
keyToOps,
newlyCreatedResource,
- outputRecords);
+ outputRecords,
+ forwarded);
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
return ControllerResult.atomicOf(outputRecords, apiError);
@@ -269,7 +276,8 @@ public class ConfigurationControlManager {
ConfigResource configResource,
Map<String, Entry<OpType, String>> keysToOps,
boolean newlyCreatedResource,
- List<ApiMessageAndVersion> outputRecords
+ List<ApiMessageAndVersion> outputRecords,
+ boolean forwarded
) {
List<ApiMessageAndVersion> newRecords = new ArrayList<>();
for (Entry<String, Entry<OpType, String>> keysToOpsEntry :
keysToOps.entrySet()) {
@@ -321,7 +329,7 @@ public class ConfigurationControlManager {
setValue(newValue), (short) 0));
}
}
- ApiError error = validateAlterConfig(configResource, newRecords,
List.of(), newlyCreatedResource);
+ ApiError error = validateAlterConfig(configResource, newRecords,
List.of(), newlyCreatedResource, forwarded);
if (error.isFailure()) {
return error;
}
@@ -333,7 +341,8 @@ public class ConfigurationControlManager {
ConfigResource configResource,
List<ApiMessageAndVersion> recordsExplicitlyAltered,
List<ApiMessageAndVersion> recordsImplicitlyDeleted,
- boolean newlyCreatedResource
+ boolean newlyCreatedResource,
+ boolean forwarded
) {
Map<String, String> allConfigs = new HashMap<>();
Map<String, String> existingConfigsMap = new HashMap<>();
@@ -349,8 +358,10 @@ public class ConfigurationControlManager {
return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
} else if (isDisallowedClusterMinIsrTransition(configRecord)) {
return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
- } else if (isCordonedLogDirsDisallowed(configRecord)) {
- return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+ } else if (isCordonedLogDirsDisabled(configRecord)) {
+ return DISABLED_CORDONED_LOG_DIRS_ERROR;
+ } else if (isCordonedLogDirsInvalid(configRecord,
existingConfigsMap.get(CORDONED_LOG_DIRS_CONFIG), forwarded)) {
+ return INVALID_CORDONED_LOG_DIRS_ERROR;
} else if (configRecord.value() == null) {
allConfigs.remove(configRecord.name());
} else if (configRecord.value().length() > Short.MAX_VALUE) {
@@ -368,8 +379,10 @@ public class ConfigurationControlManager {
return DISALLOWED_BROKER_MIN_ISR_TRANSITION_ERROR;
} else if (isDisallowedClusterMinIsrTransition(configRecord)) {
return DISALLOWED_CLUSTER_MIN_ISR_REMOVAL_ERROR;
- } else if (isCordonedLogDirsDisallowed(configRecord)) {
- return DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+ } else if (isCordonedLogDirsDisabled(configRecord)) {
+ return DISABLED_CORDONED_LOG_DIRS_ERROR;
+ } else if (isCordonedLogDirsInvalid(configRecord,
existingConfigsMap.get(CORDONED_LOG_DIRS_CONFIG), forwarded)) {
+ return INVALID_CORDONED_LOG_DIRS_ERROR;
} else {
allConfigs.remove(configRecord.name());
}
@@ -407,10 +420,14 @@ public class ConfigurationControlManager {
new ApiError(INVALID_CONFIG, "The configuration value cannot be added
because " +
"it exceeds the maximum value size of " + Short.MAX_VALUE + "
bytes.");
- static final ApiError DISALLOWED_CORDONED_LOG_DIRS_ERROR =
+ static final ApiError DISABLED_CORDONED_LOG_DIRS_ERROR =
new ApiError(INVALID_CONFIG, "The " + CORDONED_LOG_DIRS_CONFIG + "
configuration value cannot be " +
"set because it requires metadata.version >= " +
MetadataVersion.IBP_4_3_IV0);
+ static final ApiError INVALID_CORDONED_LOG_DIRS_ERROR =
+ new ApiError(INVALID_CONFIG, "When updating " +
CORDONED_LOG_DIRS_CONFIG + " via controllers, " +
+ " the new value must be a subset of the current
configuration value.");
+
boolean isDisallowedBrokerMinIsrTransition(ConfigRecord configRecord) {
if (configRecord.name().equals(MIN_IN_SYNC_REPLICAS_CONFIG) &&
configRecord.resourceType() == BROKER.id() &&
@@ -422,7 +439,32 @@ public class ConfigurationControlManager {
return false;
}
- boolean isCordonedLogDirsDisallowed(ConfigRecord configRecord) {
+ /**
+ * Return true if the update to cordoned.log.dirs is invalid and must be rejected
+ *
+ * Updates to cordoned.log.dirs normally go through the concerned broker
which is able to validate the new value
+ * before forwarding the request to the controller.
+ * However, it's also possible to directly go to controllers, but since
controllers only have directory ids, they
+ * cannot fully validate updates (cordoned.log.dirs is a list of paths).
So if the request has not been forwarded
+ * by a broker, controllers can only accept updates that remove entries in
cordoned.log.dirs.
+ *
+ * @param configRecord The configuration record
+ * @param currentValue The current cordoned.log.dirs value
+ * @param forwarded True if the request has been forwarded by a broker
+ */
+ boolean isCordonedLogDirsInvalid(ConfigRecord configRecord, String
currentValue, boolean forwarded) {
+ if (!configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) ||
configRecord.resourceType() != BROKER.id() ||
+ forwarded || configRecord.value() == null ||
configRecord.value().trim().isEmpty()) {
+ return false;
+ }
+ List<String> currentDirs = currentValue == null
+ ? List.of()
+ : Arrays.asList(COMMA_WITH_WHITESPACE.split(currentValue.trim(),
-1));
+ List<String> newDirs =
Arrays.asList(COMMA_WITH_WHITESPACE.split(configRecord.value().trim(), -1));
+ return !currentDirs.containsAll(newDirs);
+ }
+
+ boolean isCordonedLogDirsDisabled(ConfigRecord configRecord) {
if (configRecord.name().equals(CORDONED_LOG_DIRS_CONFIG) &&
configRecord.resourceType() == BROKER.id()) {
return
!featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported();
@@ -449,11 +491,13 @@ public class ConfigurationControlManager {
*
* @param newConfigs The new configurations to install for each
resource.
* All existing configurations will be
overwritten.
+ * @param forwarded True if the request was forwarded.
* @return The result.
*/
ControllerResult<Map<ConfigResource, ApiError>> legacyAlterConfigs(
Map<ConfigResource, Map<String, String>> newConfigs,
- boolean newlyCreatedResource
+ boolean newlyCreatedResource,
+ boolean forwarded
) {
List<ApiMessageAndVersion> outputRecords =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -464,7 +508,8 @@ public class ConfigurationControlManager {
resourceEntry.getValue(),
newlyCreatedResource,
outputRecords,
- outputResults);
+ outputResults,
+ forwarded);
}
outputRecords.addAll(createClearElrRecordsAsNeeded(outputRecords));
return ControllerResult.atomicOf(outputRecords, outputResults);
@@ -474,7 +519,8 @@ public class ConfigurationControlManager {
Map<String, String> newConfigs,
boolean newlyCreatedResource,
List<ApiMessageAndVersion>
outputRecords,
- Map<ConfigResource, ApiError>
outputResults) {
+ Map<ConfigResource, ApiError>
outputResults,
+ boolean forwarded) {
List<ApiMessageAndVersion> recordsExplicitlyAltered = new
ArrayList<>();
Map<String, String> currentConfigs = configData.get(configResource);
if (currentConfigs == null) {
@@ -503,7 +549,7 @@ public class ConfigurationControlManager {
setValue(null), (short) 0));
}
}
- ApiError error = validateAlterConfig(configResource,
recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource);
+ ApiError error = validateAlterConfig(configResource,
recordsExplicitlyAltered, recordsImplicitlyDeleted, newlyCreatedResource,
forwarded);
if (error.isFailure()) {
outputResults.put(configResource, error);
return;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 6845e908e7a..28aa076291b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -136,13 +136,15 @@ public interface Controller extends AclMutator,
AutoCloseable {
* @param context The controller request context.
* @param request The CreateTopicsRequest data.
* @param describable The topics which we have DESCRIBE permission on.
+ * @param forwarded True if the request was forwarded.
*
* @return A future yielding the response.
*/
CompletableFuture<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
CreateTopicsRequestData request,
- Set<String> describable
+ Set<String> describable,
+ boolean forwarded
);
/**
@@ -252,13 +254,15 @@ public interface Controller extends AclMutator,
AutoCloseable {
* @param context The controller request context.
* @param configChanges The changes.
* @param validateOnly True if we should validate the changes but not
apply them.
+ * @param forwarded True if the request was forwarded.
*
* @return A future yielding a map from config resources to
error results.
*/
CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
ControllerRequestContext context,
Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType,
String>>> configChanges,
- boolean validateOnly
+ boolean validateOnly,
+ boolean forwarded
);
/**
@@ -293,13 +297,15 @@ public interface Controller extends AclMutator,
AutoCloseable {
* @param context The controller request context.
* @param newConfigs The new configuration maps to apply.
* @param validateOnly True if we should validate the changes but not
apply them.
+ * @param forwarded True if the request was forwarded.
*
* @return A future yielding a map from config resources to
error results.
*/
CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
ControllerRequestContext context,
Map<ConfigResource, Map<String, String>> newConfigs,
- boolean validateOnly
+ boolean validateOnly,
+ boolean forwarded
);
/**
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0f827d8551d..fcf182714e9 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1788,13 +1788,14 @@ public final class QuorumController implements
Controller {
@Override
public CompletableFuture<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
- CreateTopicsRequestData request, Set<String> describable
+ CreateTopicsRequestData request, Set<String> describable,
+ boolean forwarded
) {
if (request.topics().isEmpty()) {
return CompletableFuture.completedFuture(new
CreateTopicsResponseData());
}
return appendWriteEvent("createTopics", context.deadlineNs(),
- () -> replicationControl.createTopics(context, request,
describable));
+ () -> replicationControl.createTopics(context, request,
describable, forwarded));
}
@Override
@@ -1884,14 +1885,15 @@ public final class QuorumController implements
Controller {
public CompletableFuture<Map<ConfigResource, ApiError>>
incrementalAlterConfigs(
ControllerRequestContext context,
Map<ConfigResource, Map<String, Entry<OpType, String>>> configChanges,
- boolean validateOnly
+ boolean validateOnly,
+ boolean forwarded
) {
if (configChanges.isEmpty()) {
return CompletableFuture.completedFuture(Map.of());
}
return appendWriteEvent("incrementalAlterConfigs",
context.deadlineNs(), () -> {
ControllerResult<Map<ConfigResource, ApiError>> result =
- configurationControl.incrementalAlterConfigs(configChanges,
false);
+ configurationControl.incrementalAlterConfigs(configChanges,
false, forwarded);
if (validateOnly) {
return result.withoutRecords();
} else {
@@ -1929,14 +1931,16 @@ public final class QuorumController implements
Controller {
@Override
public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
ControllerRequestContext context,
- Map<ConfigResource, Map<String, String>> newConfigs, boolean
validateOnly
+ Map<ConfigResource, Map<String, String>> newConfigs,
+ boolean validateOnly,
+ boolean forwarded
) {
if (newConfigs.isEmpty()) {
return CompletableFuture.completedFuture(Map.of());
}
return appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), ()
-> {
ControllerResult<Map<ConfigResource, ApiError>> result =
- configurationControl.legacyAlterConfigs(newConfigs, false);
+ configurationControl.legacyAlterConfigs(newConfigs, false,
forwarded);
if (validateOnly) {
return result.withoutRecords();
} else {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 198e8b2f4ed..f2901778931 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -627,7 +627,8 @@ public class ReplicationControlManager {
ControllerResult<CreateTopicsResponseData> createTopics(
ControllerRequestContext context,
CreateTopicsRequestData request,
- Set<String> describable
+ Set<String> describable,
+ boolean forwarded
) {
Map<String, ApiError> topicErrors = new HashMap<>();
List<ApiMessageAndVersion> records =
BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP);
@@ -657,7 +658,7 @@ public class ReplicationControlManager {
List<ApiMessageAndVersion> configRecords;
if (keyToOps != null) {
ControllerResult<ApiError> configResult =
-
configurationControl.incrementalAlterConfig(configResource, keyToOps, true);
+
configurationControl.incrementalAlterConfig(configResource, keyToOps, true,
forwarded);
if (configResult.response().isFailure()) {
topicErrors.put(topic.name(), configResult.response());
continue;
@@ -1545,17 +1546,17 @@ public class ReplicationControlManager {
List<Uuid> cordonedDirs,
List<ApiMessageAndVersion> records
) {
+ // cordonedDirs is null until a broker has caught up with the latest
metadata, so just ignore
+ if (cordonedDirs == null) return;
BrokerRegistration registration =
clusterControl.registration(brokerId);
- List<Uuid> newCordonedDirs =
registration.directoryIntersection(cordonedDirs);
- if (!newCordonedDirs.isEmpty()) {
+ boolean cordonedDirsChanged =
registration.cordonedDirChanged(cordonedDirs);
+ if (cordonedDirsChanged) {
records.add(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord().
setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).
- setCordonedLogDirs(newCordonedDirs),
+ setCordonedLogDirs(cordonedDirs),
(short) 3));
if (log.isDebugEnabled()) {
- List<Uuid> newUncordonedDirs =
registration.directoryDifference(newCordonedDirs);
- log.debug("Directories {} in broker {} marked cordoned,
uncordoned directories: {}",
- newCordonedDirs, brokerId, newUncordonedDirs);
+ log.debug("Directories {} in broker {} marked cordoned",
cordonedDirs, brokerId);
}
}
}
@@ -1697,7 +1698,6 @@ public class ReplicationControlManager {
handleDirectoriesOffline(brokerId, brokerEpoch,
request.offlineLogDirs(), records);
}
if
(featureControl.metadataVersionOrThrow().isCordonedLogDirsSupported()) {
- clusterControl.updateCordonedLogDirs(brokerId,
request.cordonedLogDirs());
handleDirectoriesCordoned(brokerId, brokerEpoch,
request.cordonedLogDirs(), records);
}
boolean isCaughtUp = request.currentMetadataOffset() >=
registerBrokerRecordOffset;
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index bfdf9810a0b..1ab6e44423c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -142,7 +142,7 @@ public final class ClusterDelta {
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
"value for inControlledShutdown field: %d", record,
record.inControlledShutdown())));
Optional<List<Uuid>> directoriesChange =
Optional.ofNullable(record.logDirs()).filter(list -> !list.isEmpty());
- Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs()).filter(list -> !list.isEmpty());
+ Optional<List<Uuid>> cordonedDirectoriesChange =
Optional.ofNullable(record.cordonedLogDirs());
BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean(),
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 4dcb5ba1b36..0545869901c 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -67,7 +68,8 @@ public class BrokerRegistration {
this.inControlledShutdown = false;
this.isMigratingZkBroker = false;
this.directories = List.of();
- this.cordonedDirectories = List.of();
+ // This defaults to null indicating the broker has not yet sent
its cordoned log dirs.
+ this.cordonedDirectories = null;
}
public Builder setId(int id) {
@@ -196,7 +198,7 @@ public class BrokerRegistration {
directories = new ArrayList<>(directories);
directories.sort(Uuid::compareTo);
this.directories = Collections.unmodifiableList(directories);
- this.cordonedDirectories =
Collections.unmodifiableList(cordonedDirectories);
+ this.cordonedDirectories = cordonedDirectories == null ? null :
Collections.unmodifiableList(cordonedDirectories);
}
public static BrokerRegistration fromRecord(RegisterBrokerRecord record) {
@@ -282,7 +284,7 @@ public class BrokerRegistration {
}
public boolean hasUncordonedDirs() {
- if (directories.isEmpty()) return true;
+ if (directories.isEmpty() || cordonedDirectories == null) return true;
List<Uuid> dirs = new ArrayList<>(directories);
dirs.removeAll(cordonedDirectories);
return !dirs.isEmpty();
@@ -308,6 +310,15 @@ public class BrokerRegistration {
return results;
}
+ public boolean cordonedDirChanged(List<Uuid> otherDirectories) {
+ // Brokers only start sending their cordoned log dirs once they are
fully caught up with the metadata;
+ // until then cordonedDirectories defaults to null to indicate the
value is unknown
+ if (cordonedDirectories == null) return true;
+ Set<Uuid> cordonedDirs = Set.copyOf(cordonedDirectories);
+ Set<Uuid> otherDirs = Set.copyOf(otherDirectories);
+ return !cordonedDirs.equals(otherDirs);
+ }
+
public ApiMessageAndVersion toRecord(ImageWriterOptions options) {
RegisterBrokerRecord registrationRecord = new RegisterBrokerRecord().
setBrokerId(id).
@@ -331,7 +342,7 @@ public class BrokerRegistration {
options.handleLoss("the online log directories of one or more
brokers");
}
- if (cordonedDirectories.isEmpty() ||
options.metadataVersion().isCordonedLogDirsSupported()) {
+ if (cordonedDirectories == null ||
options.metadataVersion().isCordonedLogDirsSupported()) {
registrationRecord.setCordonedLogDirs(cordonedDirectories);
} else {
options.handleLoss("the cordoned log directories of one or more
brokers");
@@ -376,7 +387,7 @@ public class BrokerRegistration {
other.inControlledShutdown == inControlledShutdown &&
other.isMigratingZkBroker == isMigratingZkBroker &&
other.directories.equals(directories) &&
- other.cordonedDirectories.equals(cordonedDirectories);
+ Objects.equals(other.cordonedDirectories, cordonedDirectories);
}
@Override
@@ -416,7 +427,7 @@ public class BrokerRegistration {
if (newFenced == fenced
&& newInControlledShutdownChange == inControlledShutdown
&& newDirectories.equals(directories)
- && newCordonedDirectories.equals(cordonedDirectories))
+ && Objects.equals(newCordonedDirectories, cordonedDirectories))
return this;
return new BrokerRegistration(
diff --git
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
index 5329e53b2c7..61109f12913 100644
---
a/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
+++
b/metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json
@@ -33,7 +33,8 @@
"about": "0 if no change, 1 if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "2+",
"taggedVersions": "2+", "tag": 2,
"about": "Log directories configured in this broker which are available."
},
- { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "3+",
"taggedVersions": "3+", "tag": "3",
- "about": "Log directories that are cordoned." }
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "3+",
"taggedVersions": "3+", "tag": 3,
+ "nullableVersions": "3+", "default": "null",
+ "about": "Log directories that are cordoned or null if no change." }
]
}
diff --git
a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
index b7db680cbd3..12af50ce17d 100644
--- a/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
+++ b/metadata/src/main/resources/common/metadata/RegisterBrokerRecord.json
@@ -60,7 +60,8 @@
"about": "True if the broker is in controlled shutdown." },
{ "name": "LogDirs", "type": "[]uuid", "versions": "3+",
"taggedVersions": "3+", "tag": 0,
"about": "Log directories configured in this broker which are
available." },
- { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "4+",
"taggedVersions": "4+", "tag": "1",
- "about": "Log directories that are cordoned." }
+ { "name": "CordonedLogDirs", "type": "[]uuid", "versions": "4+",
"taggedVersions": "4+", "tag": 1,
+ "nullableVersions": "4+", "default": "null",
+ "about": "Log directories that are cordoned. This is initially null
before the broker starts sending the value via its heartbeats." }
]
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 502c3881e9f..a8e030e30c4 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -209,11 +209,14 @@ public class ClusterControlManagerTest {
assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
+ Uuid dir1 = Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ");
+ Uuid dir2 = Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg");
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).
setBrokerId(0).
setRack(null).
- setFenced(false);
+ setFenced(false).
+ setLogDirs(List.of(dir1, dir2));
brokerRecord.endPoints().add(new BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092).
@@ -241,6 +244,37 @@ public class ClusterControlManagerTest {
assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
+
+ // By default cordonedLogDirs is null and means no changes
+ assertEquals(null,
clusterControl.registration(0).cordonedDirectories());
+ registrationChangeRecord = new BrokerRegistrationChangeRecord().
+ setBrokerId(0).
+ setBrokerEpoch(100);
+ clusterControl.replay(registrationChangeRecord);
+ assertEquals(null,
clusterControl.registration(0).cordonedDirectories());
+
+ // Set a cordoned log dir
+ registrationChangeRecord = new BrokerRegistrationChangeRecord().
+ setBrokerId(0).
+ setBrokerEpoch(100).
+ setCordonedLogDirs(List.of(dir1));
+ clusterControl.replay(registrationChangeRecord);
+ assertEquals(List.of(dir1),
clusterControl.registration(0).cordonedDirectories());
+
+ // null cordonedLogDirs so no changes
+ registrationChangeRecord = new BrokerRegistrationChangeRecord().
+ setBrokerId(0).
+ setBrokerEpoch(100);
+ clusterControl.replay(registrationChangeRecord);
+ assertEquals(List.of(dir1),
clusterControl.registration(0).cordonedDirectories());
+
+ // Clears the cordoned dir
+ registrationChangeRecord = new BrokerRegistrationChangeRecord().
+ setBrokerId(0).
+ setBrokerEpoch(100).
+ setCordonedLogDirs(List.of());
+ clusterControl.replay(registrationChangeRecord);
+ assertEquals(List.of(),
clusterControl.registration(0).cordonedDirectories());
}
@Test
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 03173552e90..b1e5f4c8872 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -42,10 +42,13 @@ import
org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -55,6 +58,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
+import java.util.stream.Stream;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.DELETE;
@@ -63,8 +67,11 @@ import static
org.apache.kafka.clients.admin.AlterConfigOp.OpType.SUBTRACT;
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
import static
org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
-import static
org.apache.kafka.controller.ConfigurationControlManager.DISALLOWED_CORDONED_LOG_DIRS_ERROR;
+import static
org.apache.kafka.controller.ConfigurationControlManager.DISABLED_CORDONED_LOG_DIRS_ERROR;
+import static
org.apache.kafka.controller.ConfigurationControlManager.INVALID_CORDONED_LOG_DIRS_ERROR;
import static
org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
+import static
org.apache.kafka.server.config.ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG;
+import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIRS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -168,7 +175,8 @@ public class ConfigurationControlManagerTest {
entry("baz", entry(SUBTRACT, "abc")),
entry("quux", entry(SET, "abc")))),
entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))),
- true);
+ true,
+ false);
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -185,7 +193,7 @@ public class ConfigurationControlManagerTest {
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(
entry("abc", entry(DELETE, "xyz"))))),
- true));
+ true, false));
}
@Test
@@ -197,7 +205,7 @@ public class ConfigurationControlManagerTest {
Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
toMap(entry("abc", entry(APPEND, "123")));
ControllerResult<ApiError> result = manager.
- incrementalAlterConfig(MYTOPIC, keyToOps, true);
+ incrementalAlterConfig(MYTOPIC, keyToOps, true, false);
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -210,13 +218,13 @@ public class ConfigurationControlManagerTest {
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue(null),
CONFIG_RECORD.highestSupportedVersion())),
ApiError.NONE),
- manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc",
entry(DELETE, "xyz"))), true));
+ manager.incrementalAlterConfig(MYTOPIC, toMap(entry("abc",
entry(DELETE, "xyz"))), true, false));
// The configuration value exceeding the maximum size is not allowed
to be added.
String largeValue = new String(new char[Short.MAX_VALUE - APPEND.id()
- 1]);
Map<String, Entry<AlterConfigOp.OpType, String>> largeValueOfOps =
toMap(entry("abc", entry(APPEND, largeValue)));
- ControllerResult<ApiError> invalidConfigValueResult =
manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true);
+ ControllerResult<ApiError> invalidConfigValueResult =
manager.incrementalAlterConfig(MYTOPIC, largeValueOfOps, true, false);
assertEquals(Errors.INVALID_CONFIG,
invalidConfigValueResult.response().error());
assertEquals("The configuration value cannot be added because it
exceeds the maximum value size of " + Short.MAX_VALUE + " bytes.",
invalidConfigValueResult.response().message());
@@ -230,7 +238,7 @@ public class ConfigurationControlManagerTest {
build();
ControllerResult<Map<ConfigResource, ApiError>> result = manager.
- incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(APPEND, "123,456,789"))))), true);
+ incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(APPEND, "123,456,789"))))), true, false);
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
@@ -241,7 +249,7 @@ public class ConfigurationControlManagerTest {
// It's ok for the appended value to be already present
result = manager
- .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(APPEND, "123,456"))))), true);
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(APPEND, "123,456"))))), true, false);
assertEquals(
ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC,
ApiError.NONE))),
result
@@ -249,7 +257,7 @@ public class ConfigurationControlManagerTest {
RecordTestUtils.replayAll(manager, result.records());
result = manager
- .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(SUBTRACT, "123,456"))))), true);
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(SUBTRACT, "123,456"))))), true, false);
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
new
ConfigRecord().setResourceType(TOPIC.id()).setResourceName("mytopic").
setName("abc").setValue("789"),
CONFIG_RECORD.highestSupportedVersion())),
@@ -259,7 +267,7 @@ public class ConfigurationControlManagerTest {
// It's ok for the deleted value not to be present
result = manager
- .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(SUBTRACT, "123456"))))), true);
+ .incrementalAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("abc",
entry(SUBTRACT, "123456"))))), true, false);
assertEquals(
ControllerResult.atomicOf(List.of(), toMap(entry(MYTOPIC,
ApiError.NONE))),
result
@@ -282,6 +290,7 @@ public class ConfigurationControlManagerTest {
incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
entry("quux", entry(SET, "1")))),
entry(existingTopic, toMap(entry("def", entry(SET,
"newVal"))))),
+ false,
false);
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
@@ -366,7 +375,8 @@ public class ConfigurationControlManagerTest {
entry("quux", entry(SET, "456")),
entry("broker.config.to.remove", entry(DELETE, null))
))),
- true));
+ true,
+ false));
}
private static class CheckForNullValuesPolicy implements AlterConfigPolicy
{
@@ -408,7 +418,7 @@ public class ConfigurationControlManagerTest {
expectedRecords1, toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(
toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def",
"901")))),
- true));
+ true, false));
for (ApiMessageAndVersion message : expectedRecords1) {
manager.replay((ConfigRecord) message.message());
}
@@ -422,7 +432,7 @@ public class ConfigurationControlManagerTest {
CONFIG_RECORD.highestSupportedVersion())),
toMap(entry(MYTOPIC, ApiError.NONE))),
manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def",
"901")))),
- true));
+ true, false));
}
@ParameterizedTest
@@ -438,7 +448,7 @@ public class ConfigurationControlManagerTest {
Map<String, Entry<AlterConfigOp.OpType, String>> keyToOps =
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET,
"3")));
ConfigResource brokerConfigResource = new
ConfigResource(ConfigResource.Type.BROKER, "1");
- ControllerResult<ApiError> result =
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true);
+ ControllerResult<ApiError> result =
manager.incrementalAlterConfig(brokerConfigResource, keyToOps, true, false);
assertEquals(Set.of(), manager.brokersWithConfigs());
assertEquals(ControllerResult.atomicOf(List.of(new
ApiMessageAndVersion(
@@ -503,7 +513,7 @@ public class ConfigurationControlManagerTest {
result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, "1"),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
removal ? entry(DELETE, null) : entry(SET, "3"))),
- true);
+ true, false);
assertEquals(Errors.INVALID_CONFIG, result.response().error());
assertEquals("Broker-level min.insync.replicas cannot be altered while
ELR is enabled.",
result.response().message());
@@ -512,7 +522,7 @@ public class ConfigurationControlManagerTest {
result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, ""),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
removal ? entry(DELETE, null) : entry(SET, "3"))),
- true);
+ true, false);
if (removal) {
assertEquals(Errors.INVALID_CONFIG, result.response().error());
assertEquals("Cluster-level min.insync.replicas cannot be removed
while ELR is enabled.",
@@ -574,20 +584,28 @@ public class ConfigurationControlManagerTest {
build();
ControllerResult<ApiError> result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, "1"),
- toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
entry(SET, "*"))),
- true);
+ toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
entry(SET, ""))),
+ true, false);
+ assertEquals(enabled ? ApiError.NONE :
DISABLED_CORDONED_LOG_DIRS_ERROR, result.response());
- assertEquals(enabled ? ApiError.NONE :
DISALLOWED_CORDONED_LOG_DIRS_ERROR, result.response());
+ result = manager.incrementalAlterConfig(new
ConfigResource(ConfigResource.Type.BROKER, "1"),
+ toMap(entry(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG,
entry(SET, "*"))),
+ true, false);
+ assertEquals(enabled ? INVALID_CORDONED_LOG_DIRS_ERROR :
DISABLED_CORDONED_LOG_DIRS_ERROR, result.response());
}
- private FeatureControlManager createFeatureControlManager() {
+ private FeatureControlManager createFeatureControlManager(short level) {
FeatureControlManager featureControlManager = new
FeatureControlManager.Builder().build();
featureControlManager.replay(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel(MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+ setName(MetadataVersion.FEATURE_NAME).
+ setFeatureLevel(level));
return featureControlManager;
}
+ private FeatureControlManager createFeatureControlManager() {
+ return
createFeatureControlManager(MetadataVersion.LATEST_PRODUCTION.featureLevel());
+ }
+
@Test
public void testValidateAlterConfigWithInvalidExistingConfigs() {
Set<String> validConfigs = Set.of("abc", "def");
@@ -613,6 +631,7 @@ public class ConfigurationControlManagerTest {
ControllerResult<ApiError> result = manager.incrementalAlterConfig(
MYTOPIC,
toMap(entry("def", entry(SET, "newValue"))),
+ false,
false);
assertEquals(ApiError.NONE, result.response());
@@ -646,4 +665,69 @@ public class ConfigurationControlManagerTest {
assertTrue(configs.containsKey("def"));
assertFalse(configs.containsKey("invalid.config"), "Invalid config
should not be in configData");
}
+
+ @ParameterizedTest
+ @MethodSource("arguments")
+ public void testIsCordonedLogDirsDisabled(boolean expected, short level) {
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA).
+ setFeatureControl(createFeatureControlManager(level)).
+ build();
+
+ ConfigRecord cordonedConfig = new ConfigRecord().
+ setResourceType(BROKER.id()).setResourceName("0").
+ setName(CORDONED_LOG_DIRS_CONFIG);
+ ConfigRecord otherConfigConfig = new ConfigRecord().
+ setResourceType(BROKER.id()).setResourceName("0").
+ setName(LOG_DIRS_CONFIG);
+
+ assertEquals(expected,
manager.isCordonedLogDirsDisabled(cordonedConfig));
+ assertFalse(manager.isCordonedLogDirsDisabled(otherConfigConfig));
+ }
+
+ public static Stream<Arguments> arguments() {
+ return Stream.of(
+ Arguments.of(false,
MetadataVersion.latestProduction().featureLevel()),
+ Arguments.of(true, MetadataVersion.IBP_4_2_IV1.featureLevel())
+ );
+ }
+
+ @Test
+ public void testIsCordonedLogDirsInvalid() {
+ ConfigurationControlManager manager = new
ConfigurationControlManager.Builder().
+ setKafkaConfigSchema(SCHEMA).
+ setFeatureControl(createFeatureControlManager()).
+ build();
+
+ ConfigRecord cr = new ConfigRecord().
+ setResourceType(BROKER.id()).setResourceName("0").
+ setName(CORDONED_LOG_DIRS_CONFIG);
+
+ // If the new value is null, empty, or blank, the update is always
allowed
+ for (String value : Arrays.asList("", " ", null)) {
+ cr.setValue(value);
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, null, false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, null, true));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "some/value",
false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "some/value",
true));
+ }
+
+ // If the new value is equal to, or a subset of, the current value, the
update is always allowed
+ cr.setValue("dir1");
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1", false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1", true));
+ for (String value : Arrays.asList("dir1", "dir2", "dir1,dir2",
"dir2,dir1")) {
+ cr.setValue(value);
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1,dir2",
false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir1,dir2",
true));
+ }
+
+ // If the new value is not a subset of the current value, the update is
only allowed if the request is forwarded
+ assertTrue(manager.isCordonedLogDirsInvalid(cr, "dir2", false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "dir2", true));
+ assertTrue(manager.isCordonedLogDirsInvalid(cr, "", false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, "", true));
+ assertTrue(manager.isCordonedLogDirsInvalid(cr, null, false));
+ assertFalse(manager.isCordonedLogDirsInvalid(cr, null, true));
+ }
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
index 0c094f19ca1..06cdf288ca7 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -201,7 +201,7 @@ public class QuorumControllerIntegrationTestUtils {
setReplicationFactor((short) replicationFactor));
}
CreateTopicsResponseData response =
- controller.createTopics(ANONYMOUS_CONTEXT, request,
describable).get();
+ controller.createTopics(ANONYMOUS_CONTEXT, request, describable,
false).get();
for (int i = 0; i < numTopics; i++) {
CreatableTopicResult result = response.topics().find(prefix + i);
if (result.errorCode() != Errors.TOPIC_ALREADY_EXISTS.code()) {
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 6a8618708d8..e428f2b9b90 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -200,14 +200,14 @@ public class QuorumControllerTest {
private void testConfigurationOperations(QuorumController controller)
throws Throwable {
assertEquals(Map.of(BROKER0, ApiError.NONE),
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
- BROKER0, Map.of("baz", entry(SET, "123"))), true).get());
+ BROKER0, Map.of("baz", entry(SET, "123"))), true,
false).get());
assertEquals(Map.of(BROKER0,
new ResultOrError<>(Map.of())),
controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
BROKER0, List.of())).get());
assertEquals(Map.of(BROKER0, ApiError.NONE),
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
- BROKER0, Map.of("baz", entry(SET, "123"))), false).get());
+ BROKER0, Map.of("baz", entry(SET, "123"))), false,
false).get());
assertEquals(Map.of(BROKER0, new ResultOrError<>(Map.of("baz",
"123"))),
controller.describeConfigs(ANONYMOUS_CONTEXT, Map.of(
BROKER0, List.of())).get());
@@ -245,7 +245,7 @@ public class QuorumControllerTest {
clientEnv.raftClients().forEach(m -> m.setMaxReadOffset(1L));
CompletableFuture<Map<ConfigResource, ApiError>> future1 =
controller.incrementalAlterConfigs(ANONYMOUS_CONTEXT, Map.of(
- BROKER0, Map.of("baz", entry(SET, "123"))), false);
+ BROKER0, Map.of("baz", entry(SET, "123"))), false, false);
assertFalse(future1.isDone());
assertEquals(Map.of(BROKER0,
new ResultOrError<>(Map.of())),
@@ -302,7 +302,7 @@ public class QuorumControllerTest {
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
- Set.of("foo")).get();
+ Set.of("foo"), false).get();
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
@@ -423,7 +423,7 @@ public class QuorumControllerTest {
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
- Set.of("foo")).get();
+ Set.of("foo"), false).get();
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
ConfigRecord configRecord = new ConfigRecord()
@@ -562,7 +562,7 @@ public class QuorumControllerTest {
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
- Set.of("foo")).get();
+ Set.of("foo"), false).get();
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
@@ -659,7 +659,7 @@ public class QuorumControllerTest {
).iterator()));
CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
ANONYMOUS_CONTEXT, createTopicsRequestData,
- Set.of("foo", "bar")).get();
+ Set.of("foo", "bar"), false).get();
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("bar").errorCode()));
Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
@@ -708,7 +708,7 @@ public class QuorumControllerTest {
// First, decrease the min ISR config to 1. This should clear the
ELR fields.
ControllerResult<Map<ConfigResource, ApiError>> result =
active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(TOPIC, "foo"),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
- true);
+ true, false);
assertEquals(2, result.records().size(),
result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(),
List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(),
List.of(result.records().get(1)));
@@ -724,7 +724,7 @@ public class QuorumControllerTest {
result =
active.configurationControl().incrementalAlterConfigs(toMap(
entry(new ConfigResource(BROKER, ""),
toMap(entry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, entry(SET, "1"))))),
- true);
+ true, false);
assertEquals(2, result.records().size(),
result.records().toString());
RecordTestUtils.replayAll(active.configurationControl(),
List.of(result.records().get(0)));
RecordTestUtils.replayAll(active.replicationControl(),
List.of(result.records().get(1)));
@@ -783,7 +783,7 @@ public class QuorumControllerTest {
new
CreatableTopic().setName("foo").setNumPartitions(numberOfPartitions).
setReplicationFactor(replicationFactor)).iterator()));
CreateTopicsResponseData createTopicsResponseData =
active.createTopics(
- ANONYMOUS_CONTEXT, createTopicsRequestData,
Set.of("foo")).get();
+ ANONYMOUS_CONTEXT, createTopicsRequestData, Set.of("foo"),
false).get();
assertEquals(Errors.NONE,
Errors.forCode(createTopicsResponseData.topics().find("foo").errorCode()));
Uuid topicIdFoo =
createTopicsResponseData.topics().find("foo").topicId();
@@ -1009,18 +1009,18 @@ public class QuorumControllerTest {
setReplicationFactor((short) 1)).iterator()));
assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(),
active.createTopics(
ANONYMOUS_CONTEXT,
- createTopicsRequestData, Set.of("foo")).get().
+ createTopicsRequestData, Set.of("foo"), false).get().
topics().find("foo").errorCode());
assertEquals("Unable to replicate the partition 1 time(s): All
brokers " +
"are currently fenced, or have all their log directories
cordoned.", active.createTopics(ANONYMOUS_CONTEXT,
- createTopicsRequestData, Set.of("foo")).
+ createTopicsRequestData, Set.of("foo"), false).
get().topics().find("foo").errorMessage());
assertEquals(new BrokerHeartbeatReply(true, false, false, false),
active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new
BrokerHeartbeatRequestData().
setWantFence(false).setBrokerEpoch(6L).setBrokerId(0).
setCurrentMetadataOffset(100000L)).get());
assertEquals(Errors.NONE.code(),
active.createTopics(ANONYMOUS_CONTEXT,
- createTopicsRequestData, Set.of("foo")).
+ createTopicsRequestData, Set.of("foo"), false).
get().topics().find("foo").errorCode());
CompletableFuture<TopicIdPartition> topicPartitionFuture =
active.appendReadEvent(
"debugGetPartition", OptionalLong.empty(), () -> {
@@ -1125,7 +1125,7 @@ public class QuorumControllerTest {
setPartitionIndex(1).
setBrokerIds(List.of(1, 2, 0))).
iterator()))).iterator())),
- Set.of("foo")).get();
+ Set.of("foo"), false).get();
fooId = fooData.topics().find("foo").topicId();
active.allocateProducerIds(ANONYMOUS_CONTEXT,
new
AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
@@ -1281,7 +1281,7 @@ public class QuorumControllerTest {
controller.createTopics(context0, new
CreateTopicsRequestData().setTimeoutMs(0).
setTopics(new CreatableTopicCollection(Set.of(
new CreatableTopic().setName("foo")).iterator())),
- Set.of());
+ Set.of(), false);
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(context0, List.of(Uuid.ZERO_UUID));
CompletableFuture<Map<String, ResultOrError<Uuid>>>
findTopicIdsFuture =
@@ -1338,7 +1338,7 @@ public class QuorumControllerTest {
CountDownLatch countDownLatch = pause(controller);
CompletableFuture<CreateTopicsResponseData> createFuture =
controller.createTopics(ANONYMOUS_CONTEXT, new
CreateTopicsRequestData().
- setTimeoutMs(120000), Set.of());
+ setTimeoutMs(120000), Set.of(), false);
CompletableFuture<Map<Uuid, ApiError>> deleteFuture =
controller.deleteTopics(ANONYMOUS_CONTEXT, List.of());
CompletableFuture<Map<String, ResultOrError<Uuid>>>
findTopicIdsFuture =
@@ -1380,7 +1380,7 @@ public class QuorumControllerTest {
new CreatableTopic().setName("foo").
setReplicationFactor((short) 3).
setNumPartitions(1)).iterator())),
- Set.of("foo")).get();
+ Set.of("foo"), false).get();
ConfigResourceExistenceChecker checker =
active.new ConfigResourceExistenceChecker();
// A ConfigResource with type=BROKER and name=(empty string)
represents
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 9b97d8eeb54..274b2692136 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -293,7 +293,7 @@ public class ReplicationControlManagerTest {
request.topics().add(topic);
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of(name));
+ replicationControl.createTopics(requestContext, request,
Set.of(name), false);
CreatableTopicResult topicResult =
result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -332,7 +332,7 @@ public class ReplicationControlManagerTest {
request.topics().add(topic);
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of(name));
+ replicationControl.createTopics(requestContext, request,
Set.of(name), false);
CreatableTopicResult topicResult =
result.response().topics().find(name);
assertNotNull(topicResult);
assertEquals(expectedErrorCode, topicResult.errorCode());
@@ -598,7 +598,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
PolicyViolationException error = assertThrows(
PolicyViolationException.class,
- () -> replicationControl.createTopics(requestContext, request,
Set.of("foo", "bar", "baz")));
+ () -> replicationControl.createTopics(requestContext, request,
Set.of("foo", "bar", "baz"), false));
assertEquals(error.getMessage(), "Excessively large number of
partitions per request.");
}
@@ -631,7 +631,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -644,7 +644,7 @@ public class ReplicationControlManagerTest {
ctx.inControlledShutdownBrokers(0);
ControllerResult<CreateTopicsResponseData> result2 =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse2 = new
CreateTopicsResponseData();
expectedResponse2.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
@@ -656,7 +656,7 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1, 2);
ControllerResult<CreateTopicsResponseData> result3 =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse3 = new
CreateTopicsResponseData();
expectedResponse3.topics().add(new
CreatableTopicResult().setName("foo").
setNumPartitions(1).setReplicationFactor((short) 3).
@@ -674,7 +674,7 @@ public class ReplicationControlManagerTest {
replicationControl.getPartition(
((TopicRecord) result3.records().get(0).message()).topicId(),
0));
ControllerResult<CreateTopicsResponseData> result4 =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse4 = new
CreateTopicsResponseData();
expectedResponse4.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
@@ -694,7 +694,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
setErrorCode(THROTTLING_QUOTA_EXCEEDED.code()).
@@ -717,7 +717,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -770,7 +770,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result1 =
- replicationControl.createTopics(requestContext, request1,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request1,
Set.of("foo"), false);
assertEquals((short) 0,
result1.response().topics().find("foo").errorCode());
List<ApiMessageAndVersion> records1 = result1.records();
@@ -803,7 +803,7 @@ public class ReplicationControlManagerTest {
.setConfigs(invalidConfigs));
ControllerResult<CreateTopicsResponseData> result2 =
- replicationControl.createTopics(requestContext, request2,
Set.of("bar"));
+ replicationControl.createTopics(requestContext, request2,
Set.of("bar"), false);
assertEquals(Errors.INVALID_CONFIG.code(),
result2.response().topics().find("bar").errorCode());
assertEquals(
"Null value not supported for topic configs: foo",
@@ -816,7 +816,7 @@ public class ReplicationControlManagerTest {
.setConfigs(validConfigs));
ControllerResult<CreateTopicsResponseData> result3 =
- replicationControl.createTopics(requestContext, request3,
Set.of("baz"));
+ replicationControl.createTopics(requestContext, request3,
Set.of("baz"), false);
assertEquals(INVALID_REPLICATION_FACTOR.code(),
result3.response().topics().find("baz").errorCode());
assertEquals(List.of(), result3.records());
@@ -835,7 +835,7 @@ public class ReplicationControlManagerTest {
request4Topics.add(batchedTopic1);
request4Topics.add(batchedTopic2);
ControllerResult<CreateTopicsResponseData> result4 =
- replicationControl.createTopics(requestContext, request4,
request4Topics);
+ replicationControl.createTopics(requestContext, request4,
request4Topics, false);
assertEquals(Errors.NONE.code(),
result4.response().topics().find(batchedTopic1).errorCode());
assertEquals(INVALID_REPLICATION_FACTOR.code(),
result4.response().topics().find(batchedTopic2).errorCode());
@@ -867,7 +867,7 @@ public class ReplicationControlManagerTest {
anonymousContextWithMutationQuotaExceededFor(ApiKeys.CREATE_TOPICS) :
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- ctx.replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ ctx.replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
assertEquals(0, result.records().size());
CreatableTopicResult topicResult =
result.response().topics().find("foo");
if (mutationQuotaExceeded) {
@@ -887,7 +887,7 @@ public class ReplicationControlManagerTest {
setNumPartitions(1).setReplicationFactor((short) 4));
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- ctx.replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ ctx.replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
assertEquals(0, result.records().size());
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -1493,7 +1493,7 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1);
ControllerRequestContext createTopicsRequestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
- replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"));
+ replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"), false);
CreateTopicsResponseData expectedResponse = new
CreateTopicsResponseData();
Uuid topicId = createResult.response().topics().find("foo").topicId();
expectedResponse.topics().add(new
CreatableTopicResult().setName("foo").
@@ -1562,7 +1562,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext createTopicsRequestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
- replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"));
+ replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"), false);
CreatableTopicResult createdTopic =
createResult.response().topics().find("foo");
assertEquals(NONE.code(), createdTopic.errorCode());
ctx.replay(createResult.records());
@@ -1596,7 +1596,7 @@ public class ReplicationControlManagerTest {
ctx.unfenceBrokers(0, 1, 3);
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.
- createTopics(requestContext, request, Set.of("foo", "bar", "quux",
"foo2"));
+ createTopics(requestContext, request, Set.of("foo", "bar", "quux",
"foo2"), false);
ctx.replay(createTopicResult.records());
List<CreatePartitionsTopic> topics = new ArrayList<>();
topics.add(new CreatePartitionsTopic().
@@ -1680,7 +1680,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext createTopicsRequestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createResult =
- replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"));
+ replicationControl.createTopics(createTopicsRequestContext,
request, Set.of("foo"), false);
CreatableTopicResult createdTopic =
createResult.response().topics().find("foo");
assertEquals(NONE.code(), createdTopic.errorCode());
ctx.replay(createResult.records());
@@ -1720,7 +1720,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> createTopicResult =
replicationControl.
- createTopics(requestContext, request, Set.of("foo"));
+ createTopics(requestContext, request, Set.of("foo"), false);
ctx.replay(createTopicResult.records());
ctx.registerBrokers(0, 1);
@@ -1757,7 +1757,7 @@ public class ReplicationControlManagerTest {
ControllerRequestContext requestContext =
anonymousContextFor(ApiKeys.CREATE_TOPICS);
ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(requestContext, request,
Set.of("foo"));
+ replicationControl.createTopics(requestContext, request,
Set.of("foo"), false);
ctx.replay(result.records());
List<CreatePartitionsTopic> topics = List.of(new
CreatePartitionsTopic().
@@ -2890,19 +2890,19 @@ public class ReplicationControlManagerTest {
Map.of(new ConfigResource(ConfigResource.Type.BROKER, ""),
Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
new
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
- true).records());
+ true, false).records());
} else if (uncleanConfig.equals("dynamic_node")) {
ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
Map.of(new ConfigResource(ConfigResource.Type.BROKER, "0"),
Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
new
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
- true).records());
+ true, false).records());
} else if (uncleanConfig.equals("dynamic_topic")) {
ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
Map.of(new ConfigResource(ConfigResource.Type.TOPIC, "foo"),
Map.of(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG,
new
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "true"))),
- true).records());
+ true, false).records());
}
ControllerResult<Boolean> balanceResult =
replication.maybeElectUncleanLeaders();
assertFalse(balanceResult.response());
@@ -3419,8 +3419,15 @@ public class ReplicationControlManagerTest {
Uuid dir2b1 = Uuid.fromString("yh3acnzGSeurSTj8aIhOjw");
ctx.registerBrokersWithDirs(b1, List.of(dir1b1, dir2b1));
ctx.unfenceBrokers(b1);
- assertEquals(List.of(),
ctx.clusterControl.registration(b1).cordonedDirectories());
+ assertNull(ctx.clusterControl.registration(b1).cordonedDirectories());
+
+ // If cordonedDirs is null, it's a no-op
List<ApiMessageAndVersion> records = new ArrayList<>();
+ ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), null, records);
+ assertTrue(records.isEmpty());
+
+ // Cordon dir1b1, this will emit a BrokerRegistrationChangeRecord
+ records = new ArrayList<>();
ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), List.of(dir1b1), records);
assertEquals(
List.of(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord()
@@ -3430,6 +3437,35 @@ public class ReplicationControlManagerTest {
);
ctx.replay(records);
assertEquals(List.of(dir1b1),
ctx.clusterControl.registration(b1).cordonedDirectories());
+
+ // Cordon dir1b1 again, this time no records are emitted
+ records = new ArrayList<>();
+ ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), List.of(dir1b1), records);
+ assertTrue(records.isEmpty());
+
+ // Cordon dir2b1, this will emit a BrokerRegistrationChangeRecord
+ records = new ArrayList<>();
+ ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), List.of(dir2b1, dir1b1), records);
+ assertEquals(
+ List.of(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord()
+ .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+ .setCordonedLogDirs(List.of(dir2b1, dir1b1)), (short)
3)),
+ filter(records, BrokerRegistrationChangeRecord.class)
+ );
+ ctx.replay(records);
+ assertEquals(List.of(dir2b1, dir1b1),
ctx.clusterControl.registration(b1).cordonedDirectories());
+
+ // Uncordon all directories, this will emit a
BrokerRegistrationChangeRecord
+ records = new ArrayList<>();
+ ctx.replicationControl.handleDirectoriesCordoned(b1,
defaultBrokerEpoch(b1), List.of(), records);
+ assertEquals(
+ List.of(new ApiMessageAndVersion(new
BrokerRegistrationChangeRecord()
+ .setBrokerId(b1).setBrokerEpoch(defaultBrokerEpoch(b1))
+ .setCordonedLogDirs(List.of()), (short) 3)),
+ filter(records, BrokerRegistrationChangeRecord.class)
+ );
+ ctx.replay(records);
+ assertEquals(List.of(),
ctx.clusterControl.registration(b1).cordonedDirectories());
}
@ParameterizedTest
@@ -3459,13 +3495,13 @@ public class ReplicationControlManagerTest {
ctx.replay(ctx.configurationControl.legacyAlterConfigs(
Map.of(configResource,
Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "1")),
- false).records());
+ false, false).records());
} else {
ctx.replay(ctx.configurationControl.incrementalAlterConfigs(
Map.of(configResource,
Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
new
AbstractMap.SimpleImmutableEntry<>(AlterConfigOp.OpType.SET, "1"))),
- false).records());
+ false, false).records());
}
assertArrayEquals(new int[]{},
ctx.replicationControl.getPartition(fooId, 0).elr);
if (clusterLevel) {
diff --git
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
index ab342b26087..1fed0e0dbad 100644
---
a/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
+++
b/metadata/src/test/java/org/apache/kafka/image/node/ClusterImageBrokersNodeTest.java
@@ -70,7 +70,7 @@ public class ClusterImageBrokersNodeTest {
"inControlledShutdown=false, " +
"isMigratingZkBroker=false, " +
"directories=[JsnDDNVyTL289kYk6sPzig, anCdBWcFTlu8gE1wP6bh3g], " +
- "cordonedDirectories=[])",
+ "cordonedDirectories=null)",
child.stringify());
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
index 577338203ec..c661658c909 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -40,6 +40,7 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(value = 40)
@@ -87,6 +88,17 @@ public class BrokerRegistrationTest {
setIsMigratingZkBroker(true).
setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
setCordonedDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+ build(),
+ new BrokerRegistration.Builder().
+ setId(4).
+ setEpoch(0).
+ setIncarnationId(Uuid.fromString("Xkq84F5bTsSEwHqceVxcOQ")).
+ setListeners(List.of(new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT, "localhost", 9094))).
+ setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1,
(short) 2))).
+ setRack(Optional.empty()).
+ setFenced(true).
+ setDirectories(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))).
+ setCordonedDirectories(null).
build());
@Test
@@ -95,6 +107,7 @@ public class BrokerRegistrationTest {
assertEquals(1, REGISTRATIONS.get(1).id());
assertEquals(2, REGISTRATIONS.get(2).id());
assertEquals(3, REGISTRATIONS.get(3).id());
+ assertEquals(4, REGISTRATIONS.get(4).id());
}
@Test
@@ -106,10 +119,13 @@ public class BrokerRegistrationTest {
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(0));
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(1));
assertNotEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(2));
+ assertNotEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(0));
+ assertNotEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(3));
assertEquals(REGISTRATIONS.get(0), REGISTRATIONS.get(0));
assertEquals(REGISTRATIONS.get(1), REGISTRATIONS.get(1));
assertEquals(REGISTRATIONS.get(2), REGISTRATIONS.get(2));
assertEquals(REGISTRATIONS.get(3), REGISTRATIONS.get(3));
+ assertEquals(REGISTRATIONS.get(4), REGISTRATIONS.get(4));
}
@Test
@@ -119,14 +135,14 @@ public class BrokerRegistrationTest {
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
"rack=Optional.empty, fenced=true, inControlledShutdown=false,
isMigratingZkBroker=false, " +
- "directories=[], cordonedDirectories=[])",
+ "directories=[], cordonedDirectories=null)",
REGISTRATIONS.get(1).toString());
assertEquals("BrokerRegistration(id=2, epoch=0, " +
"incarnationId=eY7oaG1RREie5Kk9uy1l6g, listeners=[Endpoint(" +
"listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
"host='localhost', port=9092)], supportedFeatures={bar: 1-4, foo:
2-3}, " +
"rack=Optional[myrack], fenced=false, inControlledShutdown=true,
isMigratingZkBroker=false, " +
- "directories=[], cordonedDirectories=[])",
+ "directories=[], cordonedDirectories=null)",
REGISTRATIONS.get(2).toString());
assertEquals("BrokerRegistration(id=3, epoch=0, " +
"incarnationId=1t8VyWx2TCSTpUWuqj-FOw, listeners=[Endpoint(" +
@@ -135,6 +151,13 @@ public class BrokerRegistrationTest {
"rack=Optional.empty, fenced=false, inControlledShutdown=true,
isMigratingZkBroker=true, " +
"directories=[r4HpEsMuST6nQ4rznIEJVA],
cordonedDirectories=[r4HpEsMuST6nQ4rznIEJVA])",
REGISTRATIONS.get(3).toString());
+ assertEquals("BrokerRegistration(id=4, epoch=0, " +
+ "incarnationId=Xkq84F5bTsSEwHqceVxcOQ, listeners=[Endpoint(" +
+ "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+ "host='localhost', port=9094)], supportedFeatures={foo: 1-2}, " +
+ "rack=Optional.empty, fenced=true, inControlledShutdown=false,
isMigratingZkBroker=false, " +
+ "directories=[r4HpEsMuST6nQ4rznIEJVA], cordonedDirectories=null)",
+ REGISTRATIONS.get(4).toString());
}
@Test
@@ -143,6 +166,7 @@ public class BrokerRegistrationTest {
testRoundTrip(REGISTRATIONS.get(1));
testRoundTrip(REGISTRATIONS.get(2));
testRoundTrip(REGISTRATIONS.get(3));
+ testRoundTrip(REGISTRATIONS.get(4));
}
private void testRoundTrip(BrokerRegistration registration) {
@@ -228,36 +252,36 @@ public class BrokerRegistrationTest {
@Test
void testHasUncordonedDirs() {
- BrokerRegistration registration = new BrokerRegistration.Builder().
- setId(0).
- setEpoch(0).
- setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
- setListeners(List.of(new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
- setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1,
(short) 2))).
- setRack(Optional.empty()).
- setFenced(false).
- setInControlledShutdown(false).
- setDirectories(List.of(
- Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
- )).
- build();
- assertTrue(registration.hasUncordonedDirs());
- registration = new BrokerRegistration.Builder().
- setId(0).
- setEpoch(0).
- setIncarnationId(Uuid.fromString("m6CiJvfITZeKVC6UuhlZew")).
- setListeners(List.of(new Endpoint("INTERNAL",
SecurityProtocol.PLAINTEXT, "localhost", 9090))).
- setSupportedFeatures(Map.of("foo", VersionRange.of((short) 1,
(short) 2))).
- setRack(Optional.empty()).
- setFenced(false).
- setInControlledShutdown(false).
- setDirectories(List.of(
- Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
- )).
- setCordonedDirectories(List.of(
- Uuid.fromString("dir1G6EtuR1OTdAzFw1AFQ")
- )).
- build();
- assertFalse(registration.hasUncordonedDirs());
+ assertTrue(REGISTRATIONS.get(0).hasUncordonedDirs());
+ assertFalse(REGISTRATIONS.get(3).hasUncordonedDirs());
+ assertTrue(REGISTRATIONS.get(4).hasUncordonedDirs());
+ }
+
+ @Test
+ void testCordonedDirChanged() {
+ assertTrue(REGISTRATIONS.get(0).cordonedDirChanged(List.of()));
+
assertTrue(REGISTRATIONS.get(0).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+
assertFalse(REGISTRATIONS.get(3).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+ assertTrue(REGISTRATIONS.get(3).cordonedDirChanged(List.of()));
+ assertTrue(REGISTRATIONS.get(4).cordonedDirChanged(List.of()));
+
assertTrue(REGISTRATIONS.get(4).cordonedDirChanged(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA"))));
+ }
+
+ @Test
+ void testCordonedDirectories() {
+ assertNull(REGISTRATIONS.get(0).cordonedDirectories());
+ assertEquals(List.of(Uuid.fromString("r4HpEsMuST6nQ4rznIEJVA")),
REGISTRATIONS.get(3).cordonedDirectories());
+ assertNull(REGISTRATIONS.get(4).cordonedDirectories());
+ }
+
+ @Test
+ void testCordonedLogDirsRoundTrip() {
+ ImageWriterOptions options = new
ImageWriterOptions.Builder(MetadataVersion.IBP_4_2_IV1).build();
+ ApiMessageAndVersion record = REGISTRATIONS.get(4).toRecord(options);
+ assertNull(((RegisterBrokerRecord)
record.message()).cordonedLogDirs());
+
+ options = new
ImageWriterOptions.Builder(MetadataVersion.latestProduction()).build();
+ record = REGISTRATIONS.get(4).toRecord(options);
+ assertEquals(REGISTRATIONS.get(4).cordonedDirectories(),
((RegisterBrokerRecord) record.message()).cordonedLogDirs());
}
}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
index 987d27bd5d2..c6e02d0373a 100644
---
a/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
+++
b/server-common/src/main/java/org/apache/kafka/server/common/DirectoryEventHandler.java
@@ -30,7 +30,6 @@ public interface DirectoryEventHandler {
@Override public void handleAssignment(TopicIdPartition partition,
Uuid directoryId, String reason, Runnable callback) {}
@Override public void handleFailure(Uuid directoryId) {}
@Override public void handleCordoned(Set<Uuid> directoryIds) {}
- @Override public void handleUncordoned(Set<Uuid> directoryIds) {}
};
/**
@@ -49,14 +48,8 @@ public interface DirectoryEventHandler {
void handleFailure(Uuid directoryId);
/**
- * Handle the transition of an online log directory to the cordoned state.
- * @param directoryIds The directory IDs to cordon
+ * Handle the update of the cordoned.log.dirs configuration.
+ * @param directoryIds The directory IDs of the cordoned log dirs
*/
void handleCordoned(Set<Uuid> directoryIds);
-
- /**
- * Handle the transition of a cordoned log directory to the online state.
- * @param directoryIds The directory IDs to uncordon
- */
- void handleUncordoned(Set<Uuid> directoryIds);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
index 0ce78168704..95c41751d8b 100644
--- a/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
+++ b/server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java
@@ -48,6 +48,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -55,16 +56,17 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* The broker lifecycle manager owns the broker state.
*
- * Its inputs are messages passed in from other parts of the broker and from
the
+ * <p>Its inputs are messages passed in from other parts of the broker and
from the
* controller: requests to start up, or shut down, for example. Its output are
the broker
* state and various futures that can be used to wait for broker state
transitions to
* occur.
*
- * The lifecycle manager handles registering the broker with the controller,
as described
+ * <p>The lifecycle manager handles registering the broker with the
controller, as described
* in KIP-631. After registration is complete, it handles sending periodic
broker
- * heartbeats and processing the responses.
+ * heartbeats and processing the responses. Once the broker has caught up with
the cluster metadata, it starts
+ * sending the Uuids of its cordoned log directories in its heartbeats.
*
- * This code uses an event queue paradigm. Modifications get translated into
events, which
+ * <p>This code uses an event queue paradigm. Modifications get translated
into events, which
* are placed on the queue to be processed sequentially. As described in the
JavaDoc for
* each variable, most mutable state can be accessed only from that event
queue thread.
* In some cases we expose a volatile variable which can be read from any
thread, but only
@@ -76,7 +78,7 @@ public class BrokerLifecycleManager {
private final KafkaEventQueue eventQueue;
private final AbstractKafkaConfig config;
private final Time time;
- private final Set<Uuid> logDirs;
+ private final Map<String, Uuid> logDirs;
private final Runnable shutdownHook;
private final Supplier<Boolean> cordonedLogDirsSupported;
@@ -148,10 +150,10 @@ public class BrokerLifecycleManager {
private final Map<Uuid, Boolean> offlineDirs = new HashMap<>();
/**
- * Map of cordoned log directories. The value is true if the directory is
cordoned.
+ * Set of cordoned log directories. This is null at startup until the
broker has caught up with the metadata.
* This variable can only be read or written from the event queue thread.
*/
- private final Map<Uuid, Boolean> cordonedLogDirs = new HashMap<>();
+ private Set<Uuid> cordonedLogDirs;
/**
* True if we sent an event queue to the active controller requesting
controlled
@@ -217,7 +219,7 @@ public class BrokerLifecycleManager {
AbstractKafkaConfig config,
Time time,
String threadNamePrefix,
- Set<Uuid> logDirs) {
+ Map<String, Uuid> logDirs) {
this(config, time, threadNamePrefix, logDirs, () -> { }, () -> false);
}
@@ -225,7 +227,7 @@ public class BrokerLifecycleManager {
AbstractKafkaConfig config,
Time time,
String threadNamePrefix,
- Set<Uuid> logDirs,
+ Map<String, Uuid> logDirs,
Runnable shutdownHook,
Supplier<Boolean> cordonedLogDirsSupported) {
this.config = config;
@@ -260,14 +262,8 @@ public class BrokerLifecycleManager {
String clusterId,
ListenerCollection advertisedListeners,
Map<String, VersionRange> supportedFeatures,
- OptionalLong previousBrokerEpoch,
- Set<Uuid> cordonedLogDirs) {
+ OptionalLong previousBrokerEpoch) {
this.previousBrokerEpoch = previousBrokerEpoch;
- if (!cordonedLogDirs.isEmpty()) {
- // At this point we don't have fresh metadata yet so we don't know
if the cordoned log dirs feature is supported.
- // Queue an event, it will be ignored by the controller handling
the broker registration if the feature is disabled.
- eventQueue.append(new CordonedDirEvent(cordonedLogDirs));
- }
eventQueue.append(new StartupEvent(highestMetadataOffsetProvider,
channelManager, clusterId, advertisedListeners,
supportedFeatures));
}
@@ -300,16 +296,6 @@ public class BrokerLifecycleManager {
}
}
- /**
- * Propagate directory uncordoned to the controller.
- * @param directories The IDs for the directories that is uncordoned.
- */
- public void propagateDirectoryUncordoned(Set<Uuid> directories) {
- if (cordonedLogDirsSupported.get()) {
- eventQueue.append(new UncordonedDirEvent(directories));
- }
- }
-
public void resendBrokerRegistration() {
eventQueue.append(new ResendBrokerRegistrationEvent());
}
@@ -442,28 +428,7 @@ public class BrokerLifecycleManager {
@Override
public void run() {
- for (Uuid dir : dirs) {
- cordonedLogDirs.put(dir, true);
- }
- if (registered) {
- scheduleNextCommunicationImmediately();
- }
- }
- }
-
- private class UncordonedDirEvent implements EventQueue.Event {
-
- private final Set<Uuid> dirs;
-
- UncordonedDirEvent(Set<Uuid> dirs) {
- this.dirs = dirs;
- }
-
- @Override
- public void run() {
- for (Uuid dir : dirs) {
- cordonedLogDirs.put(dir, false);
- }
+ cordonedLogDirs = dirs;
if (registered) {
scheduleNextCommunicationImmediately();
}
@@ -515,7 +480,7 @@ public class BrokerLifecycleManager {
.setMinSupportedVersion(range.min())
.setMaxSupportedVersion(range.max()))
);
- List<Uuid> sortedLogDirs = new ArrayList<>(logDirs);
+ List<Uuid> sortedLogDirs = new ArrayList<>(logDirs.values());
sortedLogDirs.sort(Uuid::compareTo);
BrokerRegistrationRequestData data = new
BrokerRegistrationRequestData()
.setBrokerId(nodeId)
@@ -526,8 +491,7 @@ public class BrokerLifecycleManager {
.setListeners(advertisedListeners)
.setRack(rack.orElse(null))
.setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L))
- .setLogDirs(sortedLogDirs)
-
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
+ .setLogDirs(sortedLogDirs);
if (logger.isDebugEnabled()) {
logger.debug("Sending broker registration {}", data);
}
@@ -605,8 +569,10 @@ public class BrokerLifecycleManager {
.setCurrentMetadataOffset(metadataOffset)
.setWantFence(!readyToUnfence)
.setWantShutDown(state == BrokerState.PENDING_CONTROLLED_SHUTDOWN)
- .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()))
-
.setCordonedLogDirs(cordonedLogDirs.entrySet().stream().filter(Map.Entry::getValue).map(Map.Entry::getKey).toList());
+ .setOfflineLogDirs(new ArrayList<>(offlineDirs.keySet()));
+ if (initialCatchUpFuture.isDone() &&
!initialCatchUpFuture.isCompletedExceptionally() &&
cordonedLogDirsSupported.get()) {
+ data.setCordonedLogDirs(List.copyOf(cordonedLogDirs));
+ }
if (logger.isTraceEnabled()) {
logger.trace("Sending broker heartbeat {}", data);
}
@@ -650,6 +616,7 @@ public class BrokerLifecycleManager {
this.currentOfflineDirs = currentOfflineDirs;
}
+ @SuppressWarnings({"CyclomaticComplexity"})
@Override
public void run() {
communicationInFlight = false;
@@ -681,6 +648,12 @@ public class BrokerLifecycleManager {
logger.info("The broker has caught up.
Transitioning from STARTING to RECOVERY.");
state = BrokerState.RECOVERY;
initialCatchUpFuture.complete(null);
+ // Now that the broker has caught up with the
latest metadata, the configuration should
+ // be up to date, so we can retrieve the
cordoned log dirs to include them in the
+ // next heartbeat request
+ cordonedLogDirs =
config.cordonedLogDirs().stream()
+ .flatMap(logDir ->
Optional.ofNullable(logDirs.get(logDir)).stream())
+ .collect(Collectors.toSet());
} else {
logger.debug("The broker is STARTING. Still
waiting to catch up with cluster metadata.");
}
diff --git
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
index def1cbbe191..558c7324d4e 100644
---
a/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/CordonedLogDirsIntegrationTest.java
@@ -35,10 +35,12 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.test.TestUtils;
@@ -233,6 +235,55 @@ public class CordonedLogDirsIntegrationTest {
}
}
+ @ClusterTest()
+ public void testCordonUnknownLogDirs() {
+ try (Admin admin = clusterInstance.admin()) {
+ Throwable t = assertThrows(ExecutionException.class,
+ () ->
admin.incrementalAlterConfigs(cordonedDirsConfig("/unknown/log/dir",
BROKER_0)).all().get());
+ // ConfigAdminManager.validateBrokerConfigChange throws
InvalidRequestException instead of InvalidConfigurationException
+ assertInstanceOf(InvalidRequestException.class, t.getCause());
+ }
+ }
+
+ @ClusterTest(
+ types = Type.KRAFT,
+ brokers = 2,
+ controllers = 1
+ )
+ public void testUpdateCordonedDirsViaController() throws Exception {
+ // Make sure we don't try to shut down the controller
+ int brokerId = clusterInstance.brokerIds().stream().filter(id ->
!clusterInstance.controllerIds().contains(id)).findFirst().get();
+ ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER,
String.valueOf(brokerId));
+ List<String> logDirs =
clusterInstance.brokers().get(brokerId).config().logDirs();
+ String logDirsStr = String.join(",", logDirs);
+ try (Admin controllerAdmin = clusterInstance.admin(Map.of(), true);
+ Admin admin = clusterInstance.admin()) {
+ // We can't set cordoned log dirs via the controller
+ Throwable t = assertThrows(ExecutionException.class,
+ () ->
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr,
cr)).all().get());
+ assertInstanceOf(InvalidConfigurationException.class,
t.getCause());
+
+ // We can set cordoned log dirs via the broker
+ admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr,
cr)).all().get();
+
+ // Shut down the broker
+ clusterInstance.brokers().get(brokerId).shutdown();
+ clusterInstance.brokers().get(brokerId).awaitShutdown();
+
+ // We can clear a cordoned log dir via the controller
+
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirs.get(0),
cr)).all().get();
+ controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig("",
cr)).all().get();
+
+ // Restart the broker
+ clusterInstance.brokers().get(brokerId).startup();
+
+ // We can set cordoned log dirs via the broker
+ admin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr,
cr)).all().get();
+ // We can keep cordoned log dirs via the controller
+
controllerAdmin.incrementalAlterConfigs(cordonedDirsConfig(logDirsStr,
cr)).all().get();
+ }
+ }
+
@ClusterTest(
brokers = 2,
controllers = 1
@@ -253,7 +304,6 @@ public class CordonedLogDirsIntegrationTest {
Map<Integer, Map<String, LogDirDescription>>
logDescriptionsPerBroker =
admin.describeLogDirs(clusterInstance.brokerIds()).allDescriptions().get();
for (Map.Entry<Integer, Map<String, LogDirDescription>> entry
: logDescriptionsPerBroker.entrySet()) {
for (LogDirDescription logDirDescription :
entry.getValue().values()) {
-
assertFalse(logDirDescription.replicaInfos().isEmpty());
found += logDirDescription.replicaInfos().size();
if (entry.getKey() == brokerId) {
logDirDescription.replicaInfos().forEach((tp,
replicaInfo) ->
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
index cb526d7e667..7d4a45dbd8f 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java
@@ -56,7 +56,6 @@ public class LogManager {
*
* @param replicas The replicas hosting the partition
* @param brokerId The ID of the current broker.
- * @param topicId The ID of the topic
* @param log The log object to check
* @return true if the log should not exist on the broker, false otherwise.
*/
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
index 371228ca8c9..cb883971b1b 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/MockController.java
@@ -176,7 +176,8 @@ public class MockController implements Controller {
public synchronized CompletableFuture<CreateTopicsResponseData>
createTopics(
ControllerRequestContext context,
CreateTopicsRequestData request,
- Set<String> describable
+ Set<String> describable,
+ boolean forwarded
) {
CreateTopicsResponseData response = new CreateTopicsResponseData();
for (CreatableTopic topic : request.topics()) {
@@ -355,7 +356,8 @@ public class MockController implements Controller {
public CompletableFuture<Map<ConfigResource, ApiError>>
incrementalAlterConfigs(
ControllerRequestContext context,
Map<ConfigResource, Map<String, Entry<AlterConfigOp.OpType, String>>>
configChanges,
- boolean validateOnly
+ boolean validateOnly,
+ boolean forwarded
) {
Map<ConfigResource, ApiError> results = new HashMap<>();
for (Entry<ConfigResource, Map<String, Entry<AlterConfigOp.OpType,
String>>> entry :
@@ -417,7 +419,8 @@ public class MockController implements Controller {
public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
ControllerRequestContext context,
Map<ConfigResource, Map<String, String>> newConfigs,
- boolean validateOnly
+ boolean validateOnly,
+ boolean forwarded
) {
Map<ConfigResource, ApiError> results = new HashMap<>();
if (!validateOnly) {