This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f380a042d8d KAFKA-20164; Add bootstrap support to KRaft listener 
interface (#21453)
f380a042d8d is described below

commit f380a042d8d38980c3c0272eb0f2297e6d1ffb0e
Author: mannoopj <[email protected]>
AuthorDate: Wed Mar 18 15:17:44 2026 -0400

    KAFKA-20164; Add bootstrap support to KRaft listener interface (#21453)
    
    This change adds a new handleLoadBootstrap callback to
    RaftClient.Listener objects so that applications can handle
    bootstrap snapshots. The snapshot sent to the handleLoadBootstrap
    callback contains state that has not been committed by KRaft. The
    snapshot sent to handleLoadSnapshot contains state that has been
    committed by KRaft.
    
    In a future change, QuorumController will use this feature to implement
    metadata bootstrapping using a KRaft bootstrap snapshot instead of the
    bootstrap.checkpoint.
    
    Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
     <[email protected]>
---
 .../main/scala/kafka/tools/TestRaftServer.scala    |  5 ++
 .../apache/kafka/controller/QuorumController.java  |  5 ++
 .../apache/kafka/image/loader/MetadataLoader.java  |  6 +++
 .../kafka/controller/MockRaftClientListener.java   | 17 ++++--
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  9 +++-
 .../java/org/apache/kafka/raft/RaftClient.java     | 16 +++++-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 62 +++++++++++++++++++++-
 .../apache/kafka/raft/RaftClientTestContext.java   | 30 ++++++++++-
 .../org/apache/kafka/raft/ReplicatedCounter.java   |  6 +++
 9 files changed, 145 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 83eeab0d527..a76abfce055 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -198,6 +198,11 @@ class TestRaftServer(
       eventQueue.offer(HandleSnapshot(reader))
     }
 
+    override def handleLoadBootstrap(reader: SnapshotReader[Array[Byte]]): 
Unit = {
+      // TestRaftServer does not process bootstrap snapshots.
+      reader.close()
+    }
+
     override def initiateShutdown(): Boolean = {
       val initiated = super.initiateShutdown()
       eventQueue.offer(Shutdown)
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 8ebd7848853..16f3438b94a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1065,6 +1065,11 @@ public final class QuorumController implements 
Controller {
             });
         }
 
+        @Override
+        public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> 
reader) {
+            reader.close();
+        }
+
         @Override
         public void handleLeaderChange(LeaderAndEpoch newLeader) {
             appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", 
() -> {
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 3e21973306b..19235e578e3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -426,6 +426,12 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         });
     }
 
+    @Override
+    public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> 
reader) {
+        // MetadataLoader does not process uncommitted bootstrap snapshots.
+        reader.close();
+    }
+
     /**
      * Load a snapshot. This is relatively straightforward since we don't 
track as many things as
      * we do in loadLogDelta. The main complication here is that we have to 
maintain an index
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
index b4ee74346ee..48f1d449203 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockRaftClientListener.java
@@ -36,6 +36,8 @@ public class MockRaftClientListener implements 
RaftClient.Listener<ApiMessageAnd
     public static final String RENOUNCE = "RENOUNCE";
     public static final String SHUTDOWN = "SHUTDOWN";
     public static final String SNAPSHOT = "SNAPSHOT";
+    public static final String BOOTSTRAP_SNAPSHOT = "BOOTSTRAP_SNAPSHOT";
+    public static final String BOOTSTRAP_OFFSET = "BOOTSTRAP_OFFSET";
 
     private final int nodeId;
     private final List<String> serializedEvents = new ArrayList<>();
@@ -65,16 +67,25 @@ public class MockRaftClientListener implements 
RaftClient.Listener<ApiMessageAnd
 
     @Override
     public synchronized void 
handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
-        long lastCommittedOffset = reader.lastContainedLogOffset();
+        loadSnapshot(reader, SNAPSHOT, LAST_COMMITTED_OFFSET);
+    }
+
+    @Override
+    public synchronized void 
handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> reader) {
+        loadSnapshot(reader, BOOTSTRAP_SNAPSHOT, BOOTSTRAP_OFFSET);
+    }
+
+    private void loadSnapshot(SnapshotReader<ApiMessageAndVersion> reader, 
String eventType, String offsetLabel) {
+        long lastContainedLogOffset = reader.lastContainedLogOffset();
         try {
             while (reader.hasNext()) {
                 Batch<ApiMessageAndVersion> batch = reader.next();
 
                 for (ApiMessageAndVersion messageAndVersion : batch.records()) 
{
                     ApiMessage message = messageAndVersion.message();
-                    serializedEvents.add(SNAPSHOT + " " + message.toString());
+                    serializedEvents.add(eventType + " " + message.toString());
                 }
-                serializedEvents.add(LAST_COMMITTED_OFFSET + " " + 
lastCommittedOffset);
+                serializedEvents.add(offsetLabel + " " + 
lastContainedLogOffset);
             }
         } finally {
             reader.close();
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9e002ff297f..9df0b82a373 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -4028,8 +4028,13 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                 lastSent = null;
             }
 
-            logger.debug("Notifying listener {} of snapshot {}", 
listenerName(), reader.snapshotId());
-            listener.handleLoadSnapshot(reader);
+            if (reader.snapshotId().equals(BOOTSTRAP_SNAPSHOT_ID)) {
+                logger.debug("Notifying listener {} of bootstrap snapshot {}", 
listenerName(), reader.snapshotId());
+                listener.handleLoadBootstrap(reader);
+            } else {
+                logger.debug("Notifying listener {} of committed snapshot {}", 
listenerName(), reader.snapshotId());
+                listener.handleLoadSnapshot(reader);
+            }
         }
 
         /**
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 95ed1905b67..de74296c287 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -51,7 +51,7 @@ public interface RaftClient<T> extends AutoCloseable {
         void handleCommit(BatchReader<T> reader);
 
         /**
-         * Callback which is invoked when the Listener needs to load a 
snapshot.
+         * Callback which is invoked when the Listener needs to load a 
committed snapshot.
          * It is the responsibility of this implementation to invoke {@link 
SnapshotReader#close()}
          * after consuming the reader.
          *
@@ -62,13 +62,25 @@ public interface RaftClient<T> extends AutoCloseable {
          */
         void handleLoadSnapshot(SnapshotReader<T> reader);
 
+        /**
+         * Callback which is invoked when the Listener needs to load a bootstrap 
snapshot.
+         * Bootstrap snapshots are uncommitted and are used to store and load 
the initial application state.
+         *
+         * It is the responsibility of this implementation to invoke {@link 
SnapshotReader#close()}
+         * after consuming the reader.
+         *
+         * @param reader snapshot reader instance which must be iterated and 
closed
+         */
+        void handleLoadBootstrap(SnapshotReader<T> reader);
+
         /**
          * Called on any change to leadership. This includes both when a 
leader is elected and
          * when a leader steps down or fails.
          *
          * If this node is the leader, then the notification of leadership 
will be delayed until
          * the implementation of this interface has caught up to the 
high-watermark through calls to
-         * {@link #handleLoadSnapshot(SnapshotReader)} and {@link 
#handleCommit(BatchReader)}.
+         * {@link #handleLoadSnapshot(SnapshotReader)}, {@link 
#handleLoadBootstrap(SnapshotReader)},
+         * and {@link #handleCommit(BatchReader)}.
          *
          * If this node is not the leader, then this method will be called as 
soon as possible. In
          * this case the leader may or may not be known for the current epoch.
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 921f1104f53..b159c08d2f2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.raft.RaftClientTestContext.RaftProtocol.KIP_853_PROTOCOL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -123,11 +124,12 @@ public final class KafkaRaftClientSnapshotTest {
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
         assertEquals(localLogEndOffset, 
context.client.highWatermark().getAsLong());
 
-        // Check that listener was notified of the new snapshot
+        // Check that listener was notified of the committed snapshot, not the 
bootstrap snapshot
         try (SnapshotReader<String> snapshot = 
context.listener.drainHandledSnapshot().get()) {
             assertEquals(snapshotId, snapshot.snapshotId());
             SnapshotWriterReaderTest.assertDataSnapshot(List.of(), snapshot);
         }
+        
assertFalse(context.listener.drainHandledBootstrapSnapshot().isPresent());
     }
 
     @ParameterizedTest
@@ -176,11 +178,12 @@ public final class KafkaRaftClientSnapshotTest {
             context.client.highWatermark()
         );
 
-        // Check that listener was notified of the new snapshot
+        // Check that listener was notified of the committed snapshot, not the 
bootstrap snapshot
         try (SnapshotReader<String> snapshot = 
context.listener.drainHandledSnapshot().get()) {
             assertEquals(snapshotId, snapshot.snapshotId());
             SnapshotWriterReaderTest.assertDataSnapshot(List.of(), snapshot);
         }
+        
assertFalse(context.listener.drainHandledBootstrapSnapshot().isPresent());
     }
 
     @ParameterizedTest
@@ -2129,6 +2132,61 @@ public final class KafkaRaftClientSnapshotTest {
         );
     }
 
+    @Test
+    public void testListenerReceivesBootstrapSnapshot() throws Exception {
+        ReplicaKey localKey = replicaKey(randomReplicaId(), true);
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(localKey));
+        List<String> bootstrapRecords = List.of("a", "b", "c");
+
+        RaftClientTestContext context = new RaftClientTestContext
+            .Builder(localKey.id(), localKey.directoryId().get())
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withBootstrapSnapshotRecords(Optional.of(voters), 
bootstrapRecords)
+            .build();
+
+        context.pollUntil(() -> context.client.highWatermark().isPresent());
+
+        assertBootstrapSnapshot(context, bootstrapRecords);
+    }
+
+    @Test
+    public void testListenerReceivesBootstrapSnapshotViaFollowerFetch() throws 
Exception {
+        ReplicaKey localKey = replicaKey(randomReplicaId(), true);
+        ReplicaKey otherNodeKey = replicaKey(localKey.id() + 1, true);
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(localKey, 
otherNodeKey));
+        List<String> bootstrapRecords = List.of("a", "b", "c");
+
+        RaftClientTestContext context = new RaftClientTestContext
+            .Builder(localKey.id(), localKey.directoryId().get())
+            .withRaftProtocol(KIP_853_PROTOCOL)
+            .withBootstrapSnapshotRecords(Optional.of(voters), 
bootstrapRecords)
+            .withUnknownLeader(3)
+            .build();
+
+        context.unattachedToLeader();
+        int epoch = context.currentEpoch();
+
+        // Advance HWM via follower fetch
+        long localLogEndOffset = context.log.endOffset().offset();
+        context.deliverRequest(context.fetchRequest(epoch, otherNodeKey, 
localLogEndOffset, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localKey.id()));
+        assertEquals(localLogEndOffset, 
context.client.highWatermark().getAsLong());
+
+        assertBootstrapSnapshot(context, bootstrapRecords);
+    }
+
+    private static void assertBootstrapSnapshot(
+        RaftClientTestContext context,
+        List<String> expectedRecords
+    ) throws Exception {
+        try (SnapshotReader<String> bootstrapSnapshot = 
context.listener.drainHandledBootstrapSnapshot().get()) {
+            assertEquals(Snapshots.BOOTSTRAP_SNAPSHOT_ID, 
bootstrapSnapshot.snapshotId());
+            
SnapshotWriterReaderTest.assertDataSnapshot(List.of(expectedRecords), 
bootstrapSnapshot);
+        }
+        assertFalse(context.listener.drainHandledSnapshot().isPresent());
+    }
+
     private static ReplicaKey replicaKey(int id, boolean withDirectoryId) {
         Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID;
         return ReplicaKey.of(id, directoryId);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index e1c90f2de15..cd839d37b42 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -350,6 +350,10 @@ public final class RaftClientTestContext {
         }
 
         Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
+            return withBootstrapSnapshotRecords(voters, List.of());
+        }
+
+        Builder withBootstrapSnapshotRecords(Optional<VoterSet> voters, 
List<String> records) {
             startingVoters = voters.orElse(VoterSet.empty());
             isStartingVotersStatic = false;
 
@@ -364,6 +368,9 @@ public final class RaftClientTestContext {
                     .setVoterSet(voters);
 
                 try (RecordsSnapshotWriter<String> writer = 
builder.build(SERDE)) {
+                    if (!records.isEmpty()) {
+                        writer.append(records);
+                    }
                     writer.freeze();
                 }
             } else {
@@ -2224,6 +2231,7 @@ public final class RaftClientTestContext {
         private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
         private final OptionalInt localId;
         private Optional<SnapshotReader<String>> snapshot = Optional.empty();
+        private Optional<SnapshotReader<String>> bootstrapSnapshot = 
Optional.empty();
         private boolean readCommit = true;
 
         MockListener(OptionalInt localId) {
@@ -2358,10 +2366,28 @@ public final class RaftClientTestContext {
 
         @Override
         public void handleLoadSnapshot(SnapshotReader<String> reader) {
-            snapshot.ifPresent(snapshot -> 
assertDoesNotThrow(snapshot::close));
+            snapshot = handleLoadSnapshotOrBootstrap(snapshot, reader);
+        }
+
+        @Override
+        public void handleLoadBootstrap(SnapshotReader<String> reader) {
+            bootstrapSnapshot = 
handleLoadSnapshotOrBootstrap(bootstrapSnapshot, reader);
+        }
+
+        private Optional<SnapshotReader<String>> handleLoadSnapshotOrBootstrap(
+            Optional<SnapshotReader<String>> previousSnapshot,
+            SnapshotReader<String> reader
+        ) {
+            previousSnapshot.ifPresent(s -> assertDoesNotThrow(s::close));
             commits.clear();
             savedBatches.clear();
-            snapshot = Optional.of(reader);
+            return Optional.of(reader);
+        }
+
+        Optional<SnapshotReader<String>> drainHandledBootstrapSnapshot() {
+            Optional<SnapshotReader<String>> temp = bootstrapSnapshot;
+            bootstrapSnapshot = Optional.empty();
+            return temp;
         }
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java 
b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
index d877a7ec308..6e0de670b08 100644
--- a/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/test/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -181,6 +181,12 @@ public class ReplicatedCounter implements 
RaftClient.Listener<Integer> {
         }
     }
 
+    @Override
+    public synchronized void handleLoadBootstrap(SnapshotReader<Integer> 
reader) {
+        // ReplicatedCounter does not process bootstrap snapshots.
+        reader.close();
+    }
+
     @Override
     public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
         if (newLeader.isLeader(nodeId)) {

Reply via email to