[
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842530#comment-17842530
]
Kirk True commented on KAFKA-16637:
-----------------------------------
[~chickenchickenlove], thanks for filing this. There are two existing
improvements (KAFKA-15974 and KAFKA-16200) that fix timing issues in the new
consumer. However, even when testing your case on a temporary branch that
includes fixes for both of those issues, the problem still showed up.
This issue is related to an optimization for offset fetch logic.
When a user calls {{Consumer.poll()}}, among other things, the consumer
performs a network request to fetch any previously-committed offsets so it can
determine from where to start fetching new records. When the user passes in a
timeout of zero, the offset fetch network request almost never completes
within 0 milliseconds. However, the consumer
still sends out the request and handles the response when it is received,
usually a few milliseconds later. In this first attempt, the lookup fails and
the {{poll()}} loops back around. Given that this timeout is the common case,
the consumer caches the offset fetch response/result from the first attempt
(even though it timed out) because it knows that the _next_ call to {{poll()}}
is going to attempt the exact same operation. When it is later attempted a
second time, the response is already there from the first attempt such that the
consumer doesn't need to perform a network request.
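As a rough illustration of the caching described above, here is a minimal sketch. It is not the actual {{PendingCommittedOffsetRequest}} or {{CommitRequestManager}} code. The class name {{PendingOffsetFetch}}, the {{sender}} callback, and the use of plain strings for partitions are all assumptions made for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical illustration only; none of these names come from the Kafka code base.
final class PendingOffsetFetch {
    private final Function<Set<String>, CompletableFuture<Map<String, Long>>> sender;
    private Set<String> partitions;                        // partitions of the cached request
    private CompletableFuture<Map<String, Long>> pending;  // in-flight or completed request
    int requestsSent = 0;                                  // counts "network" requests

    PendingOffsetFetch(Function<Set<String>, CompletableFuture<Map<String, Long>>> sender) {
        this.sender = sender;
    }

    /** Returns the committed offsets, or null if they did not arrive within timeoutMs. */
    Map<String, Long> fetch(Set<String> requested, long timeoutMs) {
        // Send a new request only if there is no earlier one for the same partitions.
        if (pending == null || !requested.equals(partitions)) {
            partitions = Set.copyOf(requested);
            pending = sender.apply(requested);
            requestsSent++;
        }
        try {
            Map<String, Long> result = pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            clear();  // the result has been consumed, so the cache can be dropped now
            return result;
        } catch (TimeoutException e) {
            // Timed out: keep the request cached so the next call can reuse its response.
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            clear();  // a failed request is not worth keeping
            throw new IllegalStateException(e.getCause());
        }
    }

    private void clear() {
        partitions = null;
        pending = null;
    }
}
```

In this sketch, a first call with a timeout of zero returns {{null}} but leaves the request cached. Once the response has arrived, a second call for the same partitions returns it without sending anything new.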
The existing consumer has implemented this caching in
[PendingCommittedOffsetRequest|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L132].
The new consumer has implemented it in
[CommitRequestManager|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L510].
The core issue is that the new consumer implementation clears out the first
attempt's cached result too aggressively. As a result, the second (and
subsequent) attempts fail to find any previous attempt's cached result, so each
one submits a new network request, and each of those fails in turn. Thus the
consumer never makes any headway.
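To make the failure mode concrete, here is a small simulation of a zero-timeout poll loop. It is only a sketch. The class and method names are hypothetical, and the exact condition under which the new consumer drops its cached result is simplified here to "clear on every timeout", which is an assumption for the example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical simulation only; names and clearing condition are illustrative assumptions.
final class ZeroTimeoutPollSimulation {
    /**
     * Returns the attempt number on which the committed offset becomes
     * available, or -1 if it never does within maxAttempts.
     */
    static int attemptsUntilSuccess(boolean clearOnTimeout, int maxAttempts) {
        CompletableFuture<Long> cached = null;    // result cached across attempts
        CompletableFuture<Long> lastSent = null;  // most recent request on the wire
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (cached == null) {
                cached = new CompletableFuture<>();  // "send" a new offset fetch request
                lastSent = cached;
            }
            // Zero timeout: only a response that has already arrived can be used.
            Long offset = cached.getNow(null);
            if (offset != null) {
                return attempt;
            }
            if (clearOnTimeout) {
                cached = null;  // too aggressive: the in-flight request is forgotten
            }
            // The response arrives a little later, after this attempt has timed out.
            lastSent.complete(100L);
        }
        return -1;
    }
}
```

With {{clearOnTimeout}} set to false, the second attempt finds the first attempt's response. With it set to true, every attempt starts from an empty cache and the loop never succeeds.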
> KIP-848 does not work well
> --------------------------
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: sanghyeok An
> Assignee: Kirk True
> Priority: Minor
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png,
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test the next generation of the consumer rebalance protocol
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>
> However, it does not work well.
> You can check my setup below.
>
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId:
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs:
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer -
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata -
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>
> *Broker logs*
> broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here