tstuber opened a new issue #1960:
URL: https://github.com/apache/camel-quarkus/issues/1960


   Hi all
   
   I am trying to consume Avro messages from Kafka using camel-quarkus-kafka 
together with the schema registry.
   
   **My test setup:**
   
   - local Kafka broker/zk
   - local schema registry
   - local Kafka event producer, writing new random Avro messages (schema: 
fields "id" and "message")
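   
   For reference, an Avro schema of that shape might look like the following (field names taken from the description above; record name and field types are assumptions):
   
   ```json
   {
     "type": "record",
     "name": "Event",
     "fields": [
       {"name": "id", "type": "string"},
       {"name": "message", "type": "string"}
     ]
   }
   ```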
   
   **Issue**
   
   When I involve the schema registry parameters, I run into an infinite 
loop of consumer reconnections, and the log component is never hit. I am not 
sure if I misconfigured something or if there is a bug involved. 
   What I would like to achieve is that the consumer derives the schema 
automatically and converts the Avro payload into a POJO for further 
processing. 
   
   ```java
   from(kafka("avro-events")
           .brokers("{{kafka.bootstrap.servers}}")
           .schemaRegistryURL("{{schema.registry.url}}")
           .specificAvroReader(true)
           .valueDeserializer("io.confluent.kafka.serializers.KafkaAvroDeserializer")
           .resolve(getContext()))
       .log("Avro Message: ${body}");
   ```
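   
   The `{{...}}` placeholders resolve from the application configuration; the values below are what I use locally (hostnames/ports assumed from the logs):
   
   ```properties
   kafka.bootstrap.servers=localhost:9092
   schema.registry.url=http://localhost:8081
   ```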
   
   ```
   2020-10-27 11:06:22,746 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: 
           allow.auto.create.topics = true
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [localhost:9092]
           check.crcs = true
           client.dns.lookup = default
           client.id = 
           client.rack = 
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = true
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
           group.instance.id = null
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 40000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class 
io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   2020-10-27 11:06:22,747 INFO  [io.con.kaf.ser.KafkaAvroDeserializerConfig] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) 
KafkaAvroDeserializerConfig values: 
           bearer.auth.token = [hidden]
           proxy.port = -1
           schema.reflection = false
           auto.register.schemas = true
           max.schemas.per.subject = 1000
           basic.auth.credentials.source = URL
           specific.avro.reader = true
           value.subject.name.strategy = class 
io.confluent.kafka.serializers.subject.TopicNameStrategy
           schema.registry.url = [localhost:8081]
           basic.auth.user.info = [hidden]
           proxy.host = 
           use.latest.version = false
           schema.registry.basic.auth.user.info = [hidden]
           bearer.auth.credentials.source = STATIC_TOKEN
           key.subject.name.strategy = class 
io.confluent.kafka.serializers.subject.TopicNameStrategy
   
   2020-10-27 11:06:22,753 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka version: 2.5.0
   2020-10-27 11:06:22,753 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka commitId: 
66563e712b0b9f84
   2020-10-27 11:06:22,754 INFO  [org.apa.kaf.com.uti.AppInfoParser] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) Kafka startTimeMs: 
1603793182753
   2020-10-27 11:06:22,754 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) Reconnecting 
avro-events-Thread 0 to topic avro-events after 5000 ms
   2020-10-27 11:06:27,755 INFO  [org.apa.cam.com.kaf.KafkaConsumer] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) Subscribing 
avro-events-Thread 0 to topic avro-events
   2020-10-27 11:06:27,757 INFO  [org.apa.kaf.cli.con.KafkaConsumer] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Subscribed to topic(s): 
avro-events
   2020-10-27 11:06:27,774 INFO  [org.apa.kaf.cli.Metadata] (Camel (camel-1) 
thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Cluster ID: CBXaaoMdSfu7ChosVF8hRQ
   2020-10-27 11:06:27,776 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Discovered group coordinator 
localhost:9092 (id: 2147483647 rack: null)
   2020-10-27 11:06:27,779 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
   2020-10-27 11:06:27,792 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Join group failed with 
org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
needs to have a valid member id before actually entering a consumer group
   2020-10-27 11:06:27,793 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] (Re-)joining group
   2020-10-27 11:06:27,799 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Finished assignment for group at 
generation 3: 
{consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f=Assignment(partitions=[avro-events-0])}
   2020-10-27 11:06:27,802 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Successfully joined group with 
generation 3
   2020-10-27 11:06:27,802 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Adding newly assigned partitions: 
avro-events-0
   2020-10-27 11:06:27,808 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Setting offset for partition 
avro-events-0 to the committed offset FetchPosition{offset=914, 
offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: 
null)], epoch=0}}
   2020-10-27 11:06:27,816 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Revoke previously assigned 
partitions avro-events-0
   2020-10-27 11:06:27,816 INFO  [org.apa.kaf.cli.con.int.AbstractCoordinator] 
(Camel (camel-1) thread #0 - KafkaConsumer[avro-events]) [Consumer 
clientId=consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2, 
groupId=5919fa66-4bf2-47b2-b227-3d02769f3225] Member 
consumer-5919fa66-4bf2-47b2-b227-3d02769f3225-2-e4c3ddd6-e9da-4b80-bbf5-1631a3f08c9f
 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: 
null) due to the consumer is being closed
   2020-10-27 11:06:27,823 INFO  [org.apa.kaf.cli.con.ConsumerConfig] (Camel 
(camel-1) thread #0 - KafkaConsumer[avro-events]) ConsumerConfig values: 
           allow.auto.create.topics = true
           auto.commit.interval.ms = 5000
           auto.offset.reset = latest
           bootstrap.servers = [localhost:9092]
           check.crcs = true
           client.dns.lookup = default
           client.id = 
           client.rack = 
           connections.max.idle.ms = 540000
           default.api.timeout.ms = 60000
           enable.auto.commit = true
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = 5919fa66-4bf2-47b2-b227-3d02769f3225
           group.instance.id = null
           heartbeat.interval.ms = 3000
           interceptor.classes = []
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metric.reporters = []
           metrics.num.samples = 2
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 40000
           retry.backoff.ms = 100
           sasl.client.callback.handler.class = null
           sasl.jaas.config = null
           sasl.kerberos.kinit.cmd = /usr/bin/kinit
           sasl.kerberos.min.time.before.relogin = 60000
           sasl.kerberos.service.name = null
           sasl.kerberos.ticket.renew.jitter = 0.05
           sasl.kerberos.ticket.renew.window.factor = 0.8
           sasl.login.callback.handler.class = null
           sasl.login.class = null
           sasl.login.refresh.buffer.seconds = 300
           sasl.login.refresh.min.period.seconds = 60
           sasl.login.refresh.window.factor = 0.8
           sasl.login.refresh.window.jitter = 0.05
           sasl.mechanism = GSSAPI
           security.protocol = PLAINTEXT
           security.providers = null
           send.buffer.bytes = 131072
           session.timeout.ms = 10000
           ssl.cipher.suites = null
           ssl.enabled.protocols = [TLSv1.2]
           ssl.endpoint.identification.algorithm = https
           ssl.key.password = null
           ssl.keymanager.algorithm = SunX509
           ssl.keystore.location = null
           ssl.keystore.password = null
           ssl.keystore.type = JKS
           ssl.protocol = TLSv1.2
           ssl.provider = null
           ssl.secure.random.implementation = null
           ssl.trustmanager.algorithm = PKIX
           ssl.truststore.location = null
           ssl.truststore.password = null
           ssl.truststore.type = JKS
           value.deserializer = class 
io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   ```
   
   When I consume and log messages without involving the schema registry, it 
works fine:
   
   ```java
   from(kafka("avro-events")
           .brokers("{{kafka.bootstrap.servers}}")
           .resolve(getContext()))
       .log("Avro Message: ${body}");
   ```
   
   Output: 
   ```
   2020-10-27 10:58:07,699 INFO  [route1] (Camel (camel-1) thread #0 - 
KafkaConsumer[avro-events]) Avro Message: �H786db2c3-c0ee-497e-ba61-dabca64f2729
   2020-10-27 10:58:10,710 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread 
#0 - KafkaConsumer[avro-events])      [route1      ] [log                       
       ] Exchange[Id: D25D23EC4A3C0CF-000000000000001D, BodyType: String, Body: 
�H38dadb0e-75b3-484c-b499-f61c654512a5]
   2020-10-27 10:58:10,710 INFO  [route1] (Camel (camel-1) thread #0 - 
KafkaConsumer[avro-events]) Avro Message: �H38dadb0e-75b3-484c-b499-f61c654512a5
   2020-10-27 10:58:13,699 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread 
#0 - KafkaConsumer[avro-events])      [route1      ] [log                       
       ] Exchange[Id: D25D23EC4A3C0CF-000000000000001E, BodyType: String, Body: 
�Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e]
   2020-10-27 10:58:13,699 INFO  [route1] (Camel (camel-1) thread #0 - 
KafkaConsumer[avro-events]) Avro Message: �Ha53e0dfa-e302-4b1e-a5ff-db33bcf6972e
   2020-10-27 10:58:16,707 INFO  [org.apa.cam.Tracing] (Camel (camel-1) thread 
#0 - KafkaConsumer[avro-events])      [route1      ] [log                       
       ] Exchange[Id: D25D23EC4A3C0CF-000000000000001F, BodyType: String, Body: 
�Hc116e361-7831-4e06-9d1b-759b8bb516f5]
   2020-10-27 10:58:16,707 INFO  [route1] (Camel (camel-1) thread #0 - 
KafkaConsumer[avro-events]) Avro Message: �Hc116e361-7831-4e06-9d1b-759b8bb516f5
   ```
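   
   As a side note, the garbled prefix (`�H...`) in the output above is expected when reading Confluent-serialized records with a plain `StringDeserializer`: if I understand the Confluent wire format correctly, each record carries a 5-byte framing header (magic byte `0` plus a 4-byte big-endian schema id) before the Avro payload, and those bytes render as mojibake. A minimal sketch of parsing that header, assuming Confluent framing (class and method names are hypothetical):
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   
   public class WireFormat {
       // Extract the schema id from a Confluent-framed record:
       // byte 0 is the magic byte (must be 0), bytes 1-4 are the
       // big-endian schema registry id, the rest is the Avro payload.
       static int schemaId(byte[] record) {
           ByteBuffer buf = ByteBuffer.wrap(record);
           byte magic = buf.get();
           if (magic != 0) {
               throw new IllegalArgumentException("not a Confluent-framed record");
           }
           return buf.getInt();
       }
   
       public static void main(String[] args) {
           // Hypothetical framed record: magic 0, schema id 1, then a payload.
           byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
           byte[] framed = ByteBuffer.allocate(5 + payload.length)
                   .put((byte) 0)
                   .putInt(1)
                   .put(payload)
                   .array();
           System.out.println(schemaId(framed)); // prints 1
       }
   }
   ```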
   
   Thanks a lot for your support.

