Thylossus opened a new issue #414:
URL: https://github.com/apache/camel-kafka-connector/issues/414


   Hi,
   
   I'm using the [Camel RabbitMQ Connector](https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-rabbitmq-kafka-source-connector.html) as a source connector for my Kafka cluster. Kafka Connect runs in a Docker container built with the following Dockerfile:
   
   ```Dockerfile
   FROM openjdk:8-jre-alpine
   
   ARG VERSION_APACHE_KAFKA=2.2.1
   ARG VERSION_SCALA=2.12
   ARG VERSION_APACHE_CAMEL=0.4.0
   
   RUN mkdir -p /kafka/connect/plugins && \
       mkdir -p /kafka/connect/app && \
       mkdir -p /kafka/connect/config && \
       wget https://archive.apache.org/dist/kafka/${VERSION_APACHE_KAFKA}/kafka_${VERSION_SCALA}-${VERSION_APACHE_KAFKA}.tgz -q -O kafka.tgz && \
       tar -xzf kafka.tgz -C /kafka/connect/app --strip 1 && \
       rm -f kafka.tgz && \
       wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-rabbitmq-kafka-connector/${VERSION_APACHE_CAMEL}/camel-rabbitmq-kafka-connector-${VERSION_APACHE_CAMEL}-package.zip -q -O rabbitmq-connector.zip && \
       unzip -qq rabbitmq-connector.zip -d /kafka/connect/plugins && \
       rm -f rabbitmq-connector.zip && \
       apk add -q --no-cache bash
   
   ENTRYPOINT [ "/kafka/connect/entrypoint.sh" ]
   ```
   
   When I run this image and add a RabbitMQ connector with the following configuration
   
   ```json
   {
     "name": "{{ config.connector_name }}",
     "config": {
       "connector.class": "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector",
       "topics": "{{ config.topic }}",
       "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
       "tasks.max": 1,
       "camel.component.rabbitmq.hostname": "{{ kafka_connect_rabbitmq_host }}",
       "camel.component.rabbitmq.portNumber": {{ kafka_connect_rabbitmq_port }},
       "camel.component.rabbitmq.username": "{{ kafka_connect_rabbitmq_user }}",
       "camel.component.rabbitmq.password": "{{ kafka_connect_rabbitmq_password }}",
       "camel.source.path.exchangeName": "{{ config.exchange }}",
       "camel.source.endpoint.exchangeType": "topic",
       "camel.source.endpoint.autoDelete": false,
       "camel.source.endpoint.queue": "{{ config.queue }}",
       "camel.source.endpoint.routingKey": "{{ config.routing_key }}"
     }
   }
   ```
   
   the connector task it creates keeps its assigned CPU core at 100% utilization.
   
   After inspecting thread dumps and enabling `TRACE` logging, I came to the following conclusion:  
   The [CamelSourceTask](https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java) does not seem to properly implement the `poll` method of the abstract [SourceTask](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java) class.
   
   The JavaDoc for the `poll` method reads as follows ([source](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L46-L60)):
   
   > Poll this source task for new records. If no data is currently available, this method
   > should block but return control to the caller regularly (by returning {@code null}) in
   > order for the task to transition to the {@code PAUSED} state if requested to do so.
   >  
   > The task will be {@link #stop() stopped} on a separate thread, and when that happens
   > this method is expected to unblock, quickly finish up any remaining processing, and
   > return.
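   To make the contract concrete, here is a minimal sketch of a task with two properties. Its `poll()` waits for a bounded time and returns `null` when it is idle. Its `stop()` is called from another thread and unblocks a pending `poll()`. `ContractSketchTask`, `offer` and the `BlockingQueue` hand-off are hypothetical illustrations, not Kafka's or Camel's API. The class intentionally does not extend `SourceTask`, so it compiles with the JDK alone.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical stand-in for a Kafka Connect SourceTask. It deliberately does not
   // extend org.apache.kafka.connect.source.SourceTask so it compiles without Kafka.
   class ContractSketchTask {
       // Unique sentinel instance, compared by identity so no real message can match it.
       private static final String STOP_SIGNAL = new String("stop");

       private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
       private final long maxWaitMillis;
       private volatile boolean stopped = false;

       ContractSketchTask(long maxWaitMillis) {
           this.maxWaitMillis = maxWaitMillis;
       }

       // Called by the message listener (e.g. a broker consumer callback).
       void offer(String message) {
           incoming.offer(message);
       }

       // Blocks for at most maxWaitMillis, then returns null if nothing arrived,
       // so the caller regularly regains control (e.g. to pause the task).
       List<String> poll() {
           if (stopped) {
               return null;
           }
           try {
               String first = incoming.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
               if (first == null || first == STOP_SIGNAL) {
                   return null;
               }
               return Collections.singletonList(first);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // preserve the interrupt for the caller
               return null;
           }
       }

       // Invoked from another thread; wakes up a poll() that is currently blocked.
       void stop() {
           stopped = true;
           incoming.offer(STOP_SIGNAL);
       }
   }
   ```

   The sentinel is one way to let `stop()` unblock a waiting `poll()` without holding a reference to the polling thread. Interrupting that thread would be an alternative.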
   
   Looking at the [implementation](https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L123-L169), I cannot see how the contract regarding "should block but return control to the caller regularly" is fulfilled. Since `consumer.receiveNoWait()` is called, exchanges are not read in a blocking manner, and the subsequent code does not block either.
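   One possible shape for a contract-conforming `poll` assumes messages are buffered in a queue. It blocks, with a timeout, only for the first record, then drains whatever else is already available without waiting. Camel's `PollingConsumer` offers `receive(long timeout)` alongside `receiveNoWait()`, so an analogous change might be possible in `CamelSourceTask`, but that is an assumption. The sketch below uses a plain `java.util.concurrent.BlockingQueue` as a stand-in, and `BatchPoll.pollBatch` is a hypothetical helper.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Hypothetical helper illustrating "block for the first record, then drain".
   final class BatchPoll {
       private BatchPoll() {
       }

       // Waits up to timeoutMillis for the first record, then drains whatever else is
       // already queued, without waiting, until maxRecords is reached.
       // Returns null if nothing arrived in time (the "return control regularly" case).
       static <T> List<T> pollBatch(BlockingQueue<T> queue, int maxRecords, long timeoutMillis) {
           if (maxRecords <= 0) {
               throw new IllegalArgumentException("maxRecords must be positive");
           }
           T first;
           try {
               first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS); // the only blocking call
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               return null;
           }
           if (first == null) {
               return null;
           }
           List<T> batch = new ArrayList<>(Math.min(maxRecords, 64));
           batch.add(first);
           queue.drainTo(batch, maxRecords - 1); // non-blocking, comparable to receiveNoWait()
           return batch;
       }
   }
   ```

   Blocking only on the first record bounds the idle cost to one wakeup per timeout. Draining the rest without waiting keeps latency low when messages arrive in bursts.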
   
   As a result, the `execute` method of [WorkerSourceTask](https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L229-L276) calls `poll` continuously. Each call returns immediately when no exchange is available (because of the `break` statement in the while loop). This effectively creates a busy loop that consumes all the CPU time of its core.
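   The busy loop can be reproduced without Kafka or Camel. The hypothetical `SpinDemo.countPolls` below is a simplified model of a worker that repeatedly polls an idle source for a fixed time window. It runs once with a non-blocking poll, which returns immediately as `receiveNoWait()` does, and once with a poll that waits up to a timeout. It is not the actual `WorkerSourceTask` code.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   // Simplified, hypothetical model of a worker loop polling a source that never has data.
   final class SpinDemo {
       private SpinDemo() {
       }

       // Returns how many times the loop called poll during windowMillis.
       static long countPolls(boolean blocking, long windowMillis, long pollTimeoutMillis) {
           BlockingQueue<String> idleSource = new LinkedBlockingQueue<>(); // never receives anything
           long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
           long calls = 0;
           while (System.nanoTime() < deadline) {
               calls++;
               if (blocking) {
                   try {
                       // Waits up to the timeout before giving control back, like a
                       // contract-conforming poll() returning null.
                       idleSource.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                       break;
                   }
               } else {
                   idleSource.poll(); // returns null immediately: the loop spins
               }
           }
           return calls;
       }
   }
   ```

   For example, `countPolls(true, 200, 50)` makes only about four or five calls in 200 ms (one per 50 ms timeout). `countPolls(false, 200, 50)` keeps the core busy and makes orders of magnitude more calls, which matches the pattern in the `TRACE` log below.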
   
   I would be grateful if someone could review my analysis. Maybe there is a configuration option that solves this issue, but I was not able to find one. Looking forward to hearing from anyone.
   
   Best regards,
   
   Tobias
   
   P.S.: With `TRACE` logging activated, the logs immediately show some kind of resource-intensive loop:
   
   ```
   2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
   2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
   2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
   ```
   
   The corresponding log statements can be found in the `execute` method of [WorkerSourceTask](https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L229-L276).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

