jsancio commented on code in PR #21028:
URL: https://github.com/apache/kafka/pull/21028#discussion_r2775326664
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -118,14 +118,22 @@ UnifiedLog log() {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+ public int defaultReadMaxBatchSizeBytes() {
+ return config.internalMaxFetchSizeInBytes();
+ }
+
+ @Override
+ public LogFetchInfo read(long startOffset, Isolation readIsolation, int maxTotalBatchSizeBytes) {
FetchIsolation isolation = switch (readIsolation) {
case COMMITTED -> FetchIsolation.HIGH_WATERMARK;
case UNCOMMITTED -> FetchIsolation.LOG_END;
};
try {
- FetchDataInfo fetchInfo = log.read(startOffset, config.internalMaxFetchSizeInBytes(), isolation, true);
+ FetchDataInfo fetchInfo = log.read(startOffset,
+ maxTotalBatchSizeBytes,
+ isolation,
+ true);
Review Comment:
Please use this formatting in the raft module. Notice the comment on the
parameter passed as a bare literal like `true`.
```java
FetchDataInfo fetchInfo = log.read(
startOffset,
maxTotalBatchSizeBytes,
isolation,
true // return at least one batch if present
);
```
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,22 @@ public interface RaftLog extends AutoCloseable {
/**
* Read a set of records within a range of offsets.
+ * maxTotalBatchSizeBytes specifies a "soft" max for total byte size of the record batches to read.
*/
- LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+ LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int maxTotalBatchSizeBytes);
+
+ /**
+ * Configures a soft max for total size of bytes read for {@link #read(long, Isolation)}.
+ */
+ int defaultReadMaxBatchSizeBytes();
+
+ /**
+ * Read a set of records within a range of offsets. Implementors set a "soft" max for the number of bytes to read
+ * by implementing defaultLocalReadMaxRecordsSizeBytes.
+ */
+ default LogFetchInfo read(long startOffsetInclusive, Isolation isolation) {
+ return read(startOffsetInclusive, isolation, defaultReadMaxBatchSizeBytes());
+ }
Review Comment:
Let's avoid overloaded methods.
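For context, a sketch of the non-overloaded shape being suggested (the names here are hypothetical stand-ins, not the PR's final API): the interface keeps a single `read` that always takes an explicit size, and call sites pass the configured value themselves instead of going through a default-forwarding overload.

```java
public class NoOverloadSketch {
    // Hypothetical stand-in for RaftLog with a single read method and
    // no default-forwarding overload.
    interface SimpleLog {
        String read(long startOffsetInclusive, int maxTotalBatchSizeBytes);
    }

    // Callers state the soft max explicitly instead of relying on a
    // hidden default supplied by an overload.
    static String readWithConfiguredMax(SimpleLog log, int configuredMaxBytes) {
        return log.read(0L, configuredMaxBytes);
    }

    public static void main(String[] args) {
        SimpleLog log = (start, maxBytes) -> "read(" + start + ", " + maxBytes + ")";
        System.out.println(readWithConfiguredMax(log, 1048576)); // read(0, 1048576)
    }
}
```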
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1598,7 +1600,8 @@ private FetchResponseData tryCompleteFetchRequest(
short apiVersion,
ReplicaKey replicaKey,
FetchRequestData.FetchPartition request,
- long currentTimeMs
+ long currentTimeMs,
+ int maxSizeBytes
Review Comment:
In the raft module we tend to always make the current time the last
parameter of methods that require it.
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,22 @@ public interface RaftLog extends AutoCloseable {
/**
* Read a set of records within a range of offsets.
+ * maxTotalBatchSizeBytes specifies a "soft" max for total byte size of the record batches to read.
*/
- LogFetchInfo read(long startOffsetInclusive, Isolation isolation);
+ LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int maxTotalBatchSizeBytes);
+
+ /**
+ * Configures a soft max for total size of bytes read for {@link #read(long, Isolation)}.
+ */
+ int defaultReadMaxBatchSizeBytes();
Review Comment:
Why does the client need to know this? Let's try to keep the interface as
small as possible.
##########
raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java:
##########
@@ -233,7 +233,7 @@ private void checkOffsetIsValid(long offset) {
private void maybeLoadLog() {
while (log.endOffset().offset() > nextOffset) {
- LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED);
+ LogFetchInfo info = log.read(nextOffset, Isolation.UNCOMMITTED, maxBatchSizeBytes);
Review Comment:
Okay. Technically this is different from the FETCH case and we don't need to
be as strict on how many bytes to read when loading the entire log. This
component is attempting to read the entire log and blocks until the entire log
is read.
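To illustrate the point being made (hypothetical simplified code, not the `KRaftControlRecordStateMachine` implementation): because the loader loops until it reaches the end offset, the per-read byte cap only affects chunking, not how much is ultimately read.

```java
import java.util.ArrayList;
import java.util.List;

public class LoadLoopSketch {
    // Pretend each offset holds one batch; cap by batch count instead
    // of bytes to keep the sketch simple.
    static List<Long> loadAll(long endOffset, int maxBatchesPerRead) {
        List<Long> loaded = new ArrayList<>();
        long nextOffset = 0;
        while (nextOffset < endOffset) {
            // Each iteration reads at most one capped chunk, like a
            // single log.read(...) call.
            long chunkEnd = Math.min(endOffset, nextOffset + maxBatchesPerRead);
            for (long offset = nextOffset; offset < chunkEnd; offset++) {
                loaded.add(offset);
            }
            nextOffset = chunkEnd;
        }
        return loaded;
    }

    public static void main(String[] args) {
        // Whatever cap we pick, the loop still loads the entire log.
        System.out.println(loadAll(10, 3).size()); // prints 10
    }
}
```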
##########
raft/src/test/java/org/apache/kafka/raft/MockLog.java:
##########
@@ -418,9 +419,29 @@ private void verifyOffsetInRange(long offset) {
}
}
+ public void setExpectedMaxTotalRecordsSizeBytes(int maxBytesReadFromLog) {
+ this.maybeExpectedMaxTotalRecordsSizeBytes = Optional.of(maxBytesReadFromLog);
+ }
+
+ private void maybeVerifyMaxBytesForLogRead(int actualSizeBytes) {
+ maybeExpectedMaxTotalRecordsSizeBytes.ifPresent(expectedSizeBytes -> {
+ if (actualSizeBytes != expectedSizeBytes) {
+ throw new IllegalStateException(
+ String.format("Expected maxTotalRecordsSizeBytes %d but got %d (bytes)", expectedSizeBytes, actualSizeBytes));
+ }
+ });
+ }
Review Comment:
Let's not test this feature this way. You are using this during the protocol
tests. For those tests you can check that the returned fetch response has at
most `maxBytesReadFromLog` bytes when the next batch is smaller than
`maxBytesReadFromLog`.
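One way to phrase the suggested assertion (the helper below is a hypothetical sketch, not `RaftClientTestContext` API): the response either stays within the limit, or consists of a single batch that alone exceeds it.

```java
public class FetchSizeAssertionSketch {
    // True iff the response honors the soft max: total size is within
    // maxBytes, OR the response is a single batch larger than maxBytes.
    static boolean honorsSoftMax(int[] batchSizes, int maxBytes) {
        int total = 0;
        for (int size : batchSizes) {
            total += size;
        }
        return total <= maxBytes || batchSizes.length == 1;
    }

    public static void main(String[] args) {
        System.out.println(honorsSoftMax(new int[] {400, 500}, 1000)); // true: within the cap
        System.out.println(honorsSoftMax(new int[] {4000}, 1000));     // true: single oversized batch
        System.out.println(honorsSoftMax(new int[] {400, 900}, 1000)); // false: cap exceeded
    }
}
```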
##########
raft/src/main/java/org/apache/kafka/raft/RaftLog.java:
##########
@@ -58,8 +58,22 @@ public interface RaftLog extends AutoCloseable {
/**
* Read a set of records within a range of offsets.
+ * maxTotalBatchSizeBytes specifies a "soft" max for total byte size of the record batches to read.
Review Comment:
Please use Javadoc syntax for Javadoc comments. Can you add `@param`
comments for all of the parameters and a `@return` comment for the returned
object?
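One possible shape for that Javadoc (the wording below is a suggestion, not text from the PR):

```java
/**
 * Reads a set of records within a range of offsets.
 *
 * @param startOffsetInclusive the first offset to read from, inclusive
 * @param isolation the read isolation level
 * @param maxTotalBatchSizeBytes a "soft" maximum on the total byte size of
 *        the record batches to read; at least one batch is returned if one
 *        is present, even if it exceeds this limit
 * @return the fetched records and associated log metadata
 */
LogFetchInfo read(long startOffsetInclusive, Isolation isolation, int maxTotalBatchSizeBytes);
```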
##########
raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftLog.java:
##########
@@ -118,14 +118,22 @@ UnifiedLog log() {
}
@Override
- public LogFetchInfo read(long startOffset, Isolation readIsolation) {
+ public int defaultReadMaxBatchSizeBytes() {
+ return config.internalMaxFetchSizeInBytes();
+ }
Review Comment:
See my other comment but let's remove this if not needed.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1517,7 +1517,8 @@ private CompletableFuture<FetchResponseData>
handleFetchRequest(
requestMetadata.apiVersion(),
replicaKey,
fetchPartition,
- currentTimeMs
+ currentTimeMs,
+ request.maxBytes()
Review Comment:
Let's try to keep the current time as the last parameter. This is consistent
with the rest/majority of the methods in this file and module.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -108,6 +108,14 @@ public class QuorumConfig {
"join the cluster metadata partition for its cluster id.";
public static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;
+ public static final String QUORUM_FETCH_SNAPSHOT_SIZE_MAX_BYTES_CONFIG = QUORUM_PREFIX + "fetch.snapshot.max.bytes";
+ public static final String QUORUM_FETCH_SNAPSHOT_SIZE_MAX_BYTES_DOC = "Maximum amount of data to retrieve for each FetchSnapshot request to the controller.";
+ public static final int DEFAULT_QUORUM_FETCH_SNAPSHOT_SIZE_MAX_BYTES = 1048576;
+
+ public static final String QUORUM_FETCH_SIZE_MAX_BYTES_CONFIG = QUORUM_PREFIX + "fetch.max.bytes";
+ public static final String QUORUM_FETCH_SIZE_MAX_BYTES_DOC = "Maximum amount of data to retrieve for each Fetch request. Always returns at least one record if a new one is available.";
Review Comment:
You should mention that FETCH will send at least one batch if there is data
to send even if it is greater than QUORUM_FETCH_SIZE_MAX_BYTES_CONFIG.
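The "at least one batch" semantics described above can be sketched as follows (a hypothetical helper, not the `UnifiedLog` implementation): batches accumulate up to the soft max, but the first batch is always included even when it alone exceeds the limit.

```java
import java.util.ArrayList;
import java.util.List;

public class SoftMaxFetchSketch {
    // Selects which batch sizes would be returned under a "soft" max.
    static List<Integer> selectBatches(int[] batchSizes, int maxBytes) {
        List<Integer> selected = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            // The first batch is always sent, regardless of its size;
            // later batches stop once the cap would be exceeded.
            if (!selected.isEmpty() && total + size > maxBytes) {
                break;
            }
            selected.add(size);
            total += size;
        }
        return selected;
    }

    public static void main(String[] args) {
        System.out.println(selectBatches(new int[] {5000}, 1000));          // [5000]: oversized but sent
        System.out.println(selectBatches(new int[] {400, 400, 400}, 1000)); // [400, 400]
    }
}
```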
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3013,7 +3016,7 @@ private FetchSnapshotRequestData
buildFetchSnapshotRequest(OffsetAndEpoch snapsh
log.topicPartition(),
quorum.epoch(),
snapshotId,
- MAX_FETCH_SIZE_BYTES,
+ quorumConfig.fetchSnapshotSizeMaxBytes(),
Review Comment:
Why is one configuration called `fetchMaxSizeBytes` while the other is called
`fetchSnapshotSizeMaxBytes`? Notice how `Max` and `Size` are swapped.
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -121,6 +121,8 @@ public final class RaftClientTestContext {
final int checkQuorumTimeoutMs = (int) (fetchTimeoutMs *
CHECK_QUORUM_TIMEOUT_FACTOR);
final int beginQuorumEpochTimeoutMs = fetchTimeoutMs / 2;
final int retryBackoffMs = Builder.RETRY_BACKOFF_MS;
+ final int fetchSnapshotMaxSizeBytes;
+ final int fetchMaxSizeBytes;
Review Comment:
It looks like `fetchSnapshotMaxSizeBytes` is not used.
Did you consider making these values parameters to the different methods that
need them?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]