This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78a66fa9db0 KAFKA-19648; Cluster metadata bootstrapping with kraft 
checkpoint (#20707)
78a66fa9db0 is described below

commit 78a66fa9db04cafabaf8611167ad25383f2b943d
Author: mannoopj <[email protected]>
AuthorDate: Wed May 27 12:25:54 2026 -0400

    KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint (#20707)
    
    Previously, bootstrap metadata was stored in a separate
    bootstrap.checkpoint file, while the zero checkpoint contained only
    KRaft control records. This change unifies them by having the Formatter
    append bootstrap metadata records into the zero checkpoint alongside the
    existing KRaft control records, integrating with KRaft's bootstrapping
    checkpoint mechanisms like RaftClient.Listener#handleLoadBootstrap and
    KIP-630 snapshot lifecycle  management.
    
    QuorumController's handleLoadBootstrap now extracts bootstrap records
    from the zero checkpoint and stores them as BootstrapMetadata, which is
    later committed by ActivationRecordsGenerator when the controller
    activates on an empty metadata log.
    
    The BootstrapDirectory class is removed and its functionality
    consolidated into static methods on BootstrapMetadata#fromDirectory
    reads from the legacy bootstrap.checkpoint (falling back to defaults),
    and fromCheckpointFile reads from a specific checkpoint path.
    StorageTool now only writes the bootstrap snapshot when the node has the
    Controller role. KafkaClusterTestKit is updated to pass non-feature
    versions, non-SCRAM bootstrap records to the Formatter as additional
    bootstrap records.
    
    Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
     <[email protected]>
---
 build.gradle                                       |   1 +
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 core/src/main/scala/kafka/tools/StorageTool.scala  |   1 +
 .../unit/kafka/server/KafkaRaftServerTest.scala    |  52 ++++++---
 .../scala/unit/kafka/tools/StorageToolTest.scala   |  26 ++---
 .../apache/kafka/controller/QuorumController.java  |  29 ++++-
 .../metadata/bootstrap/BootstrapDirectory.java     | 117 ---------------------
 .../metadata/bootstrap/BootstrapMetadata.java      |  61 ++++++++++-
 .../apache/kafka/metadata/storage/Formatter.java   |  40 +++++--
 .../kafka/metadata/util/BatchFileReader.java       |  16 ++-
 .../metadata/bootstrap/BootstrapDirectoryTest.java |  97 -----------------
 .../metadata/bootstrap/BootstrapMetadataTest.java  | 101 ++++++++++++++++++
 .../kafka/metadata/storage/FormatterTest.java      |  37 ++++---
 .../metadata/bootstrap/BootstrapTestUtils.java     |  42 ++++++++
 .../kafka/common/test/KafkaClusterTestKit.java     |  22 ++++
 .../org/apache/kafka/common/test/TestKitNodes.java |  18 +++-
 .../test/junit/RaftClusterInvocationContext.java   |   7 ++
 .../org/apache/kafka/tools/DumpLogSegments.java    |   4 +-
 18 files changed, 395 insertions(+), 282 deletions(-)

diff --git a/build.gradle b/build.gradle
index f302df7aea4..7bdb74961a5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1432,6 +1432,7 @@ project(':metadata') {
 
     testImplementation testLog4j2Libs
     testFixturesImplementation testFixtures(project(':server-common'))
+    testFixturesImplementation project(':raft')
     testFixturesImplementation libs.junitJupiter
 
     testImplementation libs.junitJupiter
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala 
b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index facd4957133..5d559dd20ea 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -17,6 +17,7 @@
 package kafka.server
 
 import java.io.File
+import java.nio.file.Path
 import java.util.concurrent.CompletableFuture
 import kafka.utils.{Logging, Mx4jLoader}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
@@ -25,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.utils.internals.AppInfoParser
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.metadata.KafkaConfigSchema
-import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, 
BootstrapMetadata}
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import 
org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID,
 REQUIRE_METADATA_LOG_DIR}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble}
 import org.apache.kafka.raft.QuorumConfig
@@ -182,8 +183,7 @@ object KafkaRaftServer {
     }
 
     // Load the BootstrapMetadata.
-    val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
-    val bootstrapMetadata = bootstrapDirectory.read()
+    val bootstrapMetadata = 
BootstrapMetadata.fromDirectory(Path.of(config.metadataLogDir))
     (metaPropsEnsemble, bootstrapMetadata)
   }
 
diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala 
b/core/src/main/scala/kafka/tools/StorageTool.scala
index 45642595974..cba3b364706 100644
--- a/core/src/main/scala/kafka/tools/StorageTool.scala
+++ b/core/src/main/scala/kafka/tools/StorageTool.scala
@@ -125,6 +125,7 @@ object StorageTool extends Logging {
     val formatter = new Formatter().
       setPrintStream(printStream).
       setNodeId(config.nodeId).
+      
setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
       setClusterId(namespace.getString("cluster_id")).
       setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
       setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index a2783cb5015..cac002ddfd1 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -21,12 +21,13 @@ import java.nio.file.Files
 import java.util.{Optional, Properties}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, 
BootstrapMetadata}
+import org.apache.kafka.metadata.bootstrap.{BootstrapMetadata, 
BootstrapTestUtils}
 import org.apache.kafka.metadata.properties.{MetaProperties, 
MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
+import org.apache.kafka.metadata.storage.Formatter
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.server.config.ServerLogConfigs
+import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.config.ServerLogConfigs
 import org.apache.kafka.storage.internals.log.UnifiedLog
 import org.apache.kafka.test.TestUtils
 import org.junit.jupiter.api.Assertions._
@@ -109,8 +110,13 @@ class KafkaRaftServerTest {
   }
 
   private def writeBootstrapMetadata(logDir: File, metadataVersion: 
MetadataVersion): Unit = {
-    val bootstrapDirectory = new BootstrapDirectory(logDir.toString)
-    
bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion,
 "test"))
+    Formatter.writeBoostrapSnapshot(
+      logDir.toString,
+      BootstrapMetadata.fromVersion(metadataVersion, "test"),
+      Optional.empty(),
+      0.toShort,
+      "CONTROLLER"
+    )
   }
 
   @Test
@@ -272,20 +278,32 @@ class KafkaRaftServerTest {
       setDirectoryId(Uuid.fromString("4jm0e-YRYeB6CCKBvwoS8w")).
       build()
 
-    val configProperties = new Properties
-    configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, 
"broker,controller")
-    configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
-    configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
-    configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"$nodeId@localhost:9093")
-    configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
+    val logDir = TestUtils.tempDirectory()
+    try {
+      writeMetaProperties(logDir, metaProperties)
+      writeBootstrapMetadata(logDir, MetadataVersion.IBP_3_3_IV3)
+
+      val configProperties = new Properties
+      configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, 
"broker,controller")
+      configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
+      configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, 
"PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
+      configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, 
s"$nodeId@localhost:9093")
+      configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, 
"SSL")
+      configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, 
logDir.getAbsolutePath)
+      val config = KafkaConfig.fromProps(configProperties)
 
-    val (metaPropertiesEnsemble, bootstrapMetadata) =
-      invokeLoadMetaProperties(metaProperties, configProperties, 
Some(MetadataVersion.IBP_3_3_IV3))
+      val (metaPropertiesEnsemble, _) =
+        KafkaRaftServer.initializeLogDirs(config, MetaPropertiesEnsemble.LOG, 
"")
 
-    assertEquals(metaProperties, 
metaPropertiesEnsemble.logDirProps().values().iterator().next())
-    assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
-    assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
-    assertEquals(bootstrapMetadata.metadataVersion(), 
MetadataVersion.IBP_3_3_IV3)
+      assertEquals(metaProperties, 
metaPropertiesEnsemble.logDirProps().values().iterator().next())
+      assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
+      assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
+
+      val bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(logDir.getAbsolutePath)
+      assertEquals(MetadataVersion.IBP_3_3_IV3, 
bootstrapMetadata.metadataVersion())
+    } finally {
+      Utils.delete(logDir)
+    }
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala 
b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
index 4ae84c03685..f0b6f8f8e02 100644
--- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
@@ -27,12 +27,12 @@ import kafka.utils.TestUtils
 import net.sourceforge.argparse4j.inf.ArgumentParserException
 import org.apache.kafka.common.metadata.UserScramCredentialRecord
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.server.common.{Feature, MetadataVersion}
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
+import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils
 import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, 
PropertiesUtils}
 import org.apache.kafka.metadata.storage.FormatterException
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
+import org.apache.kafka.server.common.{Feature, MetadataVersion}
 import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.util.TerseFailure
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
@@ -390,15 +390,15 @@ Found problem:
   def testFormatWithReleaseVersionAndFeatureOverride(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
+    properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     assertEquals(0, runFormatCommand(stream, properties, Seq(
-      "--release-version", "3.7-IV0",
+      "--release-version", "3.7-IV0", "--standalone",
       "--feature", "share.version=1")))
 
     // Verify that the feature override is applied by checking the bootstrap 
metadata
-    val bootstrapMetadata = new 
BootstrapDirectory(availableDirs.head.toString).read
+    val bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
 
     // Verify that the share.version feature is set to 1 as specified
     assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -409,7 +409,7 @@ Found problem:
       "Failed to find release version in output: " + stream.toString())
 
     // Verify that the format command completed successfully with features
-    assertTrue(stream.toString().contains("Formatting metadata directory"),
+    assertTrue(stream.toString().contains("Formatting dynamic metadata voter 
directory"),
       "Failed to find formatting message in output: " + stream.toString())
   }
 
@@ -417,17 +417,17 @@ Found problem:
   def testFormatWithMultipleFeatures(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
+    properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     assertEquals(0, runFormatCommand(stream, properties, Seq(
-      "--release-version", "3.8-IV0",
+      "--release-version", "3.8-IV0", "--standalone",
       "--feature", "share.version=1",
       "--feature", "transaction.version=2",
       "--feature", "group.version=1")))
 
     // Verify that all features are properly bootstrapped by checking the 
bootstrap metadata
-    val bootstrapMetadata = new 
BootstrapDirectory(availableDirs.head.toString).read
+    val bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
 
     // Verify that all specified features are set correctly
     assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -442,7 +442,7 @@ Found problem:
       "Failed to find release version in output: " + stream.toString())
 
     // Verify that the format command completed successfully with multiple 
features
-    assertTrue(stream.toString().contains("Formatting metadata directory"),
+    assertTrue(stream.toString().contains("Formatting dynamic metadata voter 
directory"),
       "Failed to find formatting message in output: " + stream.toString())
   }
   
@@ -852,11 +852,11 @@ Found problem:
   def testBootstrapScramRecords(): Unit = {
     val availableDirs = Seq(TestUtils.tempDir())
     val properties = new Properties()
-    properties.putAll(defaultStaticQuorumProperties)
+    properties.putAll(defaultDynamicQuorumProperties)
     properties.setProperty("log.dirs", availableDirs.mkString(","))
     val stream = new ByteArrayOutputStream()
     val arguments = ListBuffer[String](
-      "--release-version", "3.9-IV0",
+      "--release-version", "3.9-IV0", "--standalone",
       "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
       "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
     )
@@ -865,7 +865,7 @@ Found problem:
 
     // Not doing full SCRAM record validation since that's covered elsewhere.
     // Just checking that we generate the correct number of records
-    val bootstrapMetadata = new 
BootstrapDirectory(availableDirs.head.toString).read
+    val bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
     val scramRecords = bootstrapMetadata.records().asScala
       .filter(apiMessageAndVersion => 
apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
       .map(apiMessageAndVersion => 
apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 35ad95356c1..5da6eb2e3aa 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -128,6 +128,7 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 
 import org.slf4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -1075,7 +1076,31 @@ public final class QuorumController implements 
Controller {
 
         @Override
         public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> 
reader) {
-            reader.close();
+            
appendRaftEvent(String.format("handleLoadBootstrap[snapshotId=%s]", 
reader.snapshotId()), () -> {
+                try {
+                    String snapshotName = 
Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                    if (isActiveController()) {
+                        throw fatalFaultHandler.handleFault("Asked to load 
bootstrap snapshot " + snapshotName +
+                                ", but we are the active controller at epoch " 
+ curClaimEpoch);
+                    }
+                    List<ApiMessageAndVersion> records = new ArrayList<>();
+                    while (reader.hasNext()) {
+                        Batch<ApiMessageAndVersion> batch = reader.next();
+                        records.addAll(batch.records());
+                    }
+                    if (!records.isEmpty()) {
+                        log.debug("Loaded {} bootstrap records from {}", 
records.size(), snapshotName);
+                        bootstrapMetadata = 
BootstrapMetadata.fromRecords(records, "bootstrap");
+                    }
+                } catch (FaultHandlerException e) {
+                    throw e;
+                } catch (Throwable e) {
+                    throw fatalFaultHandler.handleFault("Error while loading 
bootstrap snapshot " +
+                            reader.snapshotId(), e);
+                } finally {
+                    reader.close();
+                }
+            });
         }
 
         @Override
@@ -1460,7 +1485,7 @@ public final class QuorumController implements Controller 
{
     /**
      * The bootstrap metadata to use for initialization if needed.
      */
-    private final BootstrapMetadata bootstrapMetadata;
+    private BootstrapMetadata bootstrapMetadata;
 
     /**
      * The maximum number of records per batch to allow.
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
deleted file mode 100644
index dbeaeaa6524..00000000000
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.bootstrap;
-
-import org.apache.kafka.metadata.util.BatchFileReader;
-import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
-import org.apache.kafka.metadata.util.BatchFileWriter;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-
-import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
-import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
-
-/**
- * A read-only class that holds the controller bootstrap metadata. A file 
named "bootstrap.checkpoint" is used and the
- * format is the same as a KRaft snapshot.
- */
-public class BootstrapDirectory {
-    public static final String BINARY_BOOTSTRAP_FILENAME = 
"bootstrap.checkpoint";
-
-    private final String directoryPath;
-
-    /**
-     * Create a new BootstrapDirectory object.
-     *
-     * @param directoryPath     The path to the directory with the bootstrap 
file.
-     */
-    public BootstrapDirectory(
-        String directoryPath
-    ) {
-        this.directoryPath = Objects.requireNonNull(directoryPath);
-    }
-
-    public BootstrapMetadata read() throws Exception {
-        Path path = Paths.get(directoryPath);
-        if (!Files.isDirectory(path)) {
-            if (Files.exists(path)) {
-                throw new RuntimeException("Path " + directoryPath + " exists, 
but is not " +
-                        "a directory.");
-            } else {
-                throw new RuntimeException("No such directory as " + 
directoryPath);
-            }
-        }
-        Path binaryBootstrapPath = Paths.get(directoryPath, 
BINARY_BOOTSTRAP_FILENAME);
-        if (!Files.exists(binaryBootstrapPath)) {
-            return readFromConfiguration();
-        } else {
-            return readFromBinaryFile(binaryBootstrapPath.toString());
-        }
-    }
-
-    BootstrapMetadata readFromConfiguration() {
-        return 
BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default 
bootstrap");
-    }
-
-    BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception {
-        List<ApiMessageAndVersion> records = new ArrayList<>();
-        try (BatchFileReader reader = new BatchFileReader.Builder().
-                setPath(binaryPath).build()) {
-            while (reader.hasNext()) {
-                BatchAndType batchAndType = reader.next();
-                if (!batchAndType.isControl()) {
-                    records.addAll(batchAndType.batch().records());
-                }
-            }
-        }
-        return 
BootstrapMetadata.fromRecords(Collections.unmodifiableList(records),
-                "the binary bootstrap metadata file: " + binaryPath);
-    }
-
-    public void writeBinaryFile(BootstrapMetadata bootstrapMetadata) throws 
IOException {
-        if (!Files.isDirectory(Paths.get(directoryPath))) {
-            throw new RuntimeException("No such directory as " + 
directoryPath);
-        }
-        Path tempPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME + 
".tmp");
-        Files.deleteIfExists(tempPath);
-        try {
-            try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) {
-                for (ApiMessageAndVersion message : 
bootstrapMetadata.records()) {
-                    writer.append(message);
-                }
-            }
-
-            Files.move(
-                tempPath,
-                Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME),
-                ATOMIC_MOVE, REPLACE_EXISTING
-            );
-        } finally {
-            Files.deleteIfExists(tempPath);
-        }
-    }
-}
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
index 9438a76693e..82f607f5689 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java
@@ -19,11 +19,19 @@ package org.apache.kafka.metadata.bootstrap;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.util.BatchFileReader;
+import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -35,10 +43,61 @@ import java.util.Optional;
  * use the version specified here.
  */
 public class BootstrapMetadata {
+    public static final String BINARY_BOOTSTRAP_FILENAME = 
"bootstrap.checkpoint";
+
     private final List<ApiMessageAndVersion> records;
     private final short metadataVersionLevel;
     private final String source;
 
+    /**
+     * Reads bootstrap metadata from the given directory. Checks the legacy 
bootstrap.checkpoint
+     * first and falls back to defaults if it does not exist.
+     */
+    public static BootstrapMetadata fromDirectory(Path directory) {
+        if (!Files.isDirectory(directory)) {
+            if (Files.exists(directory)) {
+                throw new IllegalStateException("Path " + directory + " 
exists, but is not " +
+                        "a directory.");
+            } else {
+                throw new IllegalStateException("No such directory as " + 
directory);
+            }
+        }
+        Path binaryBootstrapPath = 
directory.resolve(BINARY_BOOTSTRAP_FILENAME);
+        if (Files.exists(binaryBootstrapPath)) {
+            return fromCheckpointFile(binaryBootstrapPath);
+        }
+        return fromVersion(MetadataVersion.latestProduction(), "the default 
bootstrap");
+    }
+
+    /**
+     * Reads bootstrap metadata from the given checkpoint file.
+     * Throws if the file does not exist.
+     */
+    public static BootstrapMetadata fromCheckpointFile(Path file) {
+        if (!Files.exists(file)) {
+            String path = file.toString();
+            throw new UncheckedIOException(path, new 
FileNotFoundException(path));
+        }
+        return readFromBinaryFile(file);
+    }
+
+    private static BootstrapMetadata readFromBinaryFile(Path binaryPath) {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        try (BatchFileReader reader = new BatchFileReader.Builder().
+                setPath(binaryPath.toString()).build()) {
+            while (reader.hasNext()) {
+                BatchAndType batchAndType = reader.next();
+                if (!batchAndType.isControl()) {
+                    records.addAll(batchAndType.batch().records());
+                }
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("Unable to read bootstrap metadata 
from " + binaryPath, e);
+        }
+        return fromRecords(Collections.unmodifiableList(records),
+                "the binary bootstrap metadata file: " + binaryPath);
+    }
+
     public static BootstrapMetadata fromVersions(
         MetadataVersion metadataVersion,
         Map<String, Short> featureVersions,
@@ -92,7 +151,7 @@ public class BootstrapMetadata {
         return new BootstrapMetadata(records, metadataVersionLevel.get(), 
source);
     }
 
-    public static Optional<Short> recordToMetadataVersionLevel(ApiMessage 
record) {
+    private static Optional<Short> recordToMetadataVersionLevel(ApiMessage 
record) {
         if (record instanceof FeatureLevelRecord featureLevel) {
             if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) {
                 return Optional.of(featureLevel.featureLevel());
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java 
b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
index 1ccf0c3d5c0..8c163cd4700 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java
@@ -20,14 +20,12 @@ package org.apache.kafka.metadata.storage;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.metadata.MetadataRecordSerde;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
 import org.apache.kafka.raft.DynamicVoters;
 import org.apache.kafka.raft.KafkaRaftClient;
-import org.apache.kafka.raft.VoterSet;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.FeatureVersion;
@@ -102,6 +100,11 @@ public class Formatter {
      */
     private BootstrapMetadata bootstrapMetadata;
 
+    /**
+     * Additional bootstrap records to include beyond feature levels and SCRAM.
+     */
+    private List<ApiMessageAndVersion> additionalBootstrapRecords = List.of();
+
     /**
      * True if we should enable unstable feature versions.
      */
@@ -132,6 +135,7 @@ public class Formatter {
      */
     private Optional<DynamicVoters> initialControllers = Optional.empty();
     private boolean hasDynamicQuorum = false;
+    private boolean writeBootstrapSnapshot = true;
 
     public Formatter setPrintStream(PrintStream printStream) {
         this.printStream = printStream;
@@ -182,6 +186,11 @@ public class Formatter {
         return this;
     }
 
+    public Formatter setAdditionalBootstrapRecords(List<ApiMessageAndVersion> 
additionalBootstrapRecords) {
+        this.additionalBootstrapRecords = additionalBootstrapRecords;
+        return this;
+    }
+
     public Formatter setUnstableFeatureVersionsEnabled(boolean 
unstableFeatureVersionsEnabled) {
         this.unstableFeatureVersionsEnabled = unstableFeatureVersionsEnabled;
         return this;
@@ -217,6 +226,11 @@ public class Formatter {
         return this;
     }
 
+    public Formatter setWriteBootstrapSnapshot(boolean writeBootstrapSnapshot) 
{
+        this.writeBootstrapSnapshot = writeBootstrapSnapshot;
+        return this;
+    }
+
     public Optional<DynamicVoters> initialVoters() {
         return initialControllers;
     }
@@ -378,6 +392,9 @@ public class Formatter {
             }
             bootstrapRecords.addAll(ScramParser.parse(scramArguments));
         }
+        if (!additionalBootstrapRecords.isEmpty()) {
+            bootstrapRecords.addAll(additionalBootstrapRecords);
+        }
         return BootstrapMetadata.fromRecords(bootstrapRecords, "format 
command");
     }
 
@@ -432,11 +449,10 @@ public class Formatter {
                     directoryTypes.get(writeLogDir).description(), writeLogDir,
                     MetadataVersion.FEATURE_NAME, releaseVersion);
                 Files.createDirectories(Paths.get(writeLogDir));
-                BootstrapDirectory bootstrapDirectory = new 
BootstrapDirectory(writeLogDir);
-                bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
-                if 
(directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
-                    writeDynamicQuorumSnapshot(writeLogDir,
-                        initialControllers.get(),
+                if (writeBootstrapSnapshot) {
+                    writeBoostrapSnapshot(writeLogDir,
+                        bootstrapMetadata,
+                        initialControllers,
                         featureLevels.get(KRaftVersion.FEATURE_NAME),
                         controllerListenerName);
                 }
@@ -487,9 +503,10 @@ public class Formatter {
         }
     }
 
-    static void writeDynamicQuorumSnapshot(
+    public static void writeBoostrapSnapshot(
         String writeLogDir,
-        DynamicVoters initialControllers,
+        BootstrapMetadata bootstrapMetadata,
+        Optional<DynamicVoters> initialControllers,
         short kraftVersion,
         String controllerListenerName
     ) {
@@ -497,7 +514,6 @@ public class Formatter {
         File clusterMetadataDirectory = new File(parentDir, 
String.format("%s-%d",
                 CLUSTER_METADATA_TOPIC_PARTITION.topic(),
                 CLUSTER_METADATA_TOPIC_PARTITION.partition()));
-        VoterSet voterSet = 
initialControllers.toVoterSet(controllerListenerName);
         RecordsSnapshotWriter.Builder builder = new 
RecordsSnapshotWriter.Builder().
             setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
             setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
@@ -505,8 +521,10 @@ public class Formatter {
                 clusterMetadataDirectory.toPath(),
                 Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
             setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
-            setVoterSet(Optional.of(voterSet));
+            setVoterSet(initialControllers.map(controllers -> 
controllers.toVoterSet(controllerListenerName)));
+
         try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = 
builder.build(MetadataRecordSerde.INSTANCE)) {
+            writer.append(bootstrapMetadata.records());
             writer.freeze();
         }
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java 
b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
index 71957dce6cf..70e5b8b8920 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java
@@ -17,9 +17,11 @@
 
 package org.apache.kafka.metadata.util;
 
+import org.apache.kafka.common.message.KRaftVersionRecord;
 import org.apache.kafka.common.message.LeaderChangeMessage;
 import org.apache.kafka.common.message.SnapshotFooterRecord;
 import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.message.VotersRecord;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.record.internal.ControlRecordType;
 import 
org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -55,7 +58,7 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
             return this;
         }
 
-        public BatchFileReader build() throws Exception {
+        public BatchFileReader build() throws IOException {
             if (path == null) {
                 throw new RuntimeException("You must specify a path.");
             }
@@ -119,6 +122,17 @@ public final class BatchFileReader implements 
Iterator<BatchFileReader.BatchAndT
                         messages.add(new ApiMessageAndVersion(message, (short) 
0));
                         break;
                     }
+                    case KRAFT_VERSION: {
+                        KRaftVersionRecord message = new KRaftVersionRecord();
+                        message.read(new ByteBufferAccessor(record.value()), 
(short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 
0));
+                        break;
+                    }
+                    case KRAFT_VOTERS:
+                        VotersRecord message =  new VotersRecord();
+                        message.read(new ByteBufferAccessor(record.value()), 
(short) 0);
+                        messages.add(new ApiMessageAndVersion(message, (short) 
0));
+                        break;
                     default:
                         throw new RuntimeException("Unsupported control record 
type " + type + " at offset " +
                                 record.offset());
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
deleted file mode 100644
index 97240b227bb..00000000000
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.bootstrap;
-
-import org.apache.kafka.common.metadata.FeatureLevelRecord;
-import org.apache.kafka.common.metadata.NoOpRecord;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.server.common.MetadataVersion;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-
-@Timeout(40)
-public class BootstrapDirectoryTest {
-    static final List<ApiMessageAndVersion> SAMPLE_RECORDS1 = List.of(
-            new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName(MetadataVersion.FEATURE_NAME).
-                    setFeatureLevel((short) 7), (short) 0),
-            new ApiMessageAndVersion(new NoOpRecord(), (short) 0),
-            new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
-
-    static class BootstrapTestDirectory implements AutoCloseable {
-        File directory = null;
-
-        synchronized BootstrapTestDirectory createDirectory() {
-            directory = TestUtils.tempDirectory("BootstrapTestDirectory");
-            return this;
-        }
-
-        synchronized String path() {
-            return directory.getAbsolutePath();
-        }
-
-        synchronized String binaryBootstrapPath() {
-            return new File(directory, 
BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME).getAbsolutePath();
-        }
-
-        @Override
-        public synchronized void close() throws Exception {
-            if (directory != null) {
-                Utils.delete(directory);
-            }
-            directory = null;
-        }
-    }
-
-    @Test
-    public void testReadFromEmptyConfiguration() throws Exception {
-        try (BootstrapTestDirectory testDirectory = new 
BootstrapTestDirectory().createDirectory()) {
-            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
-                    "the default bootstrap"),
-                new BootstrapDirectory(testDirectory.path()).read());
-        }
-    }
-
-    @Test
-    public void testMissingDirectory() {
-        assertEquals("No such directory as ./non/existent/directory",
-            assertThrows(RuntimeException.class, () ->
-                new 
BootstrapDirectory("./non/existent/directory").read()).getMessage());
-    }
-
-    @Test
-    public void testReadFromConfigurationFile() throws Exception {
-        try (BootstrapTestDirectory testDirectory = new 
BootstrapTestDirectory().createDirectory()) {
-            BootstrapDirectory directory = new 
BootstrapDirectory(testDirectory.path());
-            BootstrapMetadata metadata = 
BootstrapMetadata.fromRecords(SAMPLE_RECORDS1,
-                    "the binary bootstrap metadata file: " + 
testDirectory.binaryBootstrapPath());
-            directory.writeBinaryFile(metadata);
-            assertEquals(metadata, directory.read());
-        }
-    }
-}
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
index 0ad45185eda..23fbada3007 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java
@@ -19,14 +19,21 @@ package org.apache.kafka.metadata.bootstrap;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.NoOpRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.MetadataVersionTestUtils;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.io.UncheckedIOException;
+import java.nio.file.Path;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV3;
@@ -45,6 +52,31 @@ public class BootstrapMetadataTest {
             setName(FEATURE_NAME).
             setFeatureLevel((short) 7), (short) 0));
 
+    static class BootstrapTestDirectory implements AutoCloseable {
+        private final Path directory;
+
+        static BootstrapTestDirectory createDirectory() {
+            return new BootstrapTestDirectory();
+        }
+
+        private BootstrapTestDirectory() {
+            this.directory = TestUtils.tempDirectory().toPath();
+        }
+
+        Path path() {
+            return directory;
+        }
+
+        Path binaryBootstrapPath() {
+            return 
directory.resolve(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME);
+        }
+
+        @Override
+        public void close() throws Exception {
+            Utils.delete(directory.toFile());
+        }
+    }
+
     @Test
     public void testFromVersion() {
         assertEquals(new BootstrapMetadata(List.of(
@@ -133,4 +165,73 @@ public class BootstrapMetadataTest {
             + " to " + MetadataVersion.latestTesting().featureLevel() + ".",
                 assertThrows(RuntimeException.class, 
bootstrapMetadata::metadataVersion).getMessage());
     }
+
+    @Test
+    public void testReadFromEmptyDirectory() throws Exception {
+        try (BootstrapTestDirectory testDirectory = 
BootstrapTestDirectory.createDirectory()) {
+            
assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(),
+                    "the default bootstrap"),
+                BootstrapMetadata.fromDirectory(testDirectory.path()));
+        }
+    }
+
+    @Test
+    public void testMissingDirectory() {
+        assertEquals("No such directory as ./non/existent/directory",
+            assertThrows(IllegalStateException.class, () ->
+                
BootstrapMetadata.fromDirectory(Path.of("./non/existent/directory"))).getMessage());
+    }
+
+    @Test
+    public void testFromDirectoryWithLegacyBootstrapCheckpoint() throws 
Exception {
+        try (BootstrapTestDirectory testDirectory = 
BootstrapTestDirectory.createDirectory()) {
+            Path checkpointPath = testDirectory.binaryBootstrapPath();
+            BootstrapMetadata expected = 
BootstrapMetadata.fromVersion(IBP_3_3_IV3, "test");
+            try (BatchFileWriter writer = 
BatchFileWriter.open(checkpointPath)) {
+                writer.append(expected.records());
+            }
+            BootstrapMetadata result = 
BootstrapMetadata.fromDirectory(testDirectory.path());
+            assertEquals(expected.records(), result.records());
+        }
+    }
+
+    @Test
+    public void testFromCheckpointFileNotFound() {
+        Path nonExistent = Path.of("/tmp/does_not_exist_bootstrap.checkpoint");
+        assertThrows(UncheckedIOException.class,
+            () -> BootstrapMetadata.fromCheckpointFile(nonExistent));
+    }
+
+    @Test
+    public void testFromVersions() {
+        Map<String, Short> features = new TreeMap<>();
+        features.put("foo", (short) 2);
+        features.put("bar", (short) 1);
+        BootstrapMetadata bm = 
BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features, 
"test");
+        assertEquals(List.of(
+            new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName(FEATURE_NAME)
+                
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0),
+            new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName("bar")
+                .setFeatureLevel((short) 1), (short) 0),
+            new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName("foo")
+                .setFeatureLevel((short) 2), (short) 0)),
+            bm.records()
+        );
+    }
+
+    @Test
+    public void testFromVersionsExcludesZeroLevelFeatures() {
+        Map<String, Short> features = Map.of("foo", (short) 0);
+        BootstrapMetadata bm = 
BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features, 
"test");
+        assertEquals(List.of(
+            new ApiMessageAndVersion(new FeatureLevelRecord()
+                .setName(FEATURE_NAME)
+                
.setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 
0)),
+            bm.records()
+        );
+    }
+
 }
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
index dcfba4328ae..975c4db0e48 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java
@@ -23,8 +23,8 @@ import 
org.apache.kafka.common.metadata.UserScramCredentialRecord;
 import org.apache.kafka.common.security.scram.internals.ScramFormatter;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
+import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils;
 import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.raft.DynamicVoters;
@@ -60,6 +60,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
 import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
 import static 
org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -148,12 +149,24 @@ public class FormatterTest {
             assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId());
             assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()), 
ensemble.clusterId());
             assertEquals(new HashSet<>(testEnv.directories), 
ensemble.logDirProps().keySet());
-            BootstrapMetadata bootstrapMetadata =
-                new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             assertEquals(MetadataVersion.latestProduction(), 
bootstrapMetadata.metadataVersion());
         }
     }
 
+    @Test
+    public void testSkipsBootstrapSnapshotWhenDisabled() throws Exception {
+        try (TestEnv testEnv = new TestEnv(1)) {
+            FormatterContext context = testEnv.newFormatter();
+            context.formatter.setWriteBootstrapSnapshot(false);
+            context.formatter.run();
+            File clusterMetadataDir = new File(testEnv.directory(0), 
String.format("%s-%d",
+                CLUSTER_METADATA_TOPIC_PARTITION.topic(),
+                CLUSTER_METADATA_TOPIC_PARTITION.partition()));
+            assertFalse(clusterMetadataDir.exists());
+        }
+    }
+
     @Test
     public void testFormatterFailsOnAlreadyFormatted() throws Exception {
         try (TestEnv testEnv = new TestEnv(1)) {
@@ -170,11 +183,7 @@ public class FormatterTest {
         try (TestEnv testEnv = new TestEnv(1)) {
             new File(testEnv.directory(0)).setReadOnly();
             FormatterContext formatter1 = testEnv.newFormatter();
-            String expectedPrefix = "Error while writing meta.properties file";
-            assertEquals(expectedPrefix,
-                assertThrows(FormatterException.class,
-                    formatter1.formatter::run).
-                        getMessage().substring(0, expectedPrefix.length()));
+            assertThrows(Exception.class, formatter1.formatter::run);
         }
     }
 
@@ -266,8 +275,7 @@ public class FormatterTest {
                     "\nFormatting metadata directory " + testEnv.directory(0) +
                     " with metadata.version " + MetadataVersion.IBP_3_5_IV0 + 
".",
                 formatter1.output().trim());
-            BootstrapMetadata bootstrapMetadata =
-                new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             assertEquals(MetadataVersion.IBP_3_5_IV0, 
bootstrapMetadata.metadataVersion());
             assertEquals(1, bootstrapMetadata.records().size());
         }
@@ -294,8 +302,7 @@ public class FormatterTest {
                     "\nFormatting metadata directory " + testEnv.directory(0) +
                     " with metadata.version " + 
MetadataVersion.latestTesting() + ".",
                 formatter1.output().trim());
-            BootstrapMetadata bootstrapMetadata =
-                    new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             assertEquals(MetadataVersion.latestTesting(), 
bootstrapMetadata.metadataVersion());
         }
     }
@@ -345,8 +352,7 @@ public class FormatterTest {
                     "\nFormatting metadata directory " + testEnv.directory(0) +
                     " with metadata.version " + MetadataVersion.IBP_3_8_IV0 + 
".",
                 formatter1.output().trim());
-            BootstrapMetadata bootstrapMetadata =
-                new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             assertEquals(MetadataVersion.IBP_3_8_IV0, 
bootstrapMetadata.metadataVersion());
             List<ApiMessageAndVersion> scramRecords = 
bootstrapMetadata.records().stream().
                 filter(r -> r.message() instanceof UserScramCredentialRecord).
@@ -380,8 +386,7 @@ public class FormatterTest {
             
formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES);
             
formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version);
             formatter1.formatter.run();
-            BootstrapMetadata bootstrapMetadata =
-                new BootstrapDirectory(testEnv.directory(0)).read();
+            BootstrapMetadata bootstrapMetadata = 
BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0));
             List<ApiMessageAndVersion> expected = new ArrayList<>();
             expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(MetadataVersion.FEATURE_NAME).
diff --git 
a/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
 
b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
new file mode 100644
index 00000000000..8ad7e64148d
--- /dev/null
+++ 
b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.bootstrap;
+
+import java.nio.file.Path;
+
+import static 
org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
+import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID;
+import static org.apache.kafka.snapshot.Snapshots.snapshotPath;
+
+/**
+ * Utilities for testing classes that deal with bootstrap metadata.
+ */
+public class BootstrapTestUtils {
+    /**
+     * Reads bootstrap metadata from the cluster metadata bootstrap checkpoint 
file of the given metadata directory.
+     *
+     * @param directoryPath the metadata log directory
+     * @return the bootstrap metadata
+     */
+    public static BootstrapMetadata readBootstrapMetadata(String 
directoryPath) {
+        Path metadataPartitionDir = Path.of(directoryPath,
+            CLUSTER_METADATA_TOPIC_PARTITION.topic() + "-" + 
CLUSTER_METADATA_TOPIC_PARTITION.partition());
+        Path checkpointPath = snapshotPath(metadataPartitionDir, 
BOOTSTRAP_SNAPSHOT_ID);
+        return BootstrapMetadata.fromCheckpointFile(checkpointPath);
+    }
+}
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index fa498e190ea..9fd6404c893 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -31,6 +31,8 @@ import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.UserScramCredentialRecord;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -48,6 +50,7 @@ import org.apache.kafka.raft.KRaftConfigs;
 import org.apache.kafka.raft.MetadataLogConfig;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
@@ -496,6 +499,25 @@ public class KafkaClusterTestKit implements AutoCloseable {
             formatter.setIgnoreFormatted(false);
             formatter.setControllerListenerName(controllerListenerName);
             formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
+
+            List<ApiMessageAndVersion> additionalRecords = new ArrayList<>();
+            for (ApiMessageAndVersion record : 
nodes.bootstrapMetadata().records()) {
+                if (record.message() instanceof FeatureLevelRecord 
featureRecord) {
+                    if 
(!featureRecord.name().equals(MetadataVersion.FEATURE_NAME)) {
+                        formatter.setFeatureLevel(featureRecord.name(), 
featureRecord.featureLevel());
+                    }
+                } else if (!(record.message() instanceof 
UserScramCredentialRecord)) {
+                    additionalRecords.add(record);
+                } else {
+                    throw new IllegalStateException("UserScramCredentialRecord 
is not supported in " +
+                        "bootstrap metadata. Use Formatter.setScramArguments() 
instead.");
+                }
+            }
+            for (String disabledFeature : nodes.disabledFeatures()) {
+                formatter.setFeatureLevel(disabledFeature, (short) 0);
+            }
+            formatter.setAdditionalBootstrapRecords(additionalRecords);
+
             StringBuilder dynamicVotersBuilder = new StringBuilder();
             String prefix = "";
             if (standalone) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 0de47d9b771..b1c7fc00e88 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
@@ -54,6 +55,7 @@ public class TestKitNodes {
         private int numDisksPerBroker = 1;
         private Map<Integer, Map<String, String>> perServerProperties = 
Map.of();
         private BootstrapMetadata bootstrapMetadata;
+        private Set<String> disabledFeatures = Set.of();
 
         public Builder() {
             this(BootstrapMetadata.fromVersions(
@@ -93,6 +95,11 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setDisabledFeatures(Set<String> disabledFeatures) {
+            this.disabledFeatures = 
Collections.unmodifiableSet(disabledFeatures);
+            return this;
+        }
+
         public Builder setCombined(boolean combined) {
             this.combined = combined;
             return this;
@@ -215,14 +222,15 @@ public class TestKitNodes {
                 brokerNodes.put(id, brokerNode);
             }
 
-            return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), 
clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
-                brokerListenerName, brokerSecurityProtocol, 
controllerListenerName, controllerSecurityProtocol);
+            return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), 
clusterId, bootstrapMetadata, disabledFeatures, controllerNodes,
+                brokerNodes, brokerListenerName, brokerSecurityProtocol, 
controllerListenerName, controllerSecurityProtocol);
         }
     }
 
     private final String baseDirectory;
     private final String clusterId;
     private final BootstrapMetadata bootstrapMetadata;
+    private final Set<String> disabledFeatures;
     private final SortedMap<Integer, TestKitNode> controllerNodes;
     private final SortedMap<Integer, TestKitNode> brokerNodes;
     private final ListenerName brokerListenerName;
@@ -234,6 +242,7 @@ public class TestKitNodes {
         String baseDirectory,
         String clusterId,
         BootstrapMetadata bootstrapMetadata,
+        Set<String> disabledFeatures,
         SortedMap<Integer, TestKitNode> controllerNodes,
         SortedMap<Integer, TestKitNode> brokerNodes,
         ListenerName brokerListenerName,
@@ -244,6 +253,7 @@ public class TestKitNodes {
         this.baseDirectory = Objects.requireNonNull(baseDirectory);
         this.clusterId = Objects.requireNonNull(clusterId);
         this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+        this.disabledFeatures = Objects.requireNonNull(disabledFeatures);
         this.controllerNodes = Collections.unmodifiableSortedMap(new 
TreeMap<>(Objects.requireNonNull(controllerNodes)));
         this.brokerNodes = Collections.unmodifiableSortedMap(new 
TreeMap<>(Objects.requireNonNull(brokerNodes)));
         this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
@@ -272,6 +282,10 @@ public class TestKitNodes {
         return bootstrapMetadata;
     }
 
+    public Set<String> disabledFeatures() {
+        return disabledFeatures;
+    }
+
     public SortedMap<Integer, TestKitNode> brokerNodes() {
         return brokerNodes;
     }
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index f1ad8080cb4..ba9da8c77bf 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -336,8 +336,15 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                     }
                 }
 
+                Set<String> disabledFeatures = 
newFeatureLevels.entrySet().stream()
+                    .filter(featureEntry -> featureEntry.getValue() == 0)
+                    .filter(featureEntry -> 
!featureEntry.getKey().equals(MetadataVersion.FEATURE_NAME))
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toSet());
+
                 TestKitNodes nodes = new TestKitNodes.Builder()
                         
.setBootstrapMetadata(BootstrapMetadata.fromVersions(clusterConfig.metadataVersion(),
 newFeatureLevels, "testkit"))
+                        .setDisabledFeatures(disabledFeatures)
                         .setCombined(isCombined)
                         .setNumBrokerNodes(clusterConfig.numBrokers())
                         
.setNumDisksPerBroker(clusterConfig.numDisksPerBroker())
diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java 
b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
index 0ac0b74b6bc..8e6f3d8d096 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java
@@ -54,7 +54,7 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde;
 import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde;
 import 
org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde;
 import org.apache.kafka.metadata.MetadataRecordSerde;
-import org.apache.kafka.metadata.bootstrap.BootstrapDirectory;
+import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.util.CommandDefaultOptions;
@@ -365,7 +365,7 @@ public class DumpLogSegments {
             long startOffset = Long.parseLong(file.getName().split("\\.")[0]);
             System.out.println("Log starting offset: " + startOffset);
         } else if (file.getName().endsWith(Snapshots.SUFFIX)) {
-            if 
(file.getName().equals(BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME)) {
+            if 
(file.getName().equals(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME)) {
                 System.out.println("KRaft bootstrap snapshot");
             } else {
                 Optional<SnapshotPath> pathOpt = 
Snapshots.parse(file.toPath());

Reply via email to