showuon commented on code in PR #20088:
URL: https://github.com/apache/kafka/pull/20088#discussion_r2191326016
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
           // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
           if (recordBatchSize > 0)
             minOneMessage = false
-          limitBytes = math.max(0, limitBytes - recordBatchSize)
+          // Because we don't know how much data will be retrieved in remote fetch yet, and we don't want to block the API call
+          // to query remoteLogMetadata, assume it will fetch the max bytes size of data, to avoid exceeding the "fetch.max.bytes" setting.
+          val estimatedRecordBatchSize = if (recordBatchSize == 0 && readResult.info.delayedRemoteStorageFetch.isPresent)
+            readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else recordBatchSize
Review Comment:
> For example, if maxBytes is 2MB, delayedRemoteStorageFetch.get.fetchMaxBytes is 1MB, the first 2 partitions are read from remote storage, but the 3rd partition is read locally, this change will fetch 0 bytes from the 3rd partition, but only return 1MB (for the 1st partition) in the fetch response.

You're right, Jun. So this fix will be shipped together with this improvement (https://github.com/apache/kafka/pull/20045) in v4.2.0, to support fetching from multiple remote partitions in one fetch request.
> Note that readFromLog() is also used by share fetch, which supports fetching from multiple partitions with remote storage. So, this code is correct for share fetch.

Let me check the share fetch path.
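To make the byte-accounting concern above concrete, here is a minimal standalone Scala sketch. It is not Kafka's actual code: `ReadResult`, `estimatedSize`, and `budgets` are hypothetical stand-ins that only mirror the accounting in the diff (a remote read with zero local bytes is charged its full `fetchMaxBytes` as an upper-bound estimate). It shows how, with a 2MB request budget and two remote partitions each estimated at 1MB, the third (local) partition is left with a 0-byte budget even if the remote fetches ultimately return far less:

```scala
// Hypothetical sketch of the limitBytes accounting discussed above.
// Only recordBatchSize/fetchMaxBytes mirror the diff; everything else
// is illustrative.
object FetchLimitSketch {
  // remoteFetchMaxBytes is Some(n) when the partition triggers a
  // delayed remote storage fetch capped at n bytes.
  case class ReadResult(recordBatchSize: Int, remoteFetchMaxBytes: Option[Int])

  // Mirrors the diff: a remote fetch that returned no local bytes is
  // charged its fetchMaxBytes as a worst-case size estimate.
  def estimatedSize(r: ReadResult): Int =
    if (r.recordBatchSize == 0 && r.remoteFetchMaxBytes.isDefined)
      r.remoteFetchMaxBytes.get
    else
      r.recordBatchSize

  // Returns the bytes budget available to each partition, in read order,
  // decrementing limitBytes by the estimated size after each read.
  def budgets(maxBytes: Int, reads: Seq[ReadResult]): Seq[Int] = {
    var limitBytes = maxBytes
    reads.map { r =>
      val available = limitBytes
      limitBytes = math.max(0, limitBytes - estimatedSize(r))
      available
    }
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024 * 1024
    // Jun's scenario: 2MB request budget, two remote partitions each
    // estimated at 1MB, then a local partition with 512KB available.
    val reads = Seq(
      ReadResult(0, Some(mb)),      // remote, charged 1MB estimate
      ReadResult(0, Some(mb)),      // remote, charged 1MB estimate
      ReadResult(512 * 1024, None)) // local read
    // The 3rd partition sees a 0-byte budget even though the remote
    // fetches may return much less than their estimates.
    println(budgets(2 * mb, reads).mkString(","))
  }
}
```

The sketch is just the over-reservation problem in isolation; the linked improvement (fetching multiple remote partitions in one request) changes what the estimates are charged against, which is why the two changes are planned to land together.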