[
https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True resolved KAFKA-16777.
-------------------------------
Reviewer: Bruno Cadonna
Resolution: Fixed
Resolved as part of fix for KAFKA-16637.
> New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
> ----------------------------------------------------------------------------------------------------
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Kirk True
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If the consumer does not define an offset reset strategy, a call to poll
> should fail with NoOffsetForPartitionException. That works as expected on the
> new consumer when polling with a timeout > 0 (existing integration test
> [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]),
> but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes
> for the legacy consumer but fails for the new consumer). We should add it as
> part of the fix, for better coverage:
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   val consumer = createConsumer(configOverrides = this.consumerConfig)
>   consumer.assign(List(tp).asJava)
>   // Continuous poll should eventually fail because there is no offset reset strategy set
>   // (fail only when resetting positions after coordinator is known).
>   TestUtils.tryUntilNoAssertionError() {
>     assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>   }
> }
> {code}
> This is also covered by the unit test
> [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915],
> which is currently enabled only for the LegacyConsumer. After fixing this
> issue we should be able to enable it for the new consumer too.
> The issue seems to occur when poll is called with a ZERO timeout: even when
> it is called continuously, the consumer never completes
> initWithCommittedOffsetsIfNeeded, so updateFetchPositions never reaches
> [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663],
> where the exception is thrown.
>
> There is a related issue, https://issues.apache.org/jira/browse/KAFKA-16637,
> but I am filing this one to provide more context and to point out the failing
> tests and the suggested new tests. All of them fail even with the current
> patch in KAFKA-16637, so this needs investigation.
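
For readers unfamiliar with the failure mode described above, here is a toy model in plain Java. It is only a sketch under stated assumptions and is not taken from AsyncKafkaConsumer: it assumes the committed-offset lookup needs several rounds of background work, and that a zero-timeout poll either keeps or abandons an unfinished lookup when it returns. The report does not establish that this is the actual cause. All names (PollZeroModel, ToyConsumer, OffsetLookup, NoOffsetException, keepPendingLookup) are hypothetical.

```java
public class PollZeroModel {
    /** Hypothetical stand-in for NoOffsetForPartitionException. */
    static class NoOffsetException extends RuntimeException {
        NoOffsetException(String msg) { super(msg); }
    }

    /** A lookup that needs a fixed number of ticks of background work to finish. */
    static class OffsetLookup {
        private int ticksRemaining;
        OffsetLookup(int ticks) { this.ticksRemaining = ticks; }
        void tick() { if (ticksRemaining > 0) ticksRemaining--; }
        boolean isDone() { return ticksRemaining == 0; }
    }

    /** Toy consumer with no committed offset and no reset strategy. */
    static class ToyConsumer {
        private final boolean keepPendingLookup; // assumption: the behaviour under test
        private final int lookupCost;            // ticks one lookup needs
        private OffsetLookup pending;

        ToyConsumer(boolean keepPendingLookup, int lookupCost) {
            this.keepPendingLookup = keepPendingLookup;
            this.lookupCost = lookupCost;
        }

        /** One zero-timeout poll: advance the lookup by one tick and never block. */
        void pollZero() {
            if (pending == null) {
                pending = new OffsetLookup(lookupCost);
            }
            pending.tick();
            if (pending.isDone()) {
                // Lookup finished: nothing committed and nothing to reset to.
                throw new NoOffsetException("no committed offset and no reset strategy");
            }
            if (!keepPendingLookup) {
                pending = null; // abandon the unfinished lookup; the next poll starts over
            }
        }
    }

    /** Returns the poll number that threw, or -1 if none did within maxPolls. */
    static int pollsUntilException(ToyConsumer consumer, int maxPolls) {
        for (int i = 1; i <= maxPolls; i++) {
            try {
                consumer.pollZero();
            } catch (NoOffsetException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Abandoning the lookup on every poll: the exception is never reached.
        System.out.println(pollsUntilException(new ToyConsumer(false, 3), 1000)); // prints -1
        // Keeping the lookup across polls: the third poll throws.
        System.out.println(pollsUntilException(new ToyConsumer(true, 3), 1000));  // prints 3
    }
}
```

In this model, repeated zero-timeout polls only make progress if the unfinished lookup survives from one poll to the next. With a positive timeout a single poll could wait for the lookup, which would be consistent with the existing integration test passing.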