[
https://issues.apache.org/jira/browse/KAFKA-16156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17815820#comment-17815820
]
Philip Nee commented on KAFKA-16156:
------------------------------------
This issue seems to be easily reproducible by simply calling endOffsets.
Here is an example integration test that triggers it:
{code:java}
def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
val producer = createProducer()
(0 until 10000).foreach { i =>
producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key
$i".getBytes, s"value $i"
.getBytes))
}
// This test ensures that the member ID is propagated from the group coordinator when the
// assignment is received into a subsequent offset commit
val consumer = createConsumer()
assertEquals(0, consumer.assignment.size)
consumer.subscribe(List(topic).asJava)
awaitAssignment(consumer, Set(tp, tp2))
print("listing offsets")
print(consumer.endOffsets(Set(tp, tp2).asJava))
} {code}
> System test failing for new consumer on endOffsets with negative timestamps
> ---------------------------------------------------------------------------
>
> Key: KAFKA-16156
> URL: https://issues.apache.org/jira/browse/KAFKA-16156
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer, system tests
> Reporter: Lianet Magrans
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor, kip-848-client-support,
> system-tests
> Fix For: 3.8.0
>
>
> TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid
> negative timestamp".
> Trace:
> [2024-01-15 07:42:33,932] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Received ListOffsetResponse
> ListOffsetsResponseData(throttleTimeMs=0,
> topics=[ListOffsetsTopicResponse(name='input-topic',
> partitions=[ListOffsetsPartitionResponse(partitionIndex=0, errorCode=0,
> oldStyleOffsets=[], timestamp=-1, offset=42804, leaderEpoch=0)])]) from
> broker worker2:9092 (id: 2 rack: null)
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,932] DEBUG [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Handling ListOffsetResponse
> response for input-topic-0. Fetched offset 42804, timestamp -1
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,932] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Updating last stable offset for
> partition input-topic-0 to 42804
> (org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils)
> [2024-01-15 07:42:33,933] DEBUG [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] Fetch offsets completed
> successfully for partitions and timestamps {input-topic-0=-1}. Result
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils$ListOffsetResult@bf2a862
> (org.apache.kafka.clients.consumer.internals.OffsetsRequestManager)
> [2024-01-15 07:42:33,933] TRACE [Consumer
> clientId=consumer-transactions-test-consumer-group-1,
> groupId=transactions-test-consumer-group] No events to process
> (org.apache.kafka.clients.consumer.internals.events.EventProcessor)
> [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event
> loop (org.apache.kafka.tools.TransactionalMessageCopier)
> org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException:
> Invalid negative timestamp
> at
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212)
> at
> org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44)
> at
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113)
> at
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342)
> at
> org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292)
> Caused by: java.lang.IllegalArgumentException: Invalid negative timestamp
> at
> org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.buildOffsetsForTimesResult(OffsetFetcherUtils.java:253)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$fetchOffsets$1(OffsetsRequestManager.java:181)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$3(OffsetsRequestManager.java:305)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.addPartialResult(OffsetsRequestManager.java:612)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager$MultiNodeRequest.access$500(OffsetsRequestManager.java:586)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetsRequests$4(OffsetsRequestManager.java:328)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.OffsetsRequestManager.lambda$buildListOffsetRequestToNode$5(OffsetsRequestManager.java:369)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> at
> org.apache.kafka.clients.consumer.internals.NetworkClientDelegate$FutureCompletionHandler.onComplete(NetworkClientDelegate.java:354)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
> at
> org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.poll(NetworkClientDelegate.java:129)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:140)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)