This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78a66fa9db0 KAFKA-19648; Cluster metadata bootstrapping with kraft
checkpoint (#20707)
78a66fa9db0 is described below
commit 78a66fa9db04cafabaf8611167ad25383f2b943d
Author: mannoopj <[email protected]>
AuthorDate: Wed May 27 12:25:54 2026 -0400
KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint (#20707)
Previously, bootstrap metadata was stored in a separate
bootstrap.checkpoint file, while the zero checkpoint contained only
KRaft control records. This change unifies them by having the Formatter
append bootstrap metadata records into the zero checkpoint alongside the
existing KRaft control records, integrating with KRaft's bootstrapping
checkpoint mechanisms like RaftClient.Listener#handleLoadBootstrap and
KIP-630 snapshot lifecycle management.
QuorumController's handleLoadBootstrap now extracts bootstrap records
from the zero checkpoint and stores them as BootstrapMetadata, which is
later committed by ActivationRecordsGenerator when the controller
activates on an empty metadata log.
The BootstrapDirectory class is removed and its functionality
consolidated into static methods on BootstrapMetadata#fromDirectory
reads from the legacy bootstrap.checkpoint (falling back to defaults),
and fromCheckpointFile reads from a specific checkpoint path.
StorageTool now only writes the bootstrap snapshot when the node has the
Controller role. KafkaClusterTestKit is updated to pass non-feature
versions, non-SCRAM bootstrap records to the Formatter as additional
bootstrap records.
Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
<[email protected]>
---
build.gradle | 1 +
.../main/scala/kafka/server/KafkaRaftServer.scala | 6 +-
core/src/main/scala/kafka/tools/StorageTool.scala | 1 +
.../unit/kafka/server/KafkaRaftServerTest.scala | 52 ++++++---
.../scala/unit/kafka/tools/StorageToolTest.scala | 26 ++---
.../apache/kafka/controller/QuorumController.java | 29 ++++-
.../metadata/bootstrap/BootstrapDirectory.java | 117 ---------------------
.../metadata/bootstrap/BootstrapMetadata.java | 61 ++++++++++-
.../apache/kafka/metadata/storage/Formatter.java | 40 +++++--
.../kafka/metadata/util/BatchFileReader.java | 16 ++-
.../metadata/bootstrap/BootstrapDirectoryTest.java | 97 -----------------
.../metadata/bootstrap/BootstrapMetadataTest.java | 101 ++++++++++++++++++
.../kafka/metadata/storage/FormatterTest.java | 37 ++++---
.../metadata/bootstrap/BootstrapTestUtils.java | 42 ++++++++
.../kafka/common/test/KafkaClusterTestKit.java | 22 ++++
.../org/apache/kafka/common/test/TestKitNodes.java | 18 +++-
.../test/junit/RaftClusterInvocationContext.java | 7 ++
.../org/apache/kafka/tools/DumpLogSegments.java | 4 +-
18 files changed, 395 insertions(+), 282 deletions(-)
diff --git a/build.gradle b/build.gradle
index f302df7aea4..7bdb74961a5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1432,6 +1432,7 @@ project(':metadata') {
testImplementation testLog4j2Libs
testFixturesImplementation testFixtures(project(':server-common'))
+ testFixturesImplementation project(':raft')
testFixturesImplementation libs.junitJupiter
testImplementation libs.junitJupiter
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index facd4957133..5d559dd20ea 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -17,6 +17,7 @@
package kafka.server
import java.io.File
+import java.nio.file.Path
import java.util.concurrent.CompletableFuture
import kafka.utils.{Logging, Mx4jLoader}
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
@@ -25,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.utils.internals.AppInfoParser
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.metadata.KafkaConfigSchema
-import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory,
BootstrapMetadata}
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
@@ -182,8 +183,7 @@ object KafkaRaftServer {
}
// Load the BootstrapMetadata.
- val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
- val bootstrapMetadata = bootstrapDirectory.read()
+ val bootstrapMetadata =
BootstrapMetadata.fromDirectory(Path.of(config.metadataLogDir))
(metaPropsEnsemble, bootstrapMetadata)
}
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 45642595974..cba3b364706 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -125,6 +125,7 @@ object StorageTool extends Logging {
val formatter = new Formatter().
setPrintStream(printStream).
setNodeId(config.nodeId).
+
setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
setClusterId(namespace.getString("cluster_id")).
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index a2783cb5015..cac002ddfd1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -21,12 +21,13 @@ import java.nio.file.Files
import java.util.{Optional, Properties}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory,
BootstrapMetadata}
+import org.apache.kafka.metadata.bootstrap.{BootstrapMetadata,
BootstrapTestUtils}
import org.apache.kafka.metadata.properties.{MetaProperties,
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
+import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
@@ -109,8 +110,13 @@ class KafkaRaftServerTest {
}
private def writeBootstrapMetadata(logDir: File, metadataVersion:
MetadataVersion): Unit = {
- val bootstrapDirectory = new BootstrapDirectory(logDir.toString)
-
bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion,
"test"))
+ Formatter.writeBoostrapSnapshot(
+ logDir.toString,
+ BootstrapMetadata.fromVersion(metadataVersion, "test"),
+ Optional.empty(),
+ 0.toShort,
+ "CONTROLLER"
+ )
}
@Test
@@ -272,20 +278,32 @@ class KafkaRaftServerTest {
setDirectoryId(Uuid.fromString("4jm0e-YRYeB6CCKBvwoS8w")).
build()
- val configProperties = new Properties
- configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG,
"broker,controller")
- configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
- configProperties.put(SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
- configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
s"$nodeId@localhost:9093")
- configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+ val logDir = TestUtils.tempDirectory()
+ try {
+ writeMetaProperties(logDir, metaProperties)
+ writeBootstrapMetadata(logDir, MetadataVersion.IBP_3_3_IV3)
+
+ val configProperties = new Properties
+ configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG,
"broker,controller")
+ configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
+ configProperties.put(SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+ configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
s"$nodeId@localhost:9093")
+ configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG,
"SSL")
+ configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG,
logDir.getAbsolutePath)
+ val config = KafkaConfig.fromProps(configProperties)
- val (metaPropertiesEnsemble, bootstrapMetadata) =
- invokeLoadMetaProperties(metaProperties, configProperties,
Some(MetadataVersion.IBP_3_3_IV3))
+ val (metaPropertiesEnsemble, _) =
+ KafkaRaftServer.initializeLogDirs(config, MetaPropertiesEnsemble.LOG,
"")
- assertEquals(metaProperties,
metaPropertiesEnsemble.logDirProps().values().iterator().next())
- assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
- assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
- assertEquals(bootstrapMetadata.metadataVersion(),
MetadataVersion.IBP_3_3_IV3)
+ assertEquals(metaProperties,
metaPropertiesEnsemble.logDirProps().values().iterator().next())
+ assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
+ assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
+
+ val bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(logDir.getAbsolutePath)
+ assertEquals(MetadataVersion.IBP_3_3_IV3,
bootstrapMetadata.metadataVersion())
+ } finally {
+ Utils.delete(logDir)
+ }
}
@Test
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 4ae84c03685..f0b6f8f8e02 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -27,12 +27,12 @@ import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{Feature, MetadataVersion}
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
+import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble,
PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.util.TerseFailure
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertThrows, assertTrue}
@@ -390,15 +390,15 @@ Found problem:
def testFormatWithReleaseVersionAndFeatureOverride(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
- properties.putAll(defaultStaticQuorumProperties)
+ properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq(
- "--release-version", "3.7-IV0",
+ "--release-version", "3.7-IV0", "--standalone",
"--feature", "share.version=1")))
// Verify that the feature override is applied by checking the bootstrap
metadata
- val bootstrapMetadata = new
BootstrapDirectory(availableDirs.head.toString).read
+ val bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
// Verify that the share.version feature is set to 1 as specified
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -409,7 +409,7 @@ Found problem:
"Failed to find release version in output: " + stream.toString())
// Verify that the format command completed successfully with features
- assertTrue(stream.toString().contains("Formatting metadata directory"),
+ assertTrue(stream.toString().contains("Formatting dynamic metadata voter
directory"),
"Failed to find formatting message in output: " + stream.toString())
}
@@ -417,17 +417,17 @@ Found problem:
def testFormatWithMultipleFeatures(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
- properties.putAll(defaultStaticQuorumProperties)
+ properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq(
- "--release-version", "3.8-IV0",
+ "--release-version", "3.8-IV0", "--standalone",
"--feature", "share.version=1",
"--feature", "transaction.version=2",
"--feature", "group.version=1")))
// Verify that all features are properly bootstrapped by checking the
bootstrap metadata
- val bootstrapMetadata = new
BootstrapDirectory(availableDirs.head.toString).read
+ val bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
// Verify that all specified features are set correctly
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -442,7 +442,7 @@ Found problem:
"Failed to find release version in output: " + stream.toString())
// Verify that the format command completed successfully with multiple
features
- assertTrue(stream.toString().contains("Formatting metadata directory"),
+ assertTrue(stream.toString().contains("Formatting dynamic metadata voter
directory"),
"Failed to find formatting message in output: " + stream.toString())
}
@@ -852,11 +852,11 @@ Found problem:
def testBootstrapScramRecords(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
- properties.putAll(defaultStaticQuorumProperties)
+ properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String](
- "--release-version", "3.9-IV0",
+ "--release-version", "3.9-IV0", "--standalone",
"--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
"--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
)
@@ -865,7 +865,7 @@ Found problem:
// Not doing full SCRAM record validation since that's covered elsewhere.
// Just checking that we generate the correct number of records
- val bootstrapMetadata = new
BootstrapDirectory(availableDirs.head.toString).read
+ val bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
val scramRecords = bootstrapMetadata.records().asScala
.filter(apiMessageAndVersion =>
apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
.map(apiMessageAndVersion =>
apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 35ad95356c1..5da6eb2e3aa 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -128,6 +128,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
@@ -1075,7 +1076,31 @@ public final class QuorumController implements
Controller {
@Override
public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion>
reader) {
- reader.close();
+
appendRaftEvent(String.format("handleLoadBootstrap[snapshotId=%s]",
reader.snapshotId()), () -> {
+ try {
+ String snapshotName =
Snapshots.filenameFromSnapshotId(reader.snapshotId());
+ if (isActiveController()) {
+ throw fatalFaultHandler.handleFault("Asked to load
bootstrap snapshot " + snapshotName +
+ ", but we are the active controller at epoch "
+ curClaimEpoch);
+ }
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ while (reader.hasNext()) {
+ Batch<ApiMessageAndVersion> batch = reader.next();
+ records.addAll(batch.records());
+ }
+ if (!records.isEmpty()) {
+ log.debug("Loaded {} bootstrap records from {}",
records.size(), snapshotName);
+ bootstrapMetadata =
BootstrapMetadata.fromRecords(records, "bootstrap");
+ }
+ } catch (FaultHandlerException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw fatalFaultHandler.handleFault("Error while loading
bootstrap snapshot " +
+ reader.snapshotId(), e);
+ } finally {
+ reader.close();
+ }
+ });
}
@Override
@@ -1460,7 +1485,7 @@ public final class QuorumController implements Controller
{
/**
* The bootstrap metadata to use for initialization if needed.
*/
- private final BootstrapMetadata bootstrapMetadata;
+ private BootstrapMetadata bootstrapMetadata;
/**
* The maximum number of records per batch to allow.
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
deleted file mode 100644
index dbeaeaa6524..00000000000
---
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.bootstrap;
-
-import org.apache.kafka.metadata.util.BatchFileReader;
-import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-
-/**
- * A read-only class that holds the controller bootstrap metadata. A file
named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
- */
-public class BootstrapDirectory {
- public static final String BINARY_BOOTSTRAP_FILENAME =
"bootstrap.checkpoint";
-
- private final String directoryPath;
-
- /**
- * Create a new BootstrapDirectory object.
- *
- * @param directoryPath The path to the directory with the bootstrap
file.
- */
- public BootstrapDirectory(
- String directoryPath
- ) {
- this.directoryPath = Objects.requireNonNull(directoryPath);
- }
-
- public BootstrapMetadata read() throws Exception {
- Path path = Paths.get(directoryPath);
- if (!Files.isDirectory(path)) {
- if (Files.exists(path)) {
- throw new RuntimeException("Path " + directoryPath + " exists,
but is not " +
- "a directory.");
- } else {
- throw new RuntimeException("No such directory as " +
directoryPath);
- }
- }
- Path binaryBootstrapPath = Paths.get(directoryPath,
BINARY_BOOTSTRAP_FILENAME);
- if (!Files.exists(binaryBootstrapPath)) {
- return readFromConfiguration();
- } else {
- return readFromBinaryFile(binaryBootstrapPath.toString());
- }
- }
-
- BootstrapMetadata readFromConfiguration() {
- return
BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default
bootstrap");
- }
-
- BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
- List<ApiMessageAndVersion> records = new ArrayList<>();
- try (BatchFileReader reader = new BatchFileReader.Builder().
- setPath(binaryPath).build()) {
- while (reader.hasNext()) {
- BatchAndType batchAndType = reader.next();
- if (!batchAndType.isControl()) {
- records.addAll(batchAndType.batch().records());
- }
- }
- }
- return
BootstrapMetadata.fromRecords(Collections.unmodifiableList(records),
- "the binary bootstrap metadata file: " + binaryPath);
- }
-
- public void writeBinaryFile(BootstrapMetadata bootstrapMetadata) throws
IOException {
- if (!Files.isDirectory(Paths.get(directoryPath))) {
- throw new RuntimeException("No such directory as " +
directoryPath);
- }
- Path tempPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME +
".tmp");
- Files.deleteIfExists(tempPath);
- try {
- try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
- for (ApiMessageAndVersion message :
bootstrapMetadata.records()) {
- writer.append(message);
- }
- }
-
- Files.move(
- tempPath,
- Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME),
- ATOMIC_MOVE, REPLACE_EXISTING
- );
- } finally {
- Files.deleteIfExists(tempPath);
- }
- }
-}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 9438a76693e..82f607f5689 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -19,11 +19,19 @@ package org.apache.kafka.metadata.bootstrap;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.util.BatchFileReader;
+import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -35,10 +43,61 @@ import java.util.Optional;
* use the version specified here.
*/
public class BootstrapMetadata {
+ public static final String BINARY_BOOTSTRAP_FILENAME =
"bootstrap.checkpoint";
+
private final List<ApiMessageAndVersion> records;
private final short metadataVersionLevel;
private final String source;
+ /**
+ * Reads bootstrap metadata from the given directory. Checks the legacy
bootstrap.checkpoint
+ * first and falls back to defaults if it does not exist.
+ */
+ public static BootstrapMetadata fromDirectory(Path directory) {
+ if (!Files.isDirectory(directory)) {
+ if (Files.exists(directory)) {
+ throw new IllegalStateException("Path " + directory + "
exists, but is not " +
+ "a directory.");
+ } else {
+ throw new IllegalStateException("No such directory as " +
directory);
+ }
+ }
+ Path binaryBootstrapPath =
directory.resolve(BINARY_BOOTSTRAP_FILENAME);
+ if (Files.exists(binaryBootstrapPath)) {
+ return fromCheckpointFile(binaryBootstrapPath);
+ }
+ return fromVersion(MetadataVersion.latestProduction(), "the default
bootstrap");
+ }
+
+ /**
+ * Reads bootstrap metadata from the given checkpoint file.
+ * Throws if the file does not exist.
+ */
+ public static BootstrapMetadata fromCheckpointFile(Path file) {
+ if (!Files.exists(file)) {
+ String path = file.toString();
+ throw new UncheckedIOException(path, new
FileNotFoundException(path));
+ }
+ return readFromBinaryFile(file);
+ }
+
+ private static BootstrapMetadata readFromBinaryFile(Path binaryPath) {
+ List<ApiMessageAndVersion> records = new ArrayList<>();
+ try (BatchFileReader reader = new BatchFileReader.Builder().
+ setPath(binaryPath.toString()).build()) {
+ while (reader.hasNext()) {
+ BatchAndType batchAndType = reader.next();
+ if (!batchAndType.isControl()) {
+ records.addAll(batchAndType.batch().records());
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to read bootstrap metadata
from " + binaryPath, e);
+ }
+ return fromRecords(Collections.unmodifiableList(records),
+ "the binary bootstrap metadata file: " + binaryPath);
+ }
+
public static BootstrapMetadata fromVersions(
MetadataVersion metadataVersion,
Map<String, Short> featureVersions,
@@ -92,7 +151,7 @@ public class BootstrapMetadata {
return new BootstrapMetadata(records, metadataVersionLevel.get(),
source);
}
- public static Optional<Short> recordToMetadataVersionLevel(ApiMessage
record) {
+ private static Optional<Short> recordToMetadataVersionLevel(ApiMessage
record) {
if (record instanceof FeatureLevelRecord featureLevel) {
if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
return Optional.of(featureLevel.featureLevel());
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 1ccf0c3d5c0..8c163cd4700 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -20,14 +20,12 @@ package org.apache.kafka.metadata.storage;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MetadataRecordSerde;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import org.apache.kafka.raft.DynamicVoters;
import org.apache.kafka.raft.KafkaRaftClient;
-import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.FeatureVersion;
@@ -102,6 +100,11 @@ public class Formatter {
*/
private BootstrapMetadata bootstrapMetadata;
+ /**
+ * Additional bootstrap records to include beyond feature levels and SCRAM.
+ */
+ private List<ApiMessageAndVersion> additionalBootstrapRecords = List.of();
+
/**
* True if we should enable unstable feature versions.
*/
@@ -132,6 +135,7 @@ public class Formatter {
*/
private Optional<DynamicVoters> initialControllers = Optional.empty();
private boolean hasDynamicQuorum = false;
+ private boolean writeBootstrapSnapshot = true;
public Formatter setPrintStream(PrintStream printStream) {
this.printStream = printStream;
@@ -182,6 +186,11 @@ public class Formatter {
return this;
}
+ public Formatter setAdditionalBootstrapRecords(List<ApiMessageAndVersion>
additionalBootstrapRecords) {
+ this.additionalBootstrapRecords = additionalBootstrapRecords;
+ return this;
+ }
+
public Formatter setUnstableFeatureVersionsEnabled(boolean
unstableFeatureVersionsEnabled) {
this.unstableFeatureVersionsEnabled = unstableFeatureVersionsEnabled;
return this;
@@ -217,6 +226,11 @@ public class Formatter {
return this;
}
+ public Formatter setWriteBootstrapSnapshot(boolean writeBootstrapSnapshot)
{
+ this.writeBootstrapSnapshot = writeBootstrapSnapshot;
+ return this;
+ }
+
public Optional<DynamicVoters> initialVoters() {
return initialControllers;
}
@@ -378,6 +392,9 @@ public class Formatter {
}
bootstrapRecords.addAll(ScramParser.parse(scramArguments));
}
+ if (!additionalBootstrapRecords.isEmpty()) {
+ bootstrapRecords.addAll(additionalBootstrapRecords);
+ }
return BootstrapMetadata.fromRecords(bootstrapRecords, "format
command");
}
@@ -432,11 +449,10 @@ public class Formatter {
directoryTypes.get(writeLogDir).description(), writeLogDir,
MetadataVersion.FEATURE_NAME, releaseVersion);
Files.createDirectories(Paths.get(writeLogDir));
- BootstrapDirectory bootstrapDirectory = new
BootstrapDirectory(writeLogDir);
- bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
- if
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
- writeDynamicQuorumSnapshot(writeLogDir,
- initialControllers.get(),
+ if (writeBootstrapSnapshot) {
+ writeBoostrapSnapshot(writeLogDir,
+ bootstrapMetadata,
+ initialControllers,
featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
}
@@ -487,9 +503,10 @@ public class Formatter {
}
}
- static void writeDynamicQuorumSnapshot(
+ public static void writeBoostrapSnapshot(
String writeLogDir,
- DynamicVoters initialControllers,
+ BootstrapMetadata bootstrapMetadata,
+ Optional<DynamicVoters> initialControllers,
short kraftVersion,
String controllerListenerName
) {
@@ -497,7 +514,6 @@ public class Formatter {
File clusterMetadataDirectory = new File(parentDir,
String.format("%s-%d",
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
CLUSTER_METADATA_TOPIC_PARTITION.partition()));
- VoterSet voterSet =
initialControllers.toVoterSet(controllerListenerName);
RecordsSnapshotWriter.Builder builder = new
RecordsSnapshotWriter.Builder().
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
@@ -505,8 +521,10 @@ public class Formatter {
clusterMetadataDirectory.toPath(),
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
- setVoterSet(Optional.of(voterSet));
+ setVoterSet(initialControllers.map(controllers ->
controllers.toVoterSet(controllerListenerName)));
+
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer =
builder.build(MetadataRecordSerde.INSTANCE)) {
+ writer.append(bootstrapMetadata.records());
writer.freeze();
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index 71957dce6cf..70e5b8b8920 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -17,9 +17,11 @@
package org.apache.kafka.metadata.util;
+import org.apache.kafka.common.message.KRaftVersionRecord;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.record.internal.ControlRecordType;
import
org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -55,7 +58,7 @@ public final class BatchFileReader implements
Iterator<BatchFileReader.BatchAndT
return this;
}
- public BatchFileReader build() throws Exception {
+ public BatchFileReader build() throws IOException {
if (path == null) {
throw new RuntimeException("You must specify a path.");
}
@@ -119,6 +122,17 @@ public final class BatchFileReader implements
Iterator<BatchFileReader.BatchAndT
messages.add(new ApiMessageAndVersion(message, (short)
0));
break;
}
+ case KRAFT_VERSION: {
+ KRaftVersionRecord message = new KRaftVersionRecord();
+ message.read(new ByteBufferAccessor(record.value()),
(short) 0);
+ messages.add(new ApiMessageAndVersion(message, (short)
0));
+ break;
+ }
+ case KRAFT_VOTERS:
+ VotersRecord message = new VotersRecord();
+ message.read(new ByteBufferAccessor(record.value()),
(short) 0);
+ messages.add(new ApiMessageAndVersion(message, (short)
0));
+ break;
default:
throw new RuntimeException("Unsupported control record
type " + type + " at offset " +
record.offset());
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
deleted file mode 100644
index 97240b227bb..00000000000
---
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.bootstrap;
-
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.NoOpRecord;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-
-@Timeout(40)
-public class BootstrapDirectoryTest {
- static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
- new ApiMessageAndVersion(new FeatureLevelRecord().
- setName(MetadataVersion.FEATURE_NAME).
- setFeatureLevel((short) 7), (short) 0),
- new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
- new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
-
- static class BootstrapTestDirectory implements AutoCloseable {
- File directory = null;
-
- synchronized BootstrapTestDirectory createDirectory() {
- directory = TestUtils.tempDirectory("BootstrapTestDirectory");
- return this;
- }
-
- synchronized String path() {
- return directory.getAbsolutePath();
- }
-
- synchronized String binaryBootstrapPath() {
- return new File(directory,
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME).getAbsolutePath();
- }
-
- @Override
- public synchronized void close() throws Exception {
- if (directory != null) {
- Utils.delete(directory);
- }
- directory = null;
- }
- }
-
- @Test
- public void testReadFromEmptyConfiguration() throws Exception {
- try (BootstrapTestDirectory testDirectory = new
BootstrapTestDirectory().createDirectory()) {
-
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
- "the default bootstrap"),
- new BootstrapDirectory(testDirectory.path()).read());
- }
- }
-
- @Test
- public void testMissingDirectory() {
- assertEquals("No such directory as ./non/existent/directory",
- assertThrows(RuntimeException.class, () ->
- new
BootstrapDirectory("./non/existent/directory").read()).getMessage());
- }
-
- @Test
- public void testReadFromConfigurationFile() throws Exception {
- try (BootstrapTestDirectory testDirectory = new
BootstrapTestDirectory().createDirectory()) {
- BootstrapDirectory directory = new
BootstrapDirectory(testDirectory.path());
- BootstrapMetadata metadata =
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1,
- "the binary bootstrap metadata file: " +
testDirectory.binaryBootstrapPath());
- directory.writeBinaryFile(metadata);
- assertEquals(metadata, directory.read());
- }
- }
-}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 0ad45185eda..23fbada3007 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -19,14 +19,21 @@ package org.apache.kafka.metadata.bootstrap;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.util.BatchFileWriter;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.MetadataVersionTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV3;
@@ -45,6 +52,31 @@ public class BootstrapMetadataTest {
setName(FEATURE_NAME).
setFeatureLevel((short) 7), (short) 0));
+ static class BootstrapTestDirectory implements AutoCloseable {
+ private final Path directory;
+
+ static BootstrapTestDirectory createDirectory() {
+ return new BootstrapTestDirectory();
+ }
+
+ private BootstrapTestDirectory() {
+ this.directory = TestUtils.tempDirectory().toPath();
+ }
+
+ Path path() {
+ return directory;
+ }
+
+ Path binaryBootstrapPath() {
+ return
directory.resolve(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME);
+ }
+
+ @Override
+ public void close() throws Exception {
+ Utils.delete(directory.toFile());
+ }
+ }
+
@Test
public void testFromVersion() {
assertEquals(new BootstrapMetadata(List.of(
@@ -133,4 +165,73 @@ public class BootstrapMetadataTest {
+ " to " + MetadataVersion.latestTesting().featureLevel() + ".",
assertThrows(RuntimeException.class,
bootstrapMetadata::metadataVersion).getMessage());
}
+
+ @Test
+ public void testReadFromEmptyDirectory() throws Exception {
+ try (BootstrapTestDirectory testDirectory =
BootstrapTestDirectory.createDirectory()) {
+
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
+ "the default bootstrap"),
+ BootstrapMetadata.fromDirectory(testDirectory.path()));
+ }
+ }
+
+ @Test
+ public void testMissingDirectory() {
+ assertEquals("No such directory as ./non/existent/directory",
+ assertThrows(IllegalStateException.class, () ->
+
BootstrapMetadata.fromDirectory(Path.of("./non/existent/directory"))).getMessage());
+ }
+
+ @Test
+ public void testFromDirectoryWithLegacyBootstrapCheckpoint() throws
Exception {
+ try (BootstrapTestDirectory testDirectory =
BootstrapTestDirectory.createDirectory()) {
+ Path checkpointPath = testDirectory.binaryBootstrapPath();
+ BootstrapMetadata expected =
BootstrapMetadata.fromVersion(IBP_3_3_IV3, "test");
+ try (BatchFileWriter writer =
BatchFileWriter.open(checkpointPath)) {
+ writer.append(expected.records());
+ }
+ BootstrapMetadata result =
BootstrapMetadata.fromDirectory(testDirectory.path());
+ assertEquals(expected.records(), result.records());
+ }
+ }
+
+ @Test
+ public void testFromCheckpointFileNotFound() {
+ Path nonExistent = Path.of("/tmp/does_not_exist_bootstrap.checkpoint");
+ assertThrows(UncheckedIOException.class,
+ () -> BootstrapMetadata.fromCheckpointFile(nonExistent));
+ }
+
+ @Test
+ public void testFromVersions() {
+ Map<String, Short> features = new TreeMap<>();
+ features.put("foo", (short) 2);
+ features.put("bar", (short) 1);
+ BootstrapMetadata bm =
BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features,
"test");
+ assertEquals(List.of(
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(FEATURE_NAME)
+
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0),
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName("bar")
+ .setFeatureLevel((short) 1), (short) 0),
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName("foo")
+ .setFeatureLevel((short) 2), (short) 0)),
+ bm.records()
+ );
+ }
+
+ @Test
+ public void testFromVersionsExcludesZeroLevelFeatures() {
+ Map<String, Short> features = Map.of("foo", (short) 0);
+ BootstrapMetadata bm =
BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features,
"test");
+ assertEquals(List.of(
+ new ApiMessageAndVersion(new FeatureLevelRecord()
+ .setName(FEATURE_NAME)
+
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short)
0)),
+ bm.records()
+ );
+ }
+
}
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index dcfba4328ae..975c4db0e48 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -23,8 +23,8 @@ import
org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.raft.DynamicVoters;
@@ -60,6 +60,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Stream;
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
import static
org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -148,12 +149,24 @@ public class FormatterTest {
assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId());
assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()),
ensemble.clusterId());
assertEquals(new HashSet<>(testEnv.directories),
ensemble.logDirProps().keySet());
- BootstrapMetadata bootstrapMetadata =
- new BootstrapDirectory(testEnv.directory(0)).read();
+ BootstrapMetadata bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
assertEquals(MetadataVersion.latestProduction(),
bootstrapMetadata.metadataVersion());
}
}
+ @Test
+ public void testSkipsBootstrapSnapshotWhenDisabled() throws Exception {
+ try (TestEnv testEnv = new TestEnv(1)) {
+ FormatterContext context = testEnv.newFormatter();
+ context.formatter.setWriteBootstrapSnapshot(false);
+ context.formatter.run();
+ File clusterMetadataDir = new File(testEnv.directory(0),
String.format("%s-%d",
+ CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+ CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+ assertFalse(clusterMetadataDir.exists());
+ }
+ }
+
@Test
public void testFormatterFailsOnAlreadyFormatted() throws Exception {
try (TestEnv testEnv = new TestEnv(1)) {
@@ -170,11 +183,7 @@ public class FormatterTest {
try (TestEnv testEnv = new TestEnv(1)) {
new File(testEnv.directory(0)).setReadOnly();
FormatterContext formatter1 = testEnv.newFormatter();
- String expectedPrefix = "Error while writing meta.properties file";
- assertEquals(expectedPrefix,
- assertThrows(FormatterException.class,
- formatter1.formatter::run).
- getMessage().substring(0, expectedPrefix.length()));
+ assertThrows(Exception.class, formatter1.formatter::run);
}
}
@@ -266,8 +275,7 @@ public class FormatterTest {
"\nFormatting metadata directory " + testEnv.directory(0) +
" with metadata.version " + MetadataVersion.IBP_3_5_IV0 +
".",
formatter1.output().trim());
- BootstrapMetadata bootstrapMetadata =
- new BootstrapDirectory(testEnv.directory(0)).read();
+ BootstrapMetadata bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
assertEquals(MetadataVersion.IBP_3_5_IV0,
bootstrapMetadata.metadataVersion());
assertEquals(1, bootstrapMetadata.records().size());
}
@@ -294,8 +302,7 @@ public class FormatterTest {
"\nFormatting metadata directory " + testEnv.directory(0) +
" with metadata.version " +
MetadataVersion.latestTesting() + ".",
formatter1.output().trim());
- BootstrapMetadata bootstrapMetadata =
- new BootstrapDirectory(testEnv.directory(0)).read();
+ BootstrapMetadata bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
assertEquals(MetadataVersion.latestTesting(),
bootstrapMetadata.metadataVersion());
}
}
@@ -345,8 +352,7 @@ public class FormatterTest {
"\nFormatting metadata directory " + testEnv.directory(0) +
" with metadata.version " + MetadataVersion.IBP_3_8_IV0 +
".",
formatter1.output().trim());
- BootstrapMetadata bootstrapMetadata =
- new BootstrapDirectory(testEnv.directory(0)).read();
+ BootstrapMetadata bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
assertEquals(MetadataVersion.IBP_3_8_IV0,
bootstrapMetadata.metadataVersion());
List<ApiMessageAndVersion> scramRecords =
bootstrapMetadata.records().stream().
filter(r -> r.message() instanceof UserScramCredentialRecord).
@@ -380,8 +386,7 @@ public class FormatterTest {
formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES);
formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version);
formatter1.formatter.run();
- BootstrapMetadata bootstrapMetadata =
- new BootstrapDirectory(testEnv.directory(0)).read();
+ BootstrapMetadata bootstrapMetadata =
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
List<ApiMessageAndVersion> expected = new ArrayList<>();
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(MetadataVersion.FEATURE_NAME).
diff --git
a/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
new file mode 100644
index 00000000000..8ad7e64148d
--- /dev/null
+++
b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.nio.file.Path;
+
+import static
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
+import static org.apache.kafka.snapshot.Snapshots.snapshotPath;
+
+/**
+ * Utilities for testing classes that deal with bootstrap metadata.
+ */
+public class BootstrapTestUtils {
+ /**
+ * Reads bootstrap metadata from the cluster metadata bootstrap checkpoint
file of the given metadata directory.
+ *
+ * @param directoryPath the metadata log directory
+ * @return the bootstrap metadata
+ */
+ public static BootstrapMetadata readBootstrapMetadata(String
directoryPath) {
+ Path metadataPartitionDir = Path.of(directoryPath,
+ CLUSTER_METADATA_TOPIC_PARTITION.topic() + "-" +
CLUSTER_METADATA_TOPIC_PARTITION.partition());
+ Path checkpointPath = snapshotPath(metadataPartitionDir,
BOOTSTRAP_SNAPSHOT_ID);
+ return BootstrapMetadata.fromCheckpointFile(checkpointPath);
+ }
+}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index fa498e190ea..9fd6404c893 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -31,6 +31,8 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -48,6 +50,7 @@ import org.apache.kafka.raft.KRaftConfigs;
import org.apache.kafka.raft.MetadataLogConfig;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.storage.internals.log.CleanerConfig;
@@ -496,6 +499,25 @@ public class KafkaClusterTestKit implements AutoCloseable {
formatter.setIgnoreFormatted(false);
formatter.setControllerListenerName(controllerListenerName);
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+
+ List<ApiMessageAndVersion> additionalRecords = new ArrayList<>();
+ for (ApiMessageAndVersion record :
nodes.bootstrapMetadata().records()) {
+ if (record.message() instanceof FeatureLevelRecord
featureRecord) {
+ if
(!featureRecord.name().equals(MetadataVersion.FEATURE_NAME)) {
+ formatter.setFeatureLevel(featureRecord.name(),
featureRecord.featureLevel());
+ }
+ } else if (!(record.message() instanceof
UserScramCredentialRecord)) {
+ additionalRecords.add(record);
+ } else {
+ throw new IllegalStateException("UserScramCredentialRecord
is not supported in " +
+ "bootstrap metadata. Use Formatter.setScramArguments()
instead.");
+ }
+ }
+ for (String disabledFeature : nodes.disabledFeatures()) {
+ formatter.setFeatureLevel(disabledFeature, (short) 0);
+ }
+ formatter.setAdditionalBootstrapRecords(additionalRecords);
+
StringBuilder dynamicVotersBuilder = new StringBuilder();
String prefix = "";
if (standalone) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 0de47d9b771..b1c7fc00e88 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -36,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
@@ -54,6 +55,7 @@ public class TestKitNodes {
private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perServerProperties =
Map.of();
private BootstrapMetadata bootstrapMetadata;
+ private Set<String> disabledFeatures = Set.of();
public Builder() {
this(BootstrapMetadata.fromVersions(
@@ -93,6 +95,11 @@ public class TestKitNodes {
return this;
}
+ public Builder setDisabledFeatures(Set<String> disabledFeatures) {
+ this.disabledFeatures =
Collections.unmodifiableSet(disabledFeatures);
+ return this;
+ }
+
public Builder setCombined(boolean combined) {
this.combined = combined;
return this;
@@ -215,14 +222,15 @@ public class TestKitNodes {
brokerNodes.put(id, brokerNode);
}
- return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(),
clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
- brokerListenerName, brokerSecurityProtocol,
controllerListenerName, controllerSecurityProtocol);
+ return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(),
clusterId, bootstrapMetadata, disabledFeatures, controllerNodes,
+ brokerNodes, brokerListenerName, brokerSecurityProtocol,
controllerListenerName, controllerSecurityProtocol);
}
}
private final String baseDirectory;
private final String clusterId;
private final BootstrapMetadata bootstrapMetadata;
+ private final Set<String> disabledFeatures;
private final SortedMap<Integer, TestKitNode> controllerNodes;
private final SortedMap<Integer, TestKitNode> brokerNodes;
private final ListenerName brokerListenerName;
@@ -234,6 +242,7 @@ public class TestKitNodes {
String baseDirectory,
String clusterId,
BootstrapMetadata bootstrapMetadata,
+ Set<String> disabledFeatures,
SortedMap<Integer, TestKitNode> controllerNodes,
SortedMap<Integer, TestKitNode> brokerNodes,
ListenerName brokerListenerName,
@@ -244,6 +253,7 @@ public class TestKitNodes {
this.baseDirectory = Objects.requireNonNull(baseDirectory);
this.clusterId = Objects.requireNonNull(clusterId);
this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+ this.disabledFeatures = Objects.requireNonNull(disabledFeatures);
this.controllerNodes = Collections.unmodifiableSortedMap(new
TreeMap<>(Objects.requireNonNull(controllerNodes)));
this.brokerNodes = Collections.unmodifiableSortedMap(new
TreeMap<>(Objects.requireNonNull(brokerNodes)));
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
@@ -272,6 +282,10 @@ public class TestKitNodes {
return bootstrapMetadata;
}
+ public Set<String> disabledFeatures() {
+ return disabledFeatures;
+ }
+
public SortedMap<Integer, TestKitNode> brokerNodes() {
return brokerNodes;
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index f1ad8080cb4..ba9da8c77bf 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -336,8 +336,15 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
}
}
+ Set<String> disabledFeatures =
newFeatureLevels.entrySet().stream()
+ .filter(featureEntry -> featureEntry.getValue() == 0)
+ .filter(featureEntry ->
!featureEntry.getKey().equals(MetadataVersion.FEATURE_NAME))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
TestKitNodes nodes = new TestKitNodes.Builder()
.setBootstrapMetadata(BootstrapMetadata.fromVersions(clusterConfig.metadataVersion(),
newFeatureLevels, "testkit"))
+ .setDisabledFeatures(disabledFeatures)
.setCombined(isCombined)
.setNumBrokerNodes(clusterConfig.numBrokers())
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index 0ac0b74b6bc..8e6f3d8d096 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -54,7 +54,7 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde;
import
org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
import org.apache.kafka.metadata.MetadataRecordSerde;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.util.CommandDefaultOptions;
@@ -365,7 +365,7 @@ public class DumpLogSegments {
long startOffset = Long.parseLong(file.getName().split("\\.")[0]);
System.out.println("Log starting offset: " + startOffset);
} else if (file.getName().endsWith(Snapshots.SUFFIX)) {
- if
(file.getName().equals(BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME)) {
+ if
(file.getName().equals(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME)) {
System.out.println("KRaft bootstrap snapshot");
} else {
Optional<SnapshotPath> pathOpt =
Snapshots.parse(file.toPath());