[
https://issues.apache.org/jira/browse/KAFKA-19259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17956788#comment-17956788
]
Arpit Goyal commented on KAFKA-19259:
-------------------------------------
Hi [~lianetm] I was able to figure out the root cause of the issue. From my
observation, it happens because of incorrect handling of the
pendingFetchRequestFuture flag, which controls whether a fetch request needs to
be sent or not.
Let's walk through the code's logic inside pollForFetches:
*Step 1: Check the Buffer*
First, the application thread tries to get any records that might already be
waiting in the FetchBuffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
Fetch<K, V> fetch = fetchCollector.collectFetch(fetchBuffer);
if (!fetch.isEmpty()) {
    return fetch; // Data was already there, return immediately
}
{code}
On the first call, the buffer is empty, so this check fails.
*Step 2: Send the Fetch Request*
Since the buffer was empty, the application thread asks the background thread
to go get some data.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
sendFetches(timer);
{code}
This is where the flawed logic happens:
# A CreateFetchRequestsEvent is sent.
# The background thread creates the network request.
# The background thread prematurely completes the future and nulls it out.
In other words, the request may not even have reached the broker yet, but
pendingFetchRequestFuture has already been marked completed.
{code:java}
// Excerpt from FetchRequestManager.pollInternal()
try {
    List<UnsentRequest> requests = fetchRequests.entrySet().stream().map(entry -> {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        final FetchRequest.Builder request = createFetchRequest(fetchTarget, data);
        final BiConsumer<ClientResponse, Throwable> responseHandler = (clientResponse, error) -> {
            if (error != null)
                errorHandler.handle(fetchTarget, data, error);
            else
                successHandler.handle(fetchTarget, data, clientResponse);
        };
        return new UnsentRequest(request, Optional.of(fetchTarget)).whenComplete(responseHandler);
    }).collect(Collectors.toList());
    // Completed as soon as the requests are *created* -- before any response has arrived.
    pendingFetchRequestFuture.complete(null);
    return new PollResult(requests);
} catch (Throwable t) {
    // A "dummy" poll result is returned here rather than rethrowing the error because any error
    // that is thrown from any RequestManager.poll() method interrupts the polling of the other
    // request managers.
    pendingFetchRequestFuture.completeExceptionally(t);
    return PollResult.EMPTY;
} finally {
    pendingFetchRequestFuture = null;
}
{code}
4. The sendFetches() call unblocks on the application thread.
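The effect of completing the future at creation time can be sketched in isolation. This is an illustrative model only, not the actual Kafka internals; the class and method names are hypothetical, borrowed from the excerpt above:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the "fetch in flight" state is lost as soon as the
// request is created, because the future is completed and nulled out before
// the broker has replied.
public class PendingFetchSketch {
    private CompletableFuture<Void> pendingFetchRequestFuture;

    /** Mirrors the short-circuit at the top of pollInternal(). */
    public boolean wouldCreateRequests() {
        return pendingFetchRequestFuture != null;
    }

    /** The application thread asks for a fetch via CreateFetchRequestsEvent. */
    public void requestFetch() {
        pendingFetchRequestFuture = new CompletableFuture<>();
    }

    /** What the excerpt above does: complete and null out at creation time. */
    public void createRequestsAndCompleteEarly() {
        pendingFetchRequestFuture.complete(null);
        pendingFetchRequestFuture = null;
    }
}
```

Once createRequestsAndCompleteEarly() has run, a later (possibly empty) broker response finds no pending request, so the background poll loop short-circuits.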
*Step 3: Wait on awaitNotEmpty (The Crucial Step)*
Now that the application thread has been told the request is "sent," it doesn't
return immediately. Instead, it waits on the buffer.
{code:java}
// Simplified from AsyncKafkaConsumer.pollForFetches()
fetchBuffer.awaitNotEmpty(timer); // BLOCKING CALL
return fetchCollector.collectFetch(fetchBuffer);
{code}
The fetchBuffer.awaitNotEmpty(timer) call pauses the application thread until
one of two things happens:
# The background thread puts new records into the fetchBuffer and signals it.
# The timer (which is controlled by your poll(timeout)) expires.
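That contract is essentially a timed condition-variable wait. A minimal sketch of it, with hypothetical names (this is not the actual FetchBuffer implementation):

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the awaitNotEmpty contract: block until either data is
// added and the condition is signalled, or the caller's timer expires.
public class BufferSketch {
    private final Queue<String> records = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    public void add(String record) {
        lock.lock();
        try {
            records.add(record);
            notEmpty.signalAll(); // path 1: wake the application thread
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if data arrived before the timeout, false on expiry. */
    public boolean awaitNotEmpty(Duration timeout) {
        long remainingNs = timeout.toNanos();
        lock.lock();
        try {
            while (records.isEmpty()) {
                if (remainingNs <= 0) {
                    return false; // path 2: timer expired with nothing buffered
                }
                try {
                    remainingNs = notEmpty.awaitNanos(remainingNs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The key point for the bug: if nothing ever calls add(), the only way out of the wait is the timeout.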
*Step 4: The Two Paths (The Bug's Impact)*
This is where the outcomes diverge and where the delay comes from.
*_Scenario A: The Happy Path (Data Arrives)_*
# The response for FetchRequest #1 arrives from the broker with records.
# The background thread processes the response and calls
fetchBuffer.add(records).
# This add operation notifies the condition variable that awaitNotEmpty is
waiting on.
# The application thread in awaitNotEmpty wakes up instantly, collects the
fetch, and returns the data to the user. This feels fast and responsive.
_*Scenario B: The Buggy Path (Empty Response)*_
# The response for FetchRequest #1 arrives from the broker, but it's empty.
# The background thread sees the empty response. It does not add anything to
the fetchBuffer.
# Because the pendingFetchRequestFuture was already cleared, the background
thread does not know it should immediately send a new fetch request. It simply
goes idle.
{code:java}
private PollResult pollInternal(FetchRequestPreparer fetchRequestPreparer,
                                ResponseHandler<ClientResponse> successHandler,
                                ResponseHandler<Throwable> errorHandler) {
    if (pendingFetchRequestFuture == null) {
        log.info("HACK (Background Thread): pollInternal was called, but pendingFetchRequestFuture is null. No action will be taken.");
        // If no explicit request for creating fetch requests was issued, just short-circuit.
        return PollResult.EMPTY;
    }
{code}
{color:#FF0000}Therefore, the awaitNotEmpty call on the application thread is
never notified. It has no choice but to sit there and wait until the timer you
provided in poll(timeout) expires{color}.
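The two scenarios can be simulated with plain java.util.concurrent primitives. This is an illustrative model only: a BlockingQueue stands in for the fetchBuffer, the timings are arbitrary, and none of this is Kafka code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Two-path simulation: in Scenario A a background thread adds a record and
// the poller returns almost immediately; in Scenario B nothing is ever added
// and the poller blocks for the full timeout.
public class TwoPathsDemo {
    /** Returns the elapsed millis spent waiting for a record. */
    public static long pollOnce(BlockingQueue<String> buffer,
                                boolean backgroundAdds, long timeoutMs) {
        if (backgroundAdds) {
            Thread bg = new Thread(() -> {
                try {
                    Thread.sleep(20);     // response with records arrives
                    buffer.add("record"); // ~ fetchBuffer.add(...) + signal
                } catch (InterruptedException ignored) { }
            });
            bg.start();
        }
        long start = System.nanoTime();
        try {
            buffer.poll(timeoutMs, TimeUnit.MILLISECONDS); // ~ awaitNotEmpty(timer)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

Run with a signalling background thread and without one, and the elapsed times reproduce the "fast poll, then full-timeout poll" alternation seen in the logs below.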
I have also added log statements that depict the above behaviour. Attaching
the full logs: [^consumer11_KAFKA-19259.log]
{code}
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Entering awaitNotEmpty with timer remaining: 4065ms
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Buffer is empty, will wait. Timer remaining: 4065ms, wokenup:
false (org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:34,921] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Starting wait at 1749401614921 for 4065ms
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:35,431] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Received FETCH response from node 1 for request with header
RequestHeader(apiKey=FETCH, apiVersion=17, clientId=console-consumer,
correlationId=79, headerVersion=2): FetchResponseData(throttleTimeMs=0,
errorCode=0, sessionId=1018152883, responses=[], nodeEndpoints=[])
(org.apache.kafka.clients.NetworkClient)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Node 1 sent an incremental fetch response with throttleTimeMs = 0
for session 1018152883 with 0 response partition(s), 6 implied partition(s)
(org.apache.kafka.clients.FetchSessionHandler)
[2025-06-08 22:23:35,433] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Removing pending request for fetch session: 1018152883 for node:
localhost:9092 (id: 1 rack: null isFenced: false)
(org.apache.kafka.clients.consumer.internals.AbstractFetch)
[2025-06-08 22:23:35,434] INFO [Consumer clientId=console-consumer,
groupId=cg15] HACK (Background Thread): pollInternal was called, but
pendingFetchRequestFuture is null. No action will be taken.
(org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:38,994] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Wait timed out after 4071ms
(org.apache.kafka.clients.consumer.internals.FetchBuffer)
[2025-06-08 22:23:39,004] INFO [Consumer clientId=console-consumer,
groupId=cg15] HACK (Background Thread): pollInternal was called, but
pendingFetchRequestFuture is null. No action will be taken.
(org.apache.kafka.clients.consumer.internals.FetchRequestManager)
[2025-06-08 22:23:39,406] DEBUG [Consumer clientId=console-consumer,
groupId=cg15] Exiting awaitNotEmpty. Buffer empty: true, wokenup: false, timer
remaining: 4065ms (
{code}
> Async consumer fetch intermittent delays on console consumer
> ------------------------------------------------------------
>
> Key: KAFKA-19259
> URL: https://issues.apache.org/jira/browse/KAFKA-19259
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 4.0.0
> Reporter: Lianet Magrans
> Assignee: Arpit Goyal
> Priority: Major
> Fix For: 4.1.0
>
> Attachments: Screenshot 2025-05-31 at 10.44.29 PM.png,
> console-consumer-classic-vs-consumer.mov, consumer11_KAFKA-19259.log,
> consumer_KAFKA-19259.log, debug5.log
>
>
> We noticed that fetching with the kafka-console-consumer.sh tool using the
> new consumer shows some intermittent delays, that are not seen when running
> the same with the classic consumer. Note that I disabled auto-commit to
> isolate the delay, and from a first look seems to come from the
> fetchBuffer.awaitNonEmpty logic, that alternatively takes almost the full
> poll timeout (runs "fast", then "slow", and continues to alternate)
> [https://github.com/apache/kafka/blob/0b81d6c7802c1be55dc823ce51729f2c6a6071a7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1808]
>
> The difference in behaviour between the 2 consumers can be seen with this
> setup:
> * topic with 6 partitions (I tried with 1 partition first and didn't see the
> delay, then with 3 and 6 I could see it)
> * data populated in topic with producer sending generated uuids to the topic
> in while loop
> * run console consumer (async), no commit:
> bin/kafka-console-consumer.sh --topic t1 --bootstrap-server localhost:9092
> --consumer-property group.protocol=consumer --group cg1 --consumer-property
> enable.auto.commit=false
> Here we can notice the pattern that looks like batches, and custom logs on
> the awaitNonEmpty show it take the full poll timeout on alternate poll
> iterations.
> * run same but for classic consumer (consumer-property
> group.protocol=classic) -> not such pattern of intermittent delays
> Produce continuously (I used this)
> while sleep 1; do echo $(uuidgen); done | bin/kafka-console-producer.sh
> --bootstrap-server localhost:9092 --topic t1
> This needs more investigation to fully understand if it's indeed something in
> the fetch path or something else.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)