tstuber opened a new issue #1960: URL: https://github.com/apache/camel-quarkus/issues/1960
Hi all I am trying to consume Avro messages from Kafka by using camel-quarkus-kafka by including the schema-registry. **My test setup:** - local Kafka broker/zk - local schema registry - local Kafka event producer, writin new random avro messages (schema: fields with "id" and "message") **Issue** When I am involving the schema registry parameters, I run into an infinit loop of consumer reconnections. The log component is also never hit. I am not sure If I misconfigured something or if there is an issue involved in that. What I would like to achieve is that a consumer derives the schema automatically and would convert the Avro schema into an pojo for further processing. from(kafka("avro-events") .brokers("{{kafka.bootstrap.servers}}") .schemaRegistryURL("{{schema.registry.url}}") .specificAvroReader(true) .valueDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer") .resolve(getContext())) .log("Avro Message: ${body}") ``` 2020-10-27 11:06:22,746 INFO [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer 2020-10-27 11:06:22,747 INFO [io.con.kaf.ser.KafkaAvroDeserializerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) KafkaAvroDeserializerConfig values: bearer.auth.token = [hidden] proxy.port = -1 schema.reflection = false auto.register.schemas = true max.schemas.per.subject = 1000 basic.auth.credentials.source = URL specific.avro.reader = true value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy schema.registry.url = [localhost:8081] basic.auth.user.info = [hidden] proxy.host = use.latest.version = false schema.registry.basic.auth.user.info = [hidden] bearer.auth.credentials.source = STATIC_TOKEN key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy 2020-10-27 11:06:22,753 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka version: 2.5.0 2020-10-27 11:06:22,753 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka commitId: 66563e712b0b9f84 2020-10-27 11:06:22,754 INFO [org.apa.kaf.com.uti.AppInfoParser] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka startTimeMs: 1603793182753 2020-10-27 11:06:22,754 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Reconnecting avro-events-Thread 0 to topic avro-events after 5000 ms 2020-10-27 11:06:27,755 INFO [org.apa.cam.com.kaf.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Subscribing avro-events-Thread 0 to topic avro-events 2020-10-27 11:06:27,757 INFO [org.apa.kaf.cli.con.KafkaConsumer] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Subscribed to topic(s): avro-events 2020-10-27 11:06:27,774 INFO [org.apa.kaf.cli.Metadata] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Cluster ID: CBXaaoMdSfu7ChosVF8hRQ 2020-10-27 11:06:27,776 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) 2020-10-27 11:06:27,779 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group 2020-10-27 11:06:27,792 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group 2020-10-27 11:06:27,793 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group 2020-10-27 11:06:27,799 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Finished assignment for group at generation 3: {consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f=Assignment(partitions=[avro-events-0])} 2020-10-27 11:06:27,802 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Successfully joined group with generation 3 2020-10-27 11:06:27,802 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Adding newly assigned partitions: avro-events-0 2020-10-27 11:06:27,808 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Setting offset for partition avro-events-0 to the committed offset FetchPosition{offset=914, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}} 2020-10-27 11:06:27,816 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Revoke previously assigned partitions avro-events-0 2020-10-27 11:06:27,816 INFO [org.apa.kaf.cli.con.int.AbstractCoordinator] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Member consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to the consumer is being closed 2020-10-27 11:06:27,823 INFO [org.apa.kaf.cli.con.ConsumerConfig] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.dns.lookup = default client.id = client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225 group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 40000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.2 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer ``` When I consume and log messages without involving schema registry, it works fine: from(kafka("avro-events") .brokers("{{kafka.bootstrap.servers}}") .resolve(getContext())) .log("Avro Message: ${body}"); Output: ``` 2020-10-27 10:58:07,699 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H786db2c3-c0ee-497e-ba61-dabca64f2729 2020-10-27 10:58:10,710 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001D, BodyType: String, Body: �H38dadb0e-75b3-484c-b499-f61c654512a5] 2020-10-27 10:58:10,710 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �H38dadb0e-75b3-484c-b499-f61c654512a5 2020-10-27 10:58:13,699 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001E, BodyType: String, Body: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e] 2020-10-27 10:58:13,699 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e 2020-10-27 10:58:16,707 INFO [org.apa.cam.Tracing] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [route1 ] [log ] Exchange[Id: D25D23EC4A3C0CF-000000000000001F, BodyType: String, Body: �Hc116e361-7831-4e06-9d1b-759b8bb516f5] 2020-10-27 10:58:16,707 INFO [route1] (Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) Avro Message: �Hc116e361-7831-4e06-9d1b-759b8bb516f5 ``` Thanks a lot for your support. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org