This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d3c65898df KAFKA-19541; Allow configuration of kraft RPC max bytes 
(#21028)
4d3c65898df is described below

commit 4d3c65898dff8bf03a1111cb9956c6340e29de81
Author: Jonah Hooper <[email protected]>
AuthorDate: Thu Mar 26 17:00:46 2026 -0400

    KAFKA-19541; Allow configuration of kraft RPC max bytes (#21028)
    
    Implements KIP-1219 by adding two new configuration properties:
    controller.quorum.fetch.snapshot.max.bytes and
    controller.quorum.fetch.max.bytes. The
    controller.quorum.fetch.snapshot.max.bytes property controls the maximum
    bytes for the FETCH_SNAPSHOT request. The
    controller.quorum.fetch.max.bytes property controls the maximum bytes
    for the FETCH request.
    
    This change also removes the internal.metadata.max.fetch.size.in.bytes
    internal configuration property. This internal configuration property is
    not needed now that there is a publicly available configuration
    property.
    
    Reviewers: Alyssa Huang <[email protected]>, José Armando García
     Sancio <[email protected]>
---
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 -
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  16 +-
 .../org/apache/kafka/raft/MetadataLogConfig.java   |  10 -
 .../java/org/apache/kafka/raft/QuorumConfig.java   |  25 +-
 .../main/java/org/apache/kafka/raft/RaftLog.java   |  12 +-
 .../internals/KRaftControlRecordStateMachine.java  |   6 +-
 .../apache/kafka/raft/internals/KafkaRaftLog.java  |  16 +-
 .../kafka/raft/KafkaRaftClientFetchTest.java       | 291 ++++++++++++++++++++-
 .../kafka/raft/KafkaRaftClientReconfigTest.java    |  24 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 107 +++++++-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  11 +-
 .../test/java/org/apache/kafka/raft/MockLog.java   |  19 +-
 .../java/org/apache/kafka/raft/MockLogTest.java    | 207 ++++++++++++++-
 .../org/apache/kafka/raft/QuorumConfigTest.java    |   2 +
 .../apache/kafka/raft/RaftClientTestContext.java   |  24 +-
 .../kafka/raft/internals/KafkaRaftLogTest.java     | 134 ++++++++--
 16 files changed, 828 insertions(+), 77 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index bb493c89811..2a035f77333 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -806,7 +806,6 @@ class KafkaConfigTest {
         case MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG => 
// no op
         case 
MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG => // no op
-        case 
MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG => // no op
         case MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG => 
// no op
         case KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG => // ignore string
         case MetadataLogConfig.METADATA_MAX_IDLE_INTERVAL_MS_CONFIG  => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9df0b82a373..02ae182901f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -171,7 +171,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
     private static final int MAX_FETCH_WAIT_MS = 500;
     // visible for testing
     public static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;
-    public static final int MAX_FETCH_SIZE_BYTES = MAX_BATCH_SIZE_BYTES;
 
     private final OptionalInt nodeId;
     private final Uuid nodeDirectoryId;
@@ -434,7 +433,11 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             // Re-read the expected offset in case the snapshot had to be 
reloaded
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset 
-> {
                 if (nextExpectedOffset < highWatermark) {
-                    LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+                    LogFetchInfo readInfo = log.read(
+                        nextExpectedOffset,
+                        Isolation.COMMITTED,
+                        Integer.MAX_VALUE
+                    );
                     listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
                 }
             });
@@ -1517,6 +1520,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             requestMetadata.apiVersion(),
             replicaKey,
             fetchPartition,
+            request.maxBytes(),
             currentTimeMs
         );
         FetchResponseData.PartitionData partitionResponse =
@@ -1587,6 +1591,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
                     requestMetadata.apiVersion(),
                     replicaKey,
                     fetchPartition,
+                    request.maxBytes(),
                     completionTimeMs
                 );
             }
@@ -1598,6 +1603,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         short apiVersion,
         ReplicaKey replicaKey,
         FetchRequestData.FetchPartition request,
+        int maxSizeBytes,
         long currentTimeMs
     ) {
         try {
@@ -1622,7 +1628,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
 
             final Records records;
             if (validOffsetAndEpoch.kind() == ValidOffsetAndEpoch.Kind.VALID) {
-                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED);
+                LogFetchInfo info = log.read(fetchOffset, 
Isolation.UNCOMMITTED, maxSizeBytes);
 
                 if (state.updateReplicaState(replicaKey, currentTimeMs, 
info.startOffsetMetadata)) {
                     onUpdateLeaderHighWatermark(state, currentTimeMs);
@@ -2989,7 +2995,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
 
         return request
-            .setMaxBytes(MAX_FETCH_SIZE_BYTES)
+            .setMaxBytes(quorumConfig.fetchMaxBytes())
             .setMaxWaitMs(fetchMaxWaitMs)
             .setClusterId(clusterId)
             .setReplicaState(new 
FetchRequestData.ReplicaState().setReplicaId(quorum.localIdOrSentinel()));
@@ -3013,7 +3019,7 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             log.topicPartition(),
             quorum.epoch(),
             snapshotId,
-            MAX_FETCH_SIZE_BYTES,
+            quorumConfig.fetchSnapshotMaxBytes(),
             snapshotSize
         );
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java 
b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
index 93821df8db3..331c64e858f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/MetadataLogConfig.java
@@ -81,9 +81,6 @@ public class MetadataLogConfig {
     public static final String 
INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG = 
"internal.metadata.max.batch.size.in.bytes";
     public static final String INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC = 
"The largest record batch size allowed in the metadata log, only for testing.";
 
-    public static final String 
INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG = 
"internal.metadata.max.fetch.size.in.bytes";
-    public static final String INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC = 
"The maximum number of bytes to read when fetching from the metadata log, only 
for testing.";
-
     public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG = 
"internal.metadata.delete.delay.millis";
     public static final String INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC = 
"The amount of time to wait before deleting a file from the filesystem, only 
for testing.";
 
@@ -98,7 +95,6 @@ public class MetadataLogConfig {
             .define(METADATA_MAX_IDLE_INTERVAL_MS_CONFIG, INT, 
METADATA_MAX_IDLE_INTERVAL_MS_DEFAULT, atLeast(0), LOW, 
METADATA_MAX_IDLE_INTERVAL_MS_DOC)
             .defineInternal(INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, INT, 
null, null, LOW, INTERNAL_METADATA_LOG_SEGMENT_BYTES_DOC)
             .defineInternal(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG, 
INT, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, null, LOW, 
INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_DOC)
-            .defineInternal(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG, 
INT, KafkaRaftClient.MAX_FETCH_SIZE_BYTES, null, LOW, 
INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_DOC)
             .defineInternal(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG, 
LONG, ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, null, LOW, 
INTERNAL_METADATA_DELETE_DELAY_MILLIS_DOC);
 
     private final int logSegmentBytes;
@@ -107,7 +103,6 @@ public class MetadataLogConfig {
     private final long retentionMaxBytes;
     private final long retentionMillis;
     private final int internalMaxBatchSizeInBytes;
-    private final int internalMaxFetchSizeInBytes;
     private final long internalDeleteDelayMillis;
 
     public MetadataLogConfig(AbstractConfig config) {
@@ -117,7 +112,6 @@ public class MetadataLogConfig {
         this.retentionMaxBytes = 
config.getLong(METADATA_MAX_RETENTION_BYTES_CONFIG);
         this.retentionMillis = 
config.getLong(METADATA_MAX_RETENTION_MILLIS_CONFIG);
         this.internalMaxBatchSizeInBytes = 
config.getInt(INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG);
-        this.internalMaxFetchSizeInBytes = 
config.getInt(INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG);
         this.internalDeleteDelayMillis = 
config.getLong(INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG);
     }
 
@@ -145,10 +139,6 @@ public class MetadataLogConfig {
         return internalMaxBatchSizeInBytes;
     }
 
-    public int internalMaxFetchSizeInBytes() {
-        return internalMaxFetchSizeInBytes;
-    }
-
     public long internalDeleteDelayMillis() {
         return internalDeleteDelayMillis;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
index 3712c1cc92d..3ecc1b2c190 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
@@ -108,6 +108,15 @@ public class QuorumConfig {
         "join the cluster metadata partition for its cluster id.";
     public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
 
+    public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG = 
QUORUM_PREFIX + "fetch.snapshot.max.bytes";
+    public static final String QUORUM_FETCH_SNAPSHOT_MAX_BYTES_DOC = "Maximum 
amount of data to retrieve for each FetchSnapshot request to the controller.";
+    public static final int DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES = 1048576;
+
+    public static final String QUORUM_FETCH_MAX_BYTES_CONFIG = QUORUM_PREFIX + 
"fetch.max.bytes";
+    public static final String QUORUM_FETCH_MAX_BYTES_DOC = "Maximum amount of 
data to retrieve for each Fetch request. " +
+            "Always returns at least one batch even if it is greater than 
controller.quorum.fetch.max.bytes.";
+    public static final int DEFAULT_QUORUM_FETCH_MAX_BYTES = 1048576;
+
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
             .define(QUORUM_VOTERS_CONFIG, LIST, DEFAULT_QUORUM_VOTERS, new 
ControllerQuorumVotersValidator(), HIGH, QUORUM_VOTERS_DOC)
             .define(QUORUM_BOOTSTRAP_SERVERS_CONFIG, LIST, 
DEFAULT_QUORUM_BOOTSTRAP_SERVERS, new 
ControllerQuorumBootstrapServersValidator(), HIGH, QUORUM_BOOTSTRAP_SERVERS_DOC)
@@ -117,7 +126,9 @@ public class QuorumConfig {
             .define(QUORUM_LINGER_MS_CONFIG, INT, DEFAULT_QUORUM_LINGER_MS, 
atLeast(0), MEDIUM, QUORUM_LINGER_MS_DOC)
             .define(QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, 
DEFAULT_QUORUM_REQUEST_TIMEOUT_MS, atLeast(0), MEDIUM, 
QUORUM_REQUEST_TIMEOUT_MS_DOC)
             .define(QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, 
DEFAULT_QUORUM_RETRY_BACKOFF_MS, atLeast(0), LOW, QUORUM_RETRY_BACKOFF_MS_DOC)
-            .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, 
DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC);
+            .define(QUORUM_AUTO_JOIN_ENABLE_CONFIG, BOOLEAN, 
DEFAULT_QUORUM_AUTO_JOIN_ENABLE, LOW, QUORUM_AUTO_JOIN_ENABLE_DOC)
+            .define(QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG, INT, 
DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES, atLeast(1), LOW, 
QUORUM_FETCH_SNAPSHOT_MAX_BYTES_DOC)
+            .define(QUORUM_FETCH_MAX_BYTES_CONFIG, INT, 
DEFAULT_QUORUM_FETCH_MAX_BYTES, atLeast(1), LOW, QUORUM_FETCH_MAX_BYTES_DOC);
 
     private final List<String> voters;
     private final List<String> bootstrapServers;
@@ -128,6 +139,8 @@ public class QuorumConfig {
     private final int fetchTimeoutMs;
     private final int appendLingerMs;
     private final boolean autoJoin;
+    private final int fetchSnapshotMaxBytes;
+    private final int fetchMaxBytes;
 
     public QuorumConfig(AbstractConfig abstractConfig) {
         this.voters = abstractConfig.getList(QUORUM_VOTERS_CONFIG);
@@ -139,6 +152,8 @@ public class QuorumConfig {
         this.fetchTimeoutMs = 
abstractConfig.getInt(QUORUM_FETCH_TIMEOUT_MS_CONFIG);
         this.appendLingerMs = abstractConfig.getInt(QUORUM_LINGER_MS_CONFIG);
         this.autoJoin = 
abstractConfig.getBoolean(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
+        this.fetchSnapshotMaxBytes = 
abstractConfig.getInt(QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG);
+        this.fetchMaxBytes = 
abstractConfig.getInt(QUORUM_FETCH_MAX_BYTES_CONFIG);
     }
 
     public List<String> voters() {
@@ -177,6 +192,14 @@ public class QuorumConfig {
         return autoJoin;
     }
 
+    public int fetchSnapshotMaxBytes() {
+        return fetchSnapshotMaxBytes;
+    }
+
+    public int fetchMaxBytes() {
+        return fetchMaxBytes;
+    }
+
     private static Integer parseVoterId(String idString) {
         try {
             return Integer.parseInt(idString);
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftLog.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
index 5bb60e7a175..8a287d31efb 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftLog.java
@@ -57,9 +57,15 @@ public interface RaftLog extends AutoCloseable {
     LogAppendInfo appendAsFollower(Records records, int epoch);
 
     /**
-     * Read a set of records within a range of offsets.
+     * Read a set of records from startOffsetInclusive. Always returns at 
least one record batch if one exists.
+     *
+     * @param startOffsetInclusive Records at or after this offset are 
returned.
+     * @param isolation The fetch isolation, which controls the maximum offset 
we are allowed to read.
+     * @param maxTotalBatchBytes The maximum number of bytes to read if there 
is more than one record batch.
+     * @return Records and start offset information wrapped in a LogFetchInfo
      */
-    LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+    LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int 
maxTotalBatchBytes);
+
 
     /**
      * Return the latest epoch. For an empty log, the latest epoch is defined
@@ -175,7 +181,7 @@ public interface RaftLog extends AutoCloseable {
 
     /**
      * Update the high watermark and associated metadata (which is used to 
avoid
-     * index lookups when handling reads with {@link #read(long, Isolation)} 
with
+     * index lookups when handling reads with {@link #read(long, Isolation, 
int)} with
      * the {@link Isolation#COMMITTED} isolation level).
      *
      * @param offsetMetadata The offset and optional metadata
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 6a33fdb2c2e..724d52ce4fd 100644
--- 
a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ 
b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -233,7 +233,11 @@ public final class KRaftControlRecordStateMachine {
 
     private void maybeLoadLog() {
         while (log.endOffset().offset() > nextOffset) {
-            LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
+            LogFetchInfo info = log.read(
+                nextOffset,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            );
             try (RecordsIterator<?> iterator = new RecordsIterator<>(
                     info.records,
                     serde,
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
index b168dc66c46..95bc12b9643 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java
@@ -117,14 +117,19 @@ public class KafkaRaftLog implements RaftLog {
     }
 
     @Override
-    public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+    public LogFetchInfo read(long startOffset, Isolation readIsolation, int 
maxTotalBatchBytes) {
         FetchIsolation isolation = switch (readIsolation) {
             case COMMITTED -> FetchIsolation.HIGH_WATERMARK;
             case UNCOMMITTED -> FetchIsolation.LOG_END;
         };
 
         try {
-            FetchDataInfo fetchInfo = log.read(startOffset, 
config.internalMaxFetchSizeInBytes(), isolation, true);
+            FetchDataInfo fetchInfo = log.read(
+                startOffset,
+                maxTotalBatchBytes,
+                isolation,
+                true
+            );
             return new LogFetchInfo(
                     fetchInfo.records,
                     new LogOffsetMetadata(
@@ -358,7 +363,12 @@ public class KafkaRaftLog implements RaftLog {
           fetches from this offset, the returned batch will start at offset (X 
- M), and the
           follower will be unable to append it since (X - M) < (X).
          */
-        long baseOffset = read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();
+        long baseOffset = read(
+            snapshotId.offset(),
+            Isolation.COMMITTED,
+            1 // maxTotalBatchBytes - ensures that we only fetch one batch.
+        ).startOffsetMetadata.offset();
+
         if (snapshotId.offset() != baseOffset) {
             throw new IllegalArgumentException(
                     "Cannot create snapshot at offset (" + snapshotId.offset() 
+ ") because it is not batch aligned. " +
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
index 775a1fb1bc5..74e4afd362a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java
@@ -17,12 +17,15 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.message.FetchRequestData;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.ArbitraryMemoryRecords;
 import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.server.common.KRaftVersion;
 
 import net.jqwik.api.AfterFailureMode;
@@ -42,6 +45,8 @@ import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public final class KafkaRaftClientFetchTest {
     @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY)
@@ -91,6 +96,289 @@ public final class KafkaRaftClientFetchTest {
         assertEquals(oldLogEndOffset, context.log.endOffset().offset());
     }
 
+    @Test
+    void testSentFetchUsesQuorumMaxBytesConfiguration() throws Exception {
+        int epoch = 2;
+        int localId = KafkaRaftClientTest.randomReplicaId();
+        ReplicaKey local = KafkaRaftClientTest.replicaKey(localId, true);
+        ReplicaKey electedLeader = KafkaRaftClientTest.replicaKey(localId + 1, 
true);
+        int expectedFetchMaxBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            local.id(),
+            local.directoryId().get()
+        )
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .withElectedLeader(epoch, electedLeader.id())
+            // Explicitly change the configuration here.
+            .withFetchMaxBytes(expectedFetchMaxBytes)
+            .build();
+
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+        context.assertFetchRequestData(fetchRequest, epoch, 0L, 0, 
OptionalLong.empty());
+        FetchRequestData data = (FetchRequestData) fetchRequest.data();
+        assertEquals(expectedFetchMaxBytes, data.maxBytes());
+    }
+
+    @Test
+    public void testFetchMaxBytesAlwaysReturnsAllBatchesForLargeMax() throws 
Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        // Here we are effectively saying that there is no limit on the amount 
of data to return.
+        var remoteMaxSizeBytes = Integer.MAX_VALUE;
+        var localMaxSizeBytes = 1;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        var iterator = records.batchIterator();
+        int offsetCount = 0;
+        int batchCount = 0;
+        while (iterator.hasNext()) {
+            var batch = iterator.next();
+            var recordsIterator = batch.iterator();
+            while (recordsIterator.hasNext()) {
+                var record = recordsIterator.next();
+                assertEquals(offsetCount, record.offset());
+                offsetCount++;
+            }
+            batchCount++;
+        }
+        assertEquals(2, batchCount);
+        assertEquals(6, offsetCount);
+    }
+
+    @Test
+    public void testFetchMaxBytesAlwaysReturnsAtLeastOneBatch() throws 
Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        // There are two batches with 3 records each. The first batch is 
always larger than 1 byte.
+        // If remoteMaxSizeBytes = 1 then the MockLog will return exactly 1 
batch.
+        // If localMaxSizeBytes is used then MockLog will return two batches.
+        var remoteMaxSizeBytes = 1;
+        var localMaxSizeBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        assertTrue(
+            records.sizeInBytes() > 1,
+            String.format(
+                "Expected records.sizeInBytes() (%d) > 1 since we always 
return at least one batch",
+                records.sizeInBytes()
+            )
+        );
+        var iterator = records.batchIterator();
+        var firstBatch = iterator.next();
+        assertEquals(0, firstBatch.baseOffset());
+        assertEquals(3, firstBatch.nextOffset());
+        assertFalse(
+            iterator.hasNext(),
+            String.format("Expected only a single batch to be fetched for 
maxSize = %d", remoteMaxSizeBytes)
+        );
+    }
+
+    @Test
+    public void testFetchMaxBytesWithTwoBatches() throws Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        var batchSizeBytes = 115;
+        var remoteMaxSizeBytes = batchSizeBytes * 2;
+        var localMaxSizeBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .appendToLog(epoch, List.of("c", "c", "c"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send a fetch request with max bytes that are different from the 
configured value.
+        FetchRequestData request = context.fetchRequest(epoch, remoteKey, 1L, 
epoch, 500);
+        request.setMaxBytes(remoteMaxSizeBytes);
+        context.deliverRequest(request);
+
+        context.pollUntilResponse();
+        FetchResponseData.PartitionData partitionData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), partitionData.errorCode());
+        MemoryRecords records = (MemoryRecords) 
FetchResponse.recordsOrFail(partitionData);
+        // There is less data than remoteMaxSizeBytes so we expect the size of 
records.sizeInBytes to be less than
+        // remoteMaxSizeBytes.
+        assertTrue(
+            records.sizeInBytes() < remoteMaxSizeBytes,
+            String.format(
+                "Expected records size (%d) < remoteMaxBatchSizeBytes (%d)",
+                records.sizeInBytes(),
+                remoteMaxSizeBytes
+            )
+        );
+        var iterator = records.batchIterator();
+        var firstBatch = iterator.next();
+        // First batch should be less than the batchSizeBytes.
+        assertTrue(
+            firstBatch.sizeInBytes() < batchSizeBytes,
+            String.format(
+                "Expected secondBatch.sizeInBytes() (%d) < batchSizeBytes 
(%d)",
+                firstBatch.sizeInBytes(),
+                remoteMaxSizeBytes
+            )
+        );
+        assertTrue(iterator.hasNext(), "Expected more than one batch to be 
fetched");
+        var secondBatch = iterator.next();
+        assertTrue(
+            secondBatch.sizeInBytes() < batchSizeBytes,
+            String.format(
+                "Expected secondBatch.sizeInBytes() (%d) < batchSizeBytes 
(%d)",
+                secondBatch.sizeInBytes(),
+                remoteMaxSizeBytes
+            )
+        );
+        assertFalse(iterator.hasNext(), "Expected two batches to be fetched");
+    }
+
+    @Test
+    public void testFetchMaxBytesBatchesInvariants() throws Exception {
+        var epoch = 2;
+        var id = KafkaRaftClientTest.randomReplicaId();
+        var localKey = KafkaRaftClientTest.replicaKey(id, true);
+        var remoteKey = KafkaRaftClientTest.replicaKey(id + 1, true);
+        var localMaxSizeBytes = 1024;
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(
+            localKey.id(),
+            localKey.directoryId().get()
+        )
+            .appendToLog(epoch, List.of("a", "a", "a"))
+            .appendToLog(epoch, List.of("b", "b", "b"))
+            .withStartingVoters(
+                VoterSetTest.voterSet(Stream.of(localKey, remoteKey)),
+                KRaftVersion.KRAFT_VERSION_1
+            )
+            .withUnknownLeader(epoch)
+            .withFetchMaxBytes(localMaxSizeBytes)
+            .build();
+
+        context.unattachedToLeader();
+        epoch = context.currentEpoch();
+
+        // Send initial request to get all available data.
+        FetchRequestData allRecordsRequest =
+            context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+        allRecordsRequest.setMaxBytes(Integer.MAX_VALUE);
+        context.deliverRequest(allRecordsRequest);
+        context.pollUntilResponse();
+        MemoryRecords allRecords =
+            (MemoryRecords) 
FetchResponse.recordsOrFail(context.assertSentFetchPartitionResponse());
+
+        // Send another request to retrieve all data by setting exactSizeBytes
+        FetchRequestData exactSizeBytesRequest =
+            context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+        exactSizeBytesRequest.setMaxBytes(allRecords.sizeInBytes());
+        context.deliverRequest(exactSizeBytesRequest);
+        context.pollUntilResponse();
+
+        // All the records should be returned
+        FetchResponseData.PartitionData exactSizeBytesResponseData = 
context.assertSentFetchPartitionResponse();
+        assertEquals(Errors.NONE.code(), 
exactSizeBytesResponseData.errorCode());
+        MemoryRecords exactSizeBytesRecords = (MemoryRecords) 
FetchResponse.recordsOrFail(exactSizeBytesResponseData);
+
+        // We expect to return the same number of total bytes in both requests.
+        assertEquals(allRecords.sizeInBytes(), 
exactSizeBytesRecords.sizeInBytes());
+
+        var allIterator = allRecords.records().iterator();
+        var exactIterator = exactSizeBytesRecords.records().iterator();
+        while (exactIterator.hasNext() && allIterator.hasNext()) {
+            var r1 = exactIterator.next();
+            var r2 = allIterator.next();
+            assertEquals(r1, r2);
+        }
+        assertFalse(exactIterator.hasNext());
+        assertFalse(allIterator.hasNext());
+
+        // Send fetch request with sizeInBytes-1. It will appear here that we 
have only 1 batch
+        // since the other batch is not "complete" (it's missing one byte) and 
hence not "iterable".
+        FetchRequestData oneBatchRequest =
+            context.fetchRequest(epoch, remoteKey, 1L, epoch, 500);
+        oneBatchRequest.setMaxBytes(exactSizeBytesRecords.sizeInBytes() - 1);
+        context.deliverRequest(oneBatchRequest);
+        context.pollUntilResponse();
+        MemoryRecords oneBatchRecords =
+            (MemoryRecords) 
FetchResponse.recordsOrFail(context.assertSentFetchPartitionResponse());
+        assertTrue(oneBatchRecords.sizeInBytes() < 
exactSizeBytesRecords.sizeInBytes());
+        var oneBatchBatches = oneBatchRecords.batchIterator();
+        var firstBatch = oneBatchBatches.next();
+        assertTrue(firstBatch.sizeInBytes() < oneBatchRecords.sizeInBytes());
+        assertEquals(exactSizeBytesRecords.sizeInBytes() - 1, 
oneBatchRecords.sizeInBytes());
+        assertFalse(oneBatchBatches.hasNext(), "Expected 1 batch to be 
fetched");
+    }
+
     @Test
     void testReplicationOfHigherPartitionLeaderEpoch() throws Exception {
         int epoch = 2;
@@ -103,7 +391,8 @@ public final class KafkaRaftClientFetchTest {
             local.directoryId().get()
         )
             .withStartingVoters(
-                VoterSetTest.voterSet(Stream.of(local, electedLeader)), 
KRaftVersion.KRAFT_VERSION_1
+                VoterSetTest.voterSet(Stream.of(local, electedLeader)),
+                KRaftVersion.KRAFT_VERSION_1
             )
             .withElectedLeader(epoch, electedLeader.id())
             
.withRaftProtocol(RaftClientTestContext.RaftProtocol.KIP_996_PROTOCOL)
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
index e97eb2dcc26..a63c1f40243 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java
@@ -132,7 +132,11 @@ public class KafkaRaftClientReconfigTest {
         context.unattachedToLeader();
 
         // check if leader writes 3 bootstrap records to the log
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
@@ -195,7 +199,11 @@ public class KafkaRaftClientReconfigTest {
         // check leader does not write bootstrap records to log
         context.unattachedToLeader();
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         Iterator<Record> recordIterator = batch.iterator();
@@ -2451,7 +2459,11 @@ public class KafkaRaftClientReconfigTest {
         // check if leader writes 2 control records to the log;
         // one for the kraft version and one for the voter set
         var updatedVoters = VoterSetTest.voterSet(Stream.of(local, voter1, 
voter2));
-        var records = context.log.read(localLogEndOffset, 
Isolation.UNCOMMITTED).records;
+        var records = context.log.read(
+            localLogEndOffset,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         var batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         var recordsIterator = batch.iterator();
@@ -2559,7 +2571,11 @@ public class KafkaRaftClientReconfigTest {
         context.client.poll();
 
         // check that the leader wrote voters control record to the log;
-        var records = context.log.read(localLogEndOffset, 
Isolation.UNCOMMITTED).records;
+        var records = context.log.read(
+            localLogEndOffset,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         var batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
         var recordsIterator = batch.iterator();
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index b159c08d2f2..bfdbd14c18a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1026,6 +1026,87 @@ public final class KafkaRaftClientSnapshotTest {
         assertEquals(leaderId, response.currentLeader().leaderId());
     }
 
+    @Test
+    public void testFetchSnapshotRequestWithPartialData() throws Exception {
+        int localId = randomReplicaId();
+        Set<Integer> voters = Set.of(localId, localId + 1);
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
+        List<String> records = List.of("foo", "bar");
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .appendToLog(snapshotId.epoch(), List.of("a"))
+            .withKip853Rpc(true)
+            .build();
+
+        context.unattachedToLeader();
+        int epoch = context.currentEpoch();
+
+        context.advanceLocalLeaderHighWatermarkToLogEndOffset();
+
+        try (SnapshotWriter<String> snapshot = 
context.client.createSnapshot(snapshotId, 0).get()) {
+            assertEquals(snapshotId, snapshot.snapshotId());
+            snapshot.append(records);
+            snapshot.freeze();
+        }
+
+        // Test that we will respond with at least 2 equally sized reads of 
the snapshot.
+        RawSnapshotReader snapshot = 
context.log.readSnapshot(snapshotId).get();
+        int snapshotSizeBytes = Math.toIntExact(snapshot.sizeInBytes());
+        int expectedNumberOfReads = 2;
+        // Find the first expectedNumberOfReads that leaves a remainder.
+        // This ensures that we will have expectedNumberOfReads requests 
which each return fetchSnapshotMaxBytes, followed by one
+        // request that returns the remaining data (expectedFinalRequestSize 
bytes).
+        while (snapshotSizeBytes % expectedNumberOfReads == 0) 
expectedNumberOfReads++;
+        int fetchSnapshotMaxBytes = snapshotSizeBytes / expectedNumberOfReads;
+        int expectedFinalRequestSize = snapshotSizeBytes % 
expectedNumberOfReads;
+        int totalBytesRead = 0;
+        int position = 0;
+        for (int i = 0; i < expectedNumberOfReads; i++) {
+            context.deliverRequest(
+                fetchSnapshotRequest(
+                    context.metadataPartition,
+                    epoch,
+                    snapshotId,
+                    fetchSnapshotMaxBytes,
+                    position
+                )
+            );
+            context.client.poll();
+
+            FetchSnapshotResponseData.PartitionSnapshot response =
+                
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+            assertEquals(epoch, response.currentLeader().leaderEpoch());
+            assertEquals(localId, response.currentLeader().leaderId());
+            int actualSizeBytes = response.unalignedRecords().sizeInBytes();
+            assertEquals(fetchSnapshotMaxBytes, actualSizeBytes);
+
+            totalBytesRead += actualSizeBytes;
+            position += fetchSnapshotMaxBytes;
+        }
+        assertEquals(fetchSnapshotMaxBytes * expectedNumberOfReads, 
totalBytesRead);
+
+        // Fetch the remaining snapshot bytes.
+        assertTrue(
+            totalBytesRead < snapshotSizeBytes,
+            String.format("Expected totalBytesRead (%d) < snapshotSizeBytes 
(%d)", totalBytesRead, snapshotSizeBytes)
+        );
+        context.deliverRequest(
+            fetchSnapshotRequest(
+                context.metadataPartition,
+                epoch,
+                snapshotId,
+                fetchSnapshotMaxBytes,
+                position
+            )
+        );
+        context.client.poll();
+        FetchSnapshotResponseData.PartitionSnapshot response =
+            
context.assertSentFetchSnapshotResponse(context.metadataPartition).get();
+        assertEquals(epoch, response.currentLeader().leaderEpoch());
+        assertEquals(localId, response.currentLeader().leaderId());
+        assertEquals(expectedFinalRequestSize, 
response.unalignedRecords().sizeInBytes());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestWithInvalidPosition(boolean 
withKip853Rpc) throws Exception {
@@ -1223,10 +1304,12 @@ public final class KafkaRaftClientSnapshotTest {
         Set<Integer> voters = Set.of(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+        int expectedFetchMaxSnapshotBytes = 1024;
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withElectedLeader(epoch, leaderId)
             .withKip853Rpc(withKip853Rpc)
+            .withFetchSnapshotMaxBytes(expectedFetchMaxSnapshotBytes)
             .build();
 
         context.pollUntilRequest();
@@ -1245,7 +1328,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            expectedFetchMaxSnapshotBytes
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1302,10 +1385,12 @@ public final class KafkaRaftClientSnapshotTest {
         Set<Integer> voters = Set.of(localId, leaderId);
         int epoch = 2;
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+        int expectedFetchMaxSnapshotBytes = 6;
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withElectedLeader(epoch, leaderId)
             .withKip853Rpc(withKip853Rpc)
+            .withFetchSnapshotMaxBytes(expectedFetchMaxSnapshotBytes)
             .build();
 
         context.pollUntilRequest();
@@ -1324,7 +1409,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            expectedFetchMaxSnapshotBytes
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1360,7 +1445,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            expectedFetchMaxSnapshotBytes
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1435,7 +1520,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1497,7 +1582,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1558,7 +1643,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1619,7 +1704,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1653,7 +1738,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1690,7 +1775,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1739,7 +1824,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
@@ -1807,7 +1892,7 @@ public final class KafkaRaftClientSnapshotTest {
             snapshotRequest,
             context.metadataPartition,
             localId,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES
         ).get();
         assertEquals(snapshotId.offset(), request.snapshotId().endOffset());
         assertEquals(snapshotId.epoch(), request.snapshotId().epoch());
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index c4a6840ff84..f44f09581cc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -958,7 +958,10 @@ class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(1, Set.of(otherNodeId));
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
@@ -1007,7 +1010,11 @@ class KafkaRaftClientTest {
         context.client.poll();
         context.assertSentBeginQuorumEpochRequest(2, Set.of(firstNodeId, 
secondNodeId));
 
-        Records records = context.log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = context.log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         RecordBatch batch = records.batches().iterator().next();
         assertTrue(batch.isControlBatch());
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLog.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
index 228f92a9bd5..abadb1c8a30 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLog.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLog.java
@@ -419,7 +419,7 @@ public class MockLog implements RaftLog {
     }
 
     @Override
-    public LogFetchInfo read(long startOffset, Isolation isolation) {
+    public LogFetchInfo read(long startOffset, Isolation isolation, int 
maxTotalBatchBytes) {
         verifyOffsetInRange(startOffset);
 
         long maxOffset = isolation == Isolation.COMMITTED ? 
highWatermark.offset() : endOffset().offset();
@@ -444,7 +444,13 @@ public class MockLog implements RaftLog {
             // complete batches, so batches which end at an offset larger than 
the max offset are
             // filtered, which is effectively the same as having the consumer 
drop an incomplete
             // batch returned in a fetch response.
-            if (batch.lastOffset() >= startOffset && batch.lastOffset() < 
maxOffset && !batch.entries.isEmpty()) {
+            // Since we write whole batches, the byte size of the returned 
batches may exceed
+            // maxTotalBatchBytes. To limit the overshoot, we stop adding 
batches as soon as the buffer
+            // holds at least maxTotalBatchBytes.
+            if (batch.lastOffset() >= startOffset
+                && batch.lastOffset() < maxOffset
+                && !batch.entries.isEmpty()
+                && buffer.position() < maxTotalBatchBytes) {
                 buffer = batch.writeTo(buffer);
 
                 if (batchStartOffset == null) {
@@ -462,6 +468,9 @@ public class MockLog implements RaftLog {
 
         buffer.flip();
         Records records = MemoryRecords.readableRecords(buffer);
+        if (batchCount > 1) {
+            records = records.slice(0, maxTotalBatchBytes);
+        }
 
         if (batchStartOffset == null) {
             throw new RuntimeException("Expected to find at least one entry 
starting from offset " +
@@ -513,7 +522,11 @@ public class MockLog implements RaftLog {
             );
         }
 
-        long baseOffset = read(snapshotId.offset(), 
Isolation.COMMITTED).startOffsetMetadata.offset();
+        long baseOffset = read(
+            snapshotId.offset(),
+            Isolation.COMMITTED,
+            1 // Only needs to read the first batch.
+        ).startOffsetMetadata.offset();
         if (snapshotId.offset() != baseOffset) {
             throw new IllegalArgumentException(
                 String.format(
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
index 8a5c4bf6225..8fc6f0c769b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java
@@ -46,8 +46,11 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -214,7 +217,11 @@ public class MockLogTest {
         assertEquals(1, log.endOffset().offset());
         assertEquals(currentEpoch, log.lastFetchedEpoch());
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         for (RecordBatch batch : records.batches()) {
             assertTrue(batch.isControlBatch());
         }
@@ -249,7 +256,11 @@ public class MockLogTest {
         assertEquals(initialOffset + 1, log.endOffset().offset());
         assertEquals(3, log.lastFetchedEpoch());
 
-        Records records = log.read(5L, Isolation.UNCOMMITTED).records;
+        Records records = log.read(
+            5L,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
             extractRecords.add(record.value());
@@ -275,7 +286,11 @@ public class MockLogTest {
 
         appendAsLeader(List.of(recordOne, recordTwo), epoch);
 
-        Records records = log.read(0, Isolation.UNCOMMITTED).records;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
 
         List<ByteBuffer> extractRecords = new ArrayList<>();
         for (Record record : records.records()) {
@@ -297,11 +312,23 @@ public class MockLogTest {
         assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(30L, 
Isolation.UNCOMMITTED));
         assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(33L, 
Isolation.UNCOMMITTED));
         assertEquals(Optional.empty(), readOffsets(60L, 
Isolation.UNCOMMITTED));
-        assertThrows(OffsetOutOfRangeException.class, () -> log.read(61L, 
Isolation.UNCOMMITTED));
+        assertThrows(OffsetOutOfRangeException.class,
+            () -> log.read(
+                61L,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            )
+        );
 
         // Verify range after truncation
         log.truncateTo(20L);
-        assertThrows(OffsetOutOfRangeException.class, () -> log.read(21L, 
Isolation.UNCOMMITTED));
+        assertThrows(OffsetOutOfRangeException.class,
+            () -> log.read(
+                21L,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            )
+        );
     }
 
     @Test
@@ -336,7 +363,13 @@ public class MockLogTest {
         assertEquals(Optional.of(new OffsetRange(30, 59L)), readOffsets(30L, 
Isolation.COMMITTED));
         assertEquals(Optional.of(new OffsetRange(30L, 59L)), readOffsets(50L, 
Isolation.COMMITTED));
         assertEquals(Optional.empty(), readOffsets(60L, Isolation.COMMITTED));
-        assertThrows(OffsetOutOfRangeException.class, () -> log.read(61L, 
Isolation.COMMITTED));
+        assertThrows(OffsetOutOfRangeException.class,
+            () -> log.read(
+                61L,
+                Isolation.COMMITTED,
+                Integer.MAX_VALUE
+            )
+        );
     }
 
     @Test
@@ -345,7 +378,11 @@ public class MockLogTest {
         appendBatch(5, 1);
         appendBatch(5, 1);
 
-        LogFetchInfo readInfo = log.read(5, Isolation.UNCOMMITTED);
+        LogFetchInfo readInfo = log.read(
+            5,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        );
         assertEquals(5L, readInfo.startOffsetMetadata.offset());
         assertTrue(readInfo.startOffsetMetadata.metadata().isPresent());
 
@@ -359,7 +396,11 @@ public class MockLogTest {
                 Optional.of(new MockLog.MockOffsetMetadata(98230980L)))));
 
         // Ensure we can update the high watermark to the end offset
-        LogFetchInfo readFromEndInfo = log.read(15L, Isolation.UNCOMMITTED);
+        LogFetchInfo readFromEndInfo = log.read(
+            15L,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        );
         assertEquals(15, readFromEndInfo.startOffsetMetadata.offset());
         assertTrue(readFromEndInfo.startOffsetMetadata.metadata().isPresent());
         log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
@@ -369,7 +410,11 @@ public class MockLogTest {
         log.updateHighWatermark(readFromEndInfo.startOffsetMetadata);
 
         // Check handling of a fetch from the middle of a batch
-        LogFetchInfo readFromMiddleInfo = log.read(16L, Isolation.UNCOMMITTED);
+        LogFetchInfo readFromMiddleInfo = log.read(
+            16L,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        );
         assertEquals(readFromEndInfo.startOffsetMetadata, 
readFromMiddleInfo.startOffsetMetadata);
     }
 
@@ -484,11 +529,19 @@ public class MockLogTest {
 
         assertThrows(
             OffsetOutOfRangeException.class,
-            () -> log.read(log.startOffset() - 1, Isolation.UNCOMMITTED)
+            () -> log.read(
+                log.startOffset() - 1,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            )
         );
         assertThrows(
             OffsetOutOfRangeException.class,
-            () -> log.read(log.endOffset().offset() + 1, Isolation.UNCOMMITTED)
+            () -> log.read(
+                log.endOffset().offset() + 1,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            )
         );
     }
 
@@ -991,6 +1044,126 @@ public class MockLogTest {
         assertEquals(ValidOffsetAndEpoch.Kind.VALID, 
resultOffsetAndEpoch.kind());
     }
 
+    @Test
+    public void testLogLimitsReturnsLessThanMaxBytes() {
+        int numberOfRecordsPerBatch = 10;
+        appendBatch(numberOfRecordsPerBatch, 5);
+        appendBatch(numberOfRecordsPerBatch, 5);
+        appendBatch(numberOfRecordsPerBatch, 5);
+        // Set to be larger than 1 batch but smaller than 2.
+        int magicMaxTotalBytes = 200;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxTotalBytes
+        ).records;
+        // MockLog#read reads whole batches, including the one that crosses 
maxTotalBytes, and then slices
+        // the result down to maxTotalBytes when more than one batch was read.
+        assertEquals(magicMaxTotalBytes, records.sizeInBytes());
+    }
+
+    @Test
+    public void testLogLimitsReturnsAtLeastOne() {
+        int numberOfRecordsPerBatch = 10;
+        appendBatch(numberOfRecordsPerBatch, 5);
+        appendBatch(numberOfRecordsPerBatch, 5);
+        // magicMaxTotalBytes is smaller than a batch of 10 simple records,
+        // meaning we will read only the first batch and not the second.
+        int magicMaxTotalBytes = 1;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxTotalBytes
+        ).records;
+        assertTrue(
+            records.sizeInBytes() > magicMaxTotalBytes,
+            String.format(
+                "Expected records size (%d) > maxTotalBytes (%d) since one 
whole batch must be returned",
+                records.sizeInBytes(),
+                magicMaxTotalBytes
+            )
+        );
+        int recordCount = 0;
+        var iterator = records.records().iterator();
+        while (iterator.hasNext()) {
+            recordCount++;
+            iterator.next();
+        }
+        assertEquals(numberOfRecordsPerBatch, recordCount);
+    }
+
+    @Test
+    public void testMockLogReadExtremelyLargeMultiBatch() {
+        int recordsPerBatch = 1000;
+        appendBatch(recordsPerBatch, 5);
+        appendBatch(recordsPerBatch, 5);
+        // The MockLog is able to read a large internal batch
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            Integer.MAX_VALUE
+        ).records;
+        int recordCount = 0;
+        var iterator = records.records().iterator();
+        while (iterator.hasNext()) {
+            recordCount++;
+            iterator.next();
+        }
+        assertEquals(recordsPerBatch * 2, recordCount);
+    }
+
+    @Test
+    public void testMockLogReadExtremelyLargeSingleBatch() {
+        int recordsPerBatch = 1000;
+        appendBatch(recordsPerBatch, 5);
+        appendBatch(recordsPerBatch, 5);
+        // The MockLog is able to read a large internal batch
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            1
+        ).records;
+        int recordCount = 0;
+        var iterator = records.records().iterator();
+        while (iterator.hasNext()) {
+            recordCount++;
+            iterator.next();
+        }
+        assertEquals(recordsPerBatch, recordCount);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2})
+    public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws 
IOException {
+        int recordsPerBatch = 5;
+        appendBatch(recordsPerBatch, 1);
+        appendBatch(recordsPerBatch, 1);
+        appendBatch(recordsPerBatch, 1);
+        appendBatch(recordsPerBatch, 1);
+
+        int magicMaxBatchSizeBytes = 101;
+        LogFetchInfo info = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxBatchSizeBytes * expectedBatches
+        );
+
+        assertEquals(expectedBatches * magicMaxBatchSizeBytes, 
info.records.sizeInBytes());
+        var batchIterator = info.records.batchIterator();
+        while (batchIterator.hasNext()) {
+            assertEquals(magicMaxBatchSizeBytes, 
batchIterator.next().sizeInBytes());
+        }
+        // Asserts that we have exactly B * R records, where B is 
expectedBatches and R is recordsPerBatch. Further there must be B batches of 
SimpleRecords each with a value of
+        // [0..R-1] converted to a UTF-8 string with empty keys and headers.
+        int count = 0;
+        for (Record record : info.records.records()) {
+            byte[] expectedValue = String.valueOf(count % 
recordsPerBatch).getBytes(StandardCharsets.UTF_8);
+            assertEquals(ByteBuffer.wrap(expectedValue), record.value());
+            count += 1;
+        }
+        assertEquals(recordsPerBatch * expectedBatches, count);
+    }
+
     private Optional<OffsetRange> readOffsets(long startOffset, Isolation 
isolation) {
         // The current MockLog implementation reads at most one batch
 
@@ -1002,7 +1175,11 @@ public class MockLogTest {
         while (foundRecord) {
             foundRecord = false;
 
-            Records records = log.read(currentStart, isolation).records;
+            Records records = log.read(
+                currentStart,
+                isolation,
+                Integer.MAX_VALUE
+            ).records;
             for (Record record : records.records()) {
                 foundRecord = true;
 
@@ -1055,7 +1232,11 @@ public class MockLogTest {
 
         int currentOffset = 0;
         while (currentOffset < log.endOffset().offset()) {
-            Records records = log.read(currentOffset, 
Isolation.UNCOMMITTED).records;
+            Records records = log.read(
+                currentOffset,
+                Isolation.UNCOMMITTED,
+                Integer.MAX_VALUE
+            ).records;
             List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
 
             assertFalse(batches.isEmpty());
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java 
b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
index ce7175a8b5e..c20612d0892 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumConfigTest.java
@@ -35,6 +35,8 @@ public class QuorumConfigTest {
         
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, 
"-1"));
         
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, "-1"));
         
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, "-1"));
+        
assertInvalidConfig(Map.of(QuorumConfig.QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG, 
"-1"));
+        assertInvalidConfig(Map.of(QuorumConfig.QUORUM_FETCH_MAX_BYTES_CONFIG, 
"-1"));
     }
 
     private void assertInvalidConfig(Map<String, Object> overrideConfig) {
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index cd839d37b42..dfc4ec9c309 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -121,6 +121,7 @@ public final class RaftClientTestContext {
     final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs * 
CHECK_QUORUM_TIMEOUT_FACTOR);
     final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
     final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
+    final int fetchMaxBytes;
 
     private int electionTimeoutMs;
     private int requestTimeoutMs;
@@ -185,6 +186,8 @@ public final class RaftClientTestContext {
         private Endpoints localListeners = Endpoints.empty();
         private boolean isStartingVotersStatic = false;
         private boolean autoJoin = false;
+        private int fetchSnapshotMaxBytes = 
QuorumConfig.DEFAULT_QUORUM_FETCH_SNAPSHOT_MAX_BYTES;
+        private int fetchMaxBytes = 
QuorumConfig.DEFAULT_QUORUM_FETCH_MAX_BYTES;
 
         public Builder(int localId, Set<Integer> staticVoters) {
             this(OptionalInt.of(localId), staticVoters);
@@ -392,6 +395,16 @@ public final class RaftClientTestContext {
             return this;
         }
 
+        Builder withFetchSnapshotMaxBytes(int fetchSnapshotMaxSizeBytes) {
+            this.fetchSnapshotMaxBytes = fetchSnapshotMaxSizeBytes;
+            return this;
+        }
+
+        Builder withFetchMaxBytes(int fetchMaxBytes) {
+            this.fetchMaxBytes = fetchMaxBytes;
+            return this;
+        }
+
         public RaftClientTestContext build() throws IOException {
             Metrics metrics = new Metrics(time);
             MockNetworkChannel channel = new MockNetworkChannel();
@@ -428,6 +441,8 @@ public final class RaftClientTestContext {
             configMap.put(QuorumConfig.QUORUM_FETCH_TIMEOUT_MS_CONFIG, 
FETCH_TIMEOUT_MS);
             configMap.put(QuorumConfig.QUORUM_LINGER_MS_CONFIG, 
appendLingerMs);
             configMap.put(QuorumConfig.QUORUM_AUTO_JOIN_ENABLE_CONFIG, 
autoJoin);
+            configMap.put(QuorumConfig.QUORUM_FETCH_SNAPSHOT_MAX_BYTES_CONFIG, 
fetchSnapshotMaxBytes);
+            configMap.put(QuorumConfig.QUORUM_FETCH_MAX_BYTES_CONFIG, 
fetchMaxBytes);
             QuorumConfig quorumConfig = new QuorumConfig(new 
AbstractConfig(QuorumConfig.CONFIG_DEF, configMap));
 
             List<InetSocketAddress> computedBootstrapServers = 
bootstrapServers.orElseGet(() -> {
@@ -494,7 +509,8 @@ public final class RaftClientTestContext {
                 canBecomeVoter,
                 metrics,
                 externalKRaftMetrics,
-                listener
+                listener,
+                fetchMaxBytes
             );
 
             context.electionTimeoutMs = electionTimeoutMs;
@@ -523,7 +539,8 @@ public final class RaftClientTestContext {
         boolean canBecomeVoter,
         Metrics metrics,
         ExternalKRaftMetrics externalKRaftMetrics,
-        MockListener listener
+        MockListener listener,
+        int fetchMaxBytes
     ) {
         this.clusterId = clusterId;
         this.localId = localId;
@@ -542,6 +559,7 @@ public final class RaftClientTestContext {
         this.metrics = metrics;
         this.externalKRaftMetrics = externalKRaftMetrics;
         this.listener = listener;
+        this.fetchMaxBytes = fetchMaxBytes;
     }
 
     int electionTimeoutMs() {
@@ -1798,7 +1816,6 @@ public final class RaftClientTestContext {
             message.data(),
             "unexpected request type " + message.data());
         FetchRequestData request = (FetchRequestData) message.data();
-        assertEquals(KafkaRaftClient.MAX_FETCH_SIZE_BYTES, request.maxBytes());
         assertEquals(fetchMaxWaitMs, request.maxWaitMs());
 
         assertEquals(1, request.topics().size());
@@ -1893,6 +1910,7 @@ public final class RaftClientTestContext {
         return request
             .setMaxWaitMs(maxWaitTimeMs)
             .setClusterId(clusterId)
+            .setMaxBytes(fetchMaxBytes)
             .setReplicaState(
                 new 
FetchRequestData.ReplicaState().setReplicaId(replicaKey.id())
             );
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
index 98456324cc1..ba3f39cdeb0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java
@@ -28,11 +28,15 @@ import org.apache.kafka.common.protocol.Writable;
 import org.apache.kafka.common.record.internal.ArbitraryMemoryRecords;
 import org.apache.kafka.common.record.internal.InvalidMemoryRecordsProvider;
 import org.apache.kafka.common.record.internal.MemoryRecords;
+import org.apache.kafka.common.record.internal.Record;
+import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.Isolation;
 import org.apache.kafka.raft.KRaftConfigs;
 import org.apache.kafka.raft.KafkaRaftClient;
 import org.apache.kafka.raft.LogAppendInfo;
+import org.apache.kafka.raft.LogFetchInfo;
 import org.apache.kafka.raft.LogOffsetMetadata;
 import org.apache.kafka.raft.MetadataLogConfig;
 import org.apache.kafka.raft.QuorumConfig;
@@ -61,6 +65,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -88,8 +93,7 @@ public class KafkaRaftLogTest {
             10 * 1000,
             100 * 1024,
             60 * 1000,
-            KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-            KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+            KafkaRaftClient.MAX_BATCH_SIZE_BYTES
     );
 
     private final MockTime mockTime = new MockTime();
@@ -718,8 +722,7 @@ public class KafkaRaftLogTest {
                 DEFAULT_METADATA_LOG_CONFIG.logSegmentMillis(),
                 DEFAULT_METADATA_LOG_CONFIG.retentionMaxBytes(),
                 DEFAULT_METADATA_LOG_CONFIG.retentionMillis(),
-                maxBatchSizeInBytes,
-                KafkaRaftClient.MAX_FETCH_SIZE_BYTES
+                maxBatchSizeInBytes
         );
         KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
 
@@ -928,8 +931,7 @@ public class KafkaRaftLogTest {
                 10 * 1000,
                 256,
                 60 * 1000,
-                512,
-                DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+                512
         );
         KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
 
@@ -963,8 +965,7 @@ public class KafkaRaftLogTest {
                 10 * 1000,
                 1024,
                 60 * 1000,
-                100,
-                DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+                100
         );
         KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
 
@@ -995,8 +996,7 @@ public class KafkaRaftLogTest {
                 10 * 1000,
                 10240,
                 60 * 1000,
-                100,
-                DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+                100
         );
         KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
 
@@ -1043,8 +1043,7 @@ public class KafkaRaftLogTest {
                 10 * 1000,
                 10240,
                 60 * 1000,
-                200,
-                DEFAULT_METADATA_LOG_CONFIG.internalMaxFetchSizeInBytes()
+                200
         );
         KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
 
@@ -1083,13 +1082,117 @@ public class KafkaRaftLogTest {
         );
     }
 
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    public void testReadRespectsMaxSizeInBytes(int expectedBatches) throws 
IOException {
+        // 5 records are written in batches of 101 bytes each (at time of 
writing).
+        int magicMaxBatchSizeBytes = 101;
+        MetadataLogConfig config = createMetadataLogConfig(
+            10240,
+            10 * 1000,
+            10240,
+            60 * 1000,
+            magicMaxBatchSizeBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int recordsPerBatch = 5;
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+        append(log, recordsPerBatch, 1);
+
+        LogFetchInfo info = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxBatchSizeBytes * expectedBatches
+        );
+        assertEquals(expectedBatches * magicMaxBatchSizeBytes, 
info.records.sizeInBytes());
+        // Asserts that we have exactly B * R records (B = expectedBatches, R = 
recordsPerBatch). Further, there must be B batches of SimpleRecords,
+        // each with values [0..R-1] converted to a UTF-8 string, with empty keys and headers.
+        int count = 0;
+        for (Record record : info.records.records()) {
+            byte[] expectedValue = String.valueOf(count % 
recordsPerBatch).getBytes(StandardCharsets.UTF_8);
+            assertEquals(ByteBuffer.wrap(expectedValue), record.value());
+            count += 1;
+        }
+        assertEquals(recordsPerBatch * expectedBatches, count);
+    }
+
+    @Test
+    public void testLogLimitsReturnsLessThanMaxBytes() throws IOException {
+        // 10 records are written in batches of 141 bytes each (at time of 
writing).
+        int magicMaxBatchSizeBytes = 141;
+        MetadataLogConfig config = createMetadataLogConfig(
+            10240,
+            10 * 1000,
+            10240,
+            60 * 1000,
+            magicMaxBatchSizeBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        int numberOfRecordsPerBatch = 10;
+        append(log, numberOfRecordsPerBatch, 5);
+        append(log, numberOfRecordsPerBatch, 5);
+        append(log, numberOfRecordsPerBatch, 5);
+        // Set to be larger than 1 batch but smaller than 2.
+        int magicMaxTotalBytes = 200;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxTotalBytes
+        ).records;
+        // Unlike MockLog#read, which returns data in whole batches, this read is 
expected to return exactly
+        // maxTotalBytes, even though that limit falls between one and two batches.
+        assertEquals(magicMaxTotalBytes, records.sizeInBytes());
+    }
+
+    @Test
+    public void testLogLimitsReturnsAtLeastOne() throws IOException {
+        int numberOfRecordsPerBatch = 10;
+        // 10 records are written in batches of 141 bytes each (at time of 
writing).
+        int magicMaxBatchSizeBytes = 141;
+        MetadataLogConfig config = createMetadataLogConfig(
+            10240,
+            10 * 1000,
+            10240,
+            60 * 1000,
+            magicMaxBatchSizeBytes
+        );
+        KafkaRaftLog log = buildMetadataLog(tempDir, mockTime, config);
+        append(log, numberOfRecordsPerBatch, 5);
+        append(log, numberOfRecordsPerBatch, 5);
+        // magicMaxTotalBytes is smaller than a single batch of 10 simple records,
+        // so only the first batch is read and not the second.
+        int magicMaxTotalBytes = 1;
+        Records records = log.read(
+            0,
+            Isolation.UNCOMMITTED,
+            magicMaxTotalBytes
+        ).records;
+        assertTrue(
+            records.sizeInBytes() > magicMaxTotalBytes,
+            String.format(
+                "Expected records size (%d) > maxTotalBytes (%d) since one 
whole batch must be returned",
+                records.sizeInBytes(),
+                magicMaxTotalBytes
+            )
+        );
+        int recordCount = 0;
+        var iterator = records.records().iterator();
+        while (iterator.hasNext()) {
+            recordCount++;
+            iterator.next();
+        }
+        assertEquals(numberOfRecordsPerBatch, recordCount);
+    }
+
     private static MetadataLogConfig createMetadataLogConfig(
             int internalLogSegmentBytes,
             long logSegmentMillis,
             long retentionMaxBytes,
             long retentionMillis,
-            int internalMaxBatchSizeInBytes, //: Int = 
KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
-            int internalMaxFetchSizeInBytes //: Int = 
KafkaRaftClient.MAX_FETCH_SIZE_BYTES,
+            int internalMaxBatchSizeInBytes
     ) {
         Map<String, ?> config = Map.of(
                 MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG, 
internalLogSegmentBytes,
@@ -1097,9 +1200,8 @@ public class KafkaRaftLogTest {
                 MetadataLogConfig.METADATA_MAX_RETENTION_BYTES_CONFIG, 
retentionMaxBytes,
                 MetadataLogConfig.METADATA_MAX_RETENTION_MILLIS_CONFIG, 
retentionMillis,
                 
MetadataLogConfig.INTERNAL_METADATA_MAX_BATCH_SIZE_IN_BYTES_CONFIG, 
internalMaxBatchSizeInBytes,
-                
MetadataLogConfig.INTERNAL_METADATA_MAX_FETCH_SIZE_IN_BYTES_CONFIG, 
internalMaxFetchSizeInBytes,
                 
MetadataLogConfig.INTERNAL_METADATA_DELETE_DELAY_MILLIS_CONFIG, 
ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT
-                );
+        );
         return new MetadataLogConfig(new 
AbstractConfig(MetadataLogConfig.CONFIG_DEF, config, false));
     }
 

Reply via email to