[
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816239#comment-17816239
]
Philip Nee commented on KAFKA-16156:
------------------------------------
There seems to be a subtle difference in behavior between the async and the
legacy consumer. The legacy one catches the error without doing anything, while
the async client doesn't seem to handle the exception at all. The fix should be
easy, but I'll run the same test first.
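
As a rough illustration of the failure mode (not the actual Kafka code, and not
the eventual fix): the trace below suggests that a value object rejecting
negative timestamps is constructed inside a CompletableFuture callback, so the
exception only surfaces when the caller waits on the result. The sketch
reproduces that shape in plain Java. All names here (NegativeTimestampSketch,
OffsetWithTimestamp, endOffsetUnguarded, endOffsetGuarded) are hypothetical, and
the guarded variant is just one assumed way to avoid the error when only the
offset is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class NegativeTimestampSketch {

    // Hypothetical stand-in for a value type that rejects negative timestamps,
    // mirroring the "Invalid negative timestamp" message seen in the trace.
    static final class OffsetWithTimestamp {
        final long offset;
        final long timestamp;

        OffsetWithTimestamp(long offset, long timestamp) {
            if (timestamp < 0) {
                throw new IllegalArgumentException("Invalid negative timestamp");
            }
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Unguarded path: the callback always builds the timestamped value, so a
    // response carrying timestamp -1 completes the future exceptionally and
    // the error reaches the caller wrapped in another exception.
    static long endOffsetUnguarded(long offset, long timestamp) {
        CompletableFuture<long[]> response =
                CompletableFuture.completedFuture(new long[] {offset, timestamp});
        CompletableFuture<OffsetWithTimestamp> mapped =
                response.thenApply(r -> new OffsetWithTimestamp(r[0], r[1]));
        try {
            return mapped.join().offset;
        } catch (CompletionException e) {
            // Wrap the original cause, similar in spirit to the wrapped
            // exception at the top of the trace.
            throw new RuntimeException(e.getCause());
        }
    }

    // Guarded path (an assumption, not the real fix): an end-offset lookup
    // only needs the offset, so the timestamped value is never constructed.
    static long endOffsetGuarded(long offset, long timestamp) {
        CompletableFuture<long[]> response =
                CompletableFuture.completedFuture(new long[] {offset, timestamp});
        return response.thenApply(r -> r[0]).join();
    }

    public static void main(String[] args) {
        // Values taken from the ListOffsetsResponse in the trace.
        System.out.println(endOffsetGuarded(42804L, -1L));
        try {
            endOffsetUnguarded(42804L, -1L);
        } catch (RuntimeException e) {
            System.out.println("Failed: " + e.getCause().getMessage());
        }
    }
}
```

The point of the sketch is only that an exception thrown inside a future
callback is invisible until someone waits on the future, which is why the two
clients can diverge depending on where (or whether) that exception is caught.
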
> System test failing for new consumer on endOffsets with negative timestamps
> ---------------------------------------------------------------------------
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer, system tests
> Reporter: Lianet Magrans
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor, kip-848-client-support,
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Received ListOffsetResponse
> ListOffsetsResponseData(throttleTimeMs=0,
> topics=[ListOffsetsTopicResponse(name='input-topic',
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0,
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from
> broker worker2:9092 (id: 2 rack: null)
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse
> response for input-topic-0. Fetched offset 42804, timestamp -1
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Updating last stable offset for
> partition input-topic-0 to 42804
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Fetch offsets completed
> successfully for partitions and timestamps {input-topic-0=-1}. Result
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] No events to process
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException:
> Invalid negative timestamp
> at
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
> at
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
> at
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
> at
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$3(OffsetsRequestManager.java:305)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.addPartialResult(OffsetsRequestManager.java:612)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.access$500(OffsetsRequestManager.java:586)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$4(OffsetsRequestManager.java:328)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetRequestToNode$5(OffsetsRequestManager.java:369)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler.onComplete(NetworkClientDelegate.java:354)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
> at
> org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:129)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:140)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)