showuon commented on code in PR #20088:
URL: https://github.com/apache/kafka/pull/20088#discussion_r2192058107
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1852,7 +1852,11 @@ class ReplicaManager(val config: KafkaConfig,
// Once we read from a non-empty partition, we stop ignoring request and
partition level size limits
if (recordBatchSize > 0)
minOneMessage = false
- limitBytes = math.max(0, limitBytes - recordBatchSize)
+ // Because we don't know how much data will be retrieved in remote fetch
yet, and we don't want to block the API call
+ // to query remoteLogMetadata, assume it will fetch the max bytes size
of data to avoid to exceed the "fetch.max.bytes" setting.
+ val estimatedRecordBatchSize = if (recordBatchSize == 0 &&
readResult.info.delayedRemoteStorageFetch.isPresent)
+ readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes else
recordBatchSize
Review Comment:
After reading the code for share fetch, I agree this change won't impact the
share fetch. In share fetch, we'll pre-calculate the expected max fetch size
for each partition in `uniformPartitionMaxBytes`. Without this change, there
will be no issue for share fetch because the max size is already calculated.
After this change, I used the
`readResult.info.delayedRemoteStorageFetch.get.fetchMaxBytes`, which comes from
`math.min(fetchInfo.maxBytes, limitBytes)`, so it has the same result.
Thanks for pointing it out, Jun! I admit I never considered share fetch
earlier.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]