[
https://issues.apache.org/jira/browse/KAFKA-15305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850505#comment-17850505
]
Chia-Ping Tsai edited comment on KAFKA-15305 at 5/30/24 1:17 PM:
-----------------------------------------------------------------
KAFKA-16639 needs this ticket to fix its root cause. As this ticket describes,
`ConsumerNetworkThread` does not honor the close timeout. Even though we
enqueue a heartbeat request to leave the group, `ConsumerNetworkThread` will
move it to `NetworkClient` and then exit the waiting ...
It seems to me the simple solution is to add a method "hasInFlightRequests" to
"networkClientDelegate", and then change the while condition [0] from
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to
the following condition:
{code:java}
// True while any request is still queued or awaiting a response.
boolean hasPendingRequests() {
    return client.hasInFlightRequests() ||
        !networkClientDelegate.unsentRequests().isEmpty();
}
{code}
{code:java}
// Keep polling until the close timer expires or nothing is pending.
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasPendingRequests());
{code}
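To illustrate why checking only the unsent queue exits too early, here is a minimal, self-contained sketch. It is not Kafka code: `FakeDelegate`, `drain`, and the poll budget `maxPolls` (a deterministic stand-in for the close timer) are all hypothetical names made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class CloseDrainSketch {

    /** Hypothetical stand-in for the delegate: requests go unsent -> in-flight -> done. */
    static final class FakeDelegate {
        private final Deque<String> unsent = new ArrayDeque<>();
        private final Deque<String> inFlight = new ArrayDeque<>();

        void add(String request) {
            unsent.add(request);
        }

        /** One poll: complete everything sent earlier, then send everything queued. */
        void poll() {
            inFlight.clear();          // responses arrive for previously sent requests
            inFlight.addAll(unsent);   // queued requests are handed to the "network"
            unsent.clear();
        }

        boolean hasUnsentRequests() {
            return !unsent.isEmpty();
        }

        boolean hasPendingRequests() {
            return !inFlight.isEmpty() || !unsent.isEmpty();
        }

        int outstanding() {
            return unsent.size() + inFlight.size();
        }
    }

    /**
     * Polls until the budget is used up or the chosen condition says there is
     * nothing left to wait for. Returns the number of requests that never completed.
     */
    static int drain(FakeDelegate delegate, int maxPolls, boolean waitForInFlight) {
        int polls = 0;
        do {
            delegate.poll();
            polls++;
        } while (polls < maxPolls
                && (waitForInFlight
                    ? delegate.hasPendingRequests()
                    : delegate.hasUnsentRequests()));
        return delegate.outstanding();
    }

    public static void main(String[] args) {
        FakeDelegate oldCondition = new FakeDelegate();
        oldCondition.add("leave-group heartbeat");
        System.out.println("unsent-only check, left over: " + drain(oldCondition, 10, false));

        FakeDelegate newCondition = new FakeDelegate();
        newCondition.add("leave-group heartbeat");
        System.out.println("pending check, left over: " + drain(newCondition, 10, true));
    }
}
```

With the unsent-only check, the loop stops as soon as the request has been handed off, so the leave-group request is still in flight at exit. With the pending check, the loop keeps polling until the response arrives or the budget runs out, so the timeout is still the upper bound.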
[~kirktrue] WDYT?
[0]
https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301
was (Author: chia7712):
KAFKA-16639 needs this ticket to fix its root cause. As this ticket describes,
`ConsumerNetworkThread` does not honor the close timeout. Even though we
enqueue a heartbeat request to leave the group, `ConsumerNetworkThread` will
move it to `NetworkClient` and then exit the waiting ...
It seems to me the simple solution is to add a method "hasInFlightRequests" to
"networkClientDelegate", and then change the while condition [0] from
"timer.notExpired() && !networkClientDelegate.unsentRequests().isEmpty()" to
"timer.notExpired() && networkClientDelegate.hasInFlightRequests()".
{code:java}
boolean hasPendingRequests() {
    return client.hasInFlightRequests() ||
        !networkClientDelegate.unsentRequests().isEmpty();
}
{code}
{code:java}
do {
    networkClientDelegate.poll(timer.remainingMs(), timer.currentTimeMs());
    timer.update();
} while (timer.notExpired() && networkClientDelegate.hasPendingRequests());
{code}
[~kirktrue] WDYT?
[0]
https://github.com/apache/kafka/blob/cc269b0d438534ae8fef16b39354da1d78332a2c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java#L301
> The background thread should try to process the remaining task until the
> shutdown timer is expired
> --------------------------------------------------------------------------------------------------
>
> Key: KAFKA-15305
> URL: https://issues.apache.org/jira/browse/KAFKA-15305
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Philip Nee
> Assignee: Chia-Ping Tsai
> Priority: Major
> Labels: consumer-threading-refactor, timeout
> Fix For: 3.8.0
>
>
> While working on https://issues.apache.org/jira/browse/KAFKA-15304
> The close() API supplies a timeout parameter so that the consumer has a
> grace period to process things before shutting down. The background thread
> currently doesn't honor it: when close() is initiated, it immediately
> closes all of its dependencies.
>
> This might not be desirable because there could be remaining tasks to
> process before closing. Maybe the correct thing to do is to first stop
> accepting API requests, second, let runOnce() continue to run until the
> shutdown timer expires, and then force-close all of its dependencies.