chirag-wadhwa5 commented on code in PR #19637:
URL: https://github.com/apache/kafka/pull/19637#discussion_r2077341351
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3049,6 +3049,11 @@ class KafkaApis(val requestChannel: RequestChannel,
// Creating the shareFetchContext for Share Session Handling. if context
creation fails, the request is failed directly here.
shareFetchContext = sharePartitionManager.newContext(groupId,
shareFetchData, forgottenTopics, newReqMetadata, isAcknowledgeDataPresent,
request.context.connectionId)
} catch {
+ case e: ShareSessionLimitReachedException =>
+ // Throttle for maxWaitMs when share session limit is reached
+ requestHelper.throttle(quotas.request, request,
shareFetchRequest.maxWait)
Review Comment:
Thanks for the review. Yes, when a channel is muted, then it cannot receive
any requests until it is unmuted. Even metadata and heartbeat requests will not
be processed. But I think that should be fine. The throttled duration is max
wait time time. If I am not wrong, the channel is muted everytime a request is
received and remains muted until the response is sent back. When the partition
does not have any records to be fetched, I believe any share fetch request is
added to the purgatory and is actually waited for max wait time before a
response is sent. That situation is identical to the broker being throttled for
max wait time and no heartbeats or metadata requests can be processed during
that time. There could be issues where max wait time is more than heartbeat
session timeout though
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]