Kirk True updated KAFKA-15848:
------------------------------
Labels: consumer-threading-refactor kip-848-client-support timeout (was:
consumer-threading-refactor consumer-timeout kip-848-client-support timeout)
> Consumer API timeout inconsistent between ConsumerDelegate implementations
> --------------------------------------------------------------------------
>
> Key: KAFKA-15848
> URL: https://issues.apache.org/jira/browse/KAFKA-15848
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer, unit tests
> Reporter: Kirk True
> Assignee: Kirk True
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support,
> timeout
> Fix For: 3.8.0
>
>
> The two {{ConsumerDelegate}} implementations ({{LegacyKafkaConsumer}} and
> {{AsyncKafkaConsumer}}) differ fundamentally in how they use and interpret
> the {{Timer}} that is supplied.
> h3. tl;dr
> {{AsyncKafkaConsumer}} treats the timeout very literally, whereas
> {{LegacyKafkaConsumer}} allows a little wiggle room: an operation that
> completes during its first network poll succeeds even if the timer has
> already expired.
> {{LegacyKafkaConsumer}} is structured so that it checks whether an operation
> succeeded _before_ it checks the timer:
> # Submit operation asynchronously
> # Wait for operation to complete using {{NetworkClient.poll()}}
> # Check for result
> ## If successful, return success
> ## If fatal failure, return failure
> # Check timer
> ## If timer expired, return failure
> {{AsyncKafkaConsumer}} uses {{Future.get()}} to wait for its operations:
> # Submit operation asynchronously
> # Wait for operation to complete using {{Future.get()}}
> ## If the timeout elapses first, {{Future.get()}} throws a {{TimeoutException}}
> # Check for result
> ## If successful, return success
> ## Otherwise, return failure
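The {{Future.get()}} step above relies on standard JDK behavior. This minimal, JDK-only sketch illustrates it: a timed {{get}} with a timeout of 0 on a future that nobody has completed yet throws {{TimeoutException}} at once, while an already-completed future returns its value. The {{getOrDefault}} helper is hypothetical. It only mirrors the "return -1 on timeout" convention of the {{getCount}} examples in this report and is not Kafka code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ZeroTimeoutGet {

    // Hypothetical helper: waits up to timeoutMs for the future and returns
    // -1 if it is not complete in time (or if the wait itself fails).
    static int getOrDefault(CompletableFuture<Integer> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        // No one has completed this future, so a 0 ms wait times out at once.
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        System.out.println(getOrDefault(pending, 0)); // prints -1

        // A future that is already complete is returned even with a 0 ms wait.
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);
        System.out.println(getOrDefault(done, 0)); // prints 42
    }
}
```

The point is that a 0 ms timed wait never gives the background work a chance to run. It can only pick up a result that already exists.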
> h3. How to reproduce
> The resulting timing differences are subtle, but they are easy to reproduce
> with any of the {{KafkaConsumerTest}} unit tests that call
> {{consumer.poll(0)}}. The following simplified code illustrates the
> difference between the two approaches.
> {{LegacyKafkaConsumer}} performs a lot of its network I/O operations in a
> manner similar to this:
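> {code:java}
> public int getCount(Timer timer) {
>     do {
>         // sendSomeRequest() stands in for any request-building helper
>         final RequestFuture<Integer> future = sendSomeRequest();
>         // Poll the network client until the future completes or the timer expires
>         client.poll(future, timer);
>         // Check the result *before* checking the timer
>         if (future.succeeded())
>             return future.value();
>         if (future.failed() && !future.isRetriable())
>             throw future.exception();
>     } while (timer.notExpired());
>     return -1;
> }
> {code}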
> {{AsyncKafkaConsumer}} has similar logic, but it is structured like this:
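> {code:java}
> private int getCount(Timer timer) {
>     CompletableFuture<Integer> future = new CompletableFuture<>();
>     // Hand the operation to the background thread (simplified: the future
>     // would normally travel inside an application event)
>     applicationEventQueue.add(future);
>     try {
>         // Block for at most the time remaining on the timer
>         return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
>     } catch (TimeoutException e) { // java.util.concurrent.TimeoutException
>         return -1;
>     } catch (InterruptedException | ExecutionException e) {
>         throw new RuntimeException(e);
>     }
> }
> {code}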
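> The call to {{add}} enqueues the network operation, but the method then
> _immediately_ invokes {{Future.get()}} with the timeout to implement a
> time-bounded blocking call. When this method is called with a timeout of 0,
> the background thread typically has not yet had a chance to process the
> operation, so {{Future.get()}} _immediately_ throws a {{TimeoutException}}.
> The legacy version, by contrast, uses a {{do}}/{{while}} loop: even with a
> timeout of 0 it sends the request, polls the network client once, and checks
> the result before it consults the timer.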
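The two {{getCount}} variants can be modeled with plain JDK classes to show why only the async ordering fails for a 0 ms timeout. This is a sketch under stated assumptions, not the real consumer code. {{SimpleTimer}}, {{pollNetworkOnce}}, and {{backgroundQueue}} are hypothetical stand-ins for Kafka's {{Timer}}, {{ConsumerNetworkClient.poll()}}, and the application event queue. The sketch also assumes the response is available to the very first network poll, as when a unit test has prepared it in advance.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutOrdering {

    // Hypothetical stand-in for Kafka's Timer: a fixed wall-clock deadline.
    static final class SimpleTimer {
        private final long deadlineMs;
        SimpleTimer(long timeoutMs) { deadlineMs = System.currentTimeMillis() + timeoutMs; }
        long remainingMs() { return Math.max(0, deadlineMs - System.currentTimeMillis()); }
        boolean notExpired() { return remainingMs() > 0; }
    }

    // Hypothetical network poll: assumes a response (count = 3) is already
    // waiting, so a single non-blocking poll completes the request.
    static void pollNetworkOnce(CompletableFuture<Integer> future) {
        future.complete(3);
    }

    // Legacy ordering: do/while runs at least once, and the result is
    // checked before the timer.
    static int legacyGetCount(SimpleTimer timer) {
        do {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            pollNetworkOnce(future);
            if (future.isDone())
                return future.join();
        } while (timer.notExpired());
        return -1;
    }

    // Hypothetical background queue. Nothing drains it in this single-threaded
    // sketch, modeling a background thread that has not run yet.
    static final Queue<CompletableFuture<Integer>> backgroundQueue = new ArrayDeque<>();

    // Async ordering: enqueue, then wait at most remainingMs() for the result.
    static int asyncGetCount(SimpleTimer timer) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        backgroundQueue.add(future);
        try {
            return future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } catch (ExecutionException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(legacyGetCount(new SimpleTimer(0))); // prints 3
        System.out.println(asyncGetCount(new SimpleTimer(0)));  // prints -1
    }
}
```

With a 0 ms timer, {{legacyGetCount}} returns the count because it looks at the result before it looks at the timer. {{asyncGetCount}} returns -1 because its only chance to see the result is a wait that is already out of time.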
> h3. Suggested fix
> TBD :(