[
https://issues.apache.org/jira/browse/KAFKA-16637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17842470#comment-17842470
]
Lianet Magrans edited comment on KAFKA-16637 at 4/30/24 6:10 PM:
-----------------------------------------------------------------
Hey [~chickenchickenlove], just to rule out the basics: did you make sure to
produce messages to the same test-topic1, and that no other consumers are
subscribed to it that could be owning the partition? I tried your code again
with 1 topic, 1 partition, and 1 instance of your consumer app running with a
poll duration of 1s, and was able to consume messages as expected. The only
change I made was switching to StringDeserializers for simplicity:
{quote}props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());{quote}
and produced a bunch of messages with:
bq. for x in {1..10}; do echo "Test message $x"; done |
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1
Then the consumer app with your code printed the 10 messages as expected. If
you're still facing issues after double-checking that, I would suggest
providing a ConsumerRebalanceListener in the call to subscribe, just to
check/print the partitions assigned to your consumer in the
onPartitionsAssigned callback, and also taking a look at the broker logs and
sharing them, to understand more about what's going on in your setup. Hope it
helps!
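For reference, a minimal sketch of what passing a ConsumerRebalanceListener to subscribe could look like. The class name AssignmentDebugConsumer, the describe helper, and the listener class name are hypothetical. The bootstrap server, group id "1", and topic test-topic1 are taken from the command and logs in this ticket, and the group.protocol=consumer setting is assumed from the AsyncKafkaConsumer line in the consumer logs, so adjust them to your setup:

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical debugging app: same consume loop, plus a listener that prints assignments.
class AssignmentDebugConsumer {

    // Formats one rebalance event, e.g. "Assigned: [test-topic1-0]".
    static String describe(String event, Collection<TopicPartition> partitions) {
        return event + ": " + partitions;
    }

    // Prints every assignment change so it is visible whether this consumer owns the partition.
    static class LoggingRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            System.out.println(describe("Assigned", partitions));
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            System.out.println(describe("Revoked", partitions));
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed values, taken from the command and logs in this ticket.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
        // Opt in to the new (KIP-848) rebalance protocol.
        props.put("group.protocol", "consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The listener is invoked from within poll() when the assignment changes.
            consumer.subscribe(List.of("test-topic1"), new LoggingRebalanceListener());
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```

If no "Assigned" line is ever printed, the consumer never received the partition, which would point at the group coordinator side rather than at the fetch path.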
> KIP-848 does not work well
> --------------------------
>
> Key: KAFKA-16637
> URL: https://issues.apache.org/jira/browse/KAFKA-16637
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: sanghyeok An
> Assignee: Kirk True
> Priority: Minor
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: image-2024-04-30-08-33-06-367.png,
> image-2024-04-30-08-33-50-435.png
>
>
> I want to test next generation of the consumer rebalance protocol
> ([https://cwiki.apache.org/confluence/display/KAFKA/The+Next+Generation+of+the+Consumer+Rebalance+Protocol+%28KIP-848%29+-+Early+Access+Release+Notes])
>
> However, it does not work well.
> You can check my setup below.
>
> *Docker-compose.yaml*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/docker-compose/docker-compose.yaml]
>
> *Consumer Code*
> [https://github.com/chickenchickenlove/kraft-test/blob/main/src/main/java/org/example/Main.java]
>
> *Consumer logs*
> [main] INFO org.apache.kafka.common.telemetry.internals.KafkaMetricsCollector
> - initializing Kafka metrics collector
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId:
> 2ae524ed625438c5
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs:
> 1714309299215
> [main] INFO org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer -
> [Consumer clientId=1-1, groupId=1] Subscribed to topic(s): test-topic1
> [consumer_background_thread] INFO org.apache.kafka.clients.Metadata -
> [Consumer clientId=1-1, groupId=1] Cluster ID: Some(MkU3OEVBNTcwNTJENDM2Qk)
> Stuck In here...
>
> *Broker logs*
> broker | [2024-04-28 12:42:27,751] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:27,801] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,211] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,259] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> broker | [2024-04-28 12:42:28,727] INFO Sent auto-creation request for
> Set(__consumer_offsets) to the active controller.
> (kafka.server.DefaultAutoTopicCreationManager)
> stuck in here