[
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kirk True updated KAFKA-16194:
------------------------------
Fix Version/s: 3.8.0
> KafkaConsumer.groupMetadata() should be correct when first records are
> returned
> -------------------------------------------------------------------------------
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
> Issue Type: Sub-task
> Components: clients, consumer
> Reporter: David Jacot
> Assignee: Bruno Cadonna
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This
> fails the first transactions ever run by the Producer/Consumer.
>
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
> txnProducer.beginTransaction();
> System.out.println("Begin transactions called");
> consumer.subscribe(Collections.singletonList("input"));
> System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
> ConsumerRecords<String, String> records =
> consumer.poll(Duration.ofSeconds(10));
> System.out.println("Returned " + records.count() + " records.");
> // Process and send txn messages.
> for (ConsumerRecord<String, String> processedRecord : records) {
> txnProducer.send(new ProducerRecord<>("output",
> processedRecord.key(), "Processed: " + processedRecord.value()));
> }
> ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
> System.out.println("Group metadata inside test" + groupMetadata);
> Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
> for (ConsumerRecord<String, String> record : records) {
> offsetsToCommit.put(new TopicPartition(record.topic(),
> record.partition()),
> new OffsetAndMetadata(record.offset() + 1));
> }
> System.out.println("Offsets to commit" + offsetsToCommit);
> // Send offsets to transaction with ConsumerGroupMetadata.
> txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
> System.out.println("Send offsets to transaction done");
> // Commit the transaction.
> txnProducer.commitTransaction();
> System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException |
> AuthorizationException e) {
> e.printStackTrace();
> txnProducer.close();
> } catch (KafkaException e) {
> e.printStackTrace();
> txnProducer.abortTransaction();
> } finally {
> txnProducer.close();
> consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the
> group metadata is not processed.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)