Thylossus opened a new issue #414:
URL: https://github.com/apache/camel-kafka-connector/issues/414

Hi,

I'm using the [Camel Rabbit MQ Connector](https://camel.apache.org/camel-kafka-connector/latest/connectors/camel-rabbitmq-kafka-source-connector.html) as a source connector for my Kafka cluster. Kafka connect is running in a Docker container built with the following Dockerfile

```Dockerfile
FROM openjdk:8-jre-alpine

ARG VERSION_APACHE_KAFKA=2.2.1
ARG VERSION_SCALA=2.12
ARG VERSION_APACHE_CAMEL=0.4.0

RUN mkdir -p /kafka/connect/plugins && \
    mkdir -p /kafka/connect/app && \
    mkdir -p /kafka/connect/config && \
    wget https://archive.apache.org/dist/kafka/${VERSION_APACHE_KAFKA}/kafka_${VERSION_SCALA}-${VERSION_APACHE_KAFKA}.tgz -q -O kafka.tgz && \
    tar -xzf kafka.tgz -C /kafka/connect/app --strip 1 && \
    rm -f kafka.tgz && \
    wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-rabbitmq-kafka-connector/${VERSION_APACHE_CAMEL}/camel-rabbitmq-kafka-connector-${VERSION_APACHE_CAMEL}-package.zip -q -O rabbitmq-connector.zip && \
    unzip -qq rabbitmq-connector.zip -d /kafka/connect/plugins && \
    rm -f rabbitmq-connector.zip && \
    apk add -q --no-cache bash

ENTRYPOINT [ "/kafka/connect/entrypoint.sh" ]
```

When running this configuration and adding a RabbitMQ connector with the following configuration

```json
{
  "name": "{{ config.connector_name }}",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.rabbitmq.CamelRabbitmqSourceConnector",
    "topics": "{{ config.topic }}",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": 1,
    "camel.component.rabbitmq.hostname": "{{ kafka_connect_rabbitmq_host }}",
    "camel.component.rabbitmq.portNumber": {{ kafka_connect_rabbitmq_port }},
    "camel.component.rabbitmq.username": "{{ kafka_connect_rabbitmq_user }}",
    "camel.component.rabbitmq.password": "{{ kafka_connect_rabbitmq_password }}",
    "camel.source.path.exchangeName": "{{ config.exchange }}",
    "camel.source.endpoint.exchangeType": "topic",
    "camel.source.endpoint.autoDelete": false,
    "camel.source.endpoint.queue": "{{ config.queue }}",
    "camel.source.endpoint.routingKey": "{{ config.routing_key }}"
  }
}
```

the connector task created by this configuration consumes all resources of its assigned CPU core.

I've looked at the thread dumps and activated `TRACE` logging and came up with the following conclusion:

The [CamelSourceTask](https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java) does not seem to properly implement the `poll` method of the abstract [SourceTask](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java) class. The JavaDoc for the `poll` method is as follows ([source](https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java#L46-L60)):

> Poll this source task for new records. If no data is currently available, this method
> should block but return control to the caller regularly (by returning {@code null}) in
> order for the task to transition to the {@code PAUSED} state if requested to do so.
>
> The task will be {@link #stop() stopped} on a separate thread, and when that happens
> this method is expected to unblock, quickly finish up any remaining processing, and
> return.

Looking at the [implementation](https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L123-L169) I cannot see how the contract w.r.t. "should block, but return control to the caller regularly" is fulfilled. Since `consumer.receiveNoWait();` is called, exchanges are not read in a blocking manner and the subsequent code is not blocking either.
This causes the `execute` method of [WorkerSourceTask](https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L229-L276) to continuously call the poll method which immediately returns if no exchange is available (due to the `break` statement in the while loop) effectively creating an infinite loop which consumes all CPU resources.

I would be grateful if someone could take a look at my analysis. Maybe there is some configuration option that solves my issue, but I was not able to find it.

Looking forward to hearing from anyone.

Best regards,
Tobias

P.S.: when `TRACE` logging is activated, the logs immediately show that there is some kind of resource-intensive loop:

```
2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.381+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} Nothing to send to Kafka. Polling source for additional records
2020-09-02T10:57:16.382+0000 [task-thread-deribw-events-0] TRACE WorkerSourceTask - WorkerSourceTask{id=deribw-events-0} About to send 0 records to Kafka
```

The corresponding log statements can be found in the `execute` method of [WorkerSourceTask](https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L229-L276)
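
To illustrate the behaviour the `poll` JavaDoc quoted above asks for (block while idle, but hand control back regularly by returning `null`), here is a minimal sketch that uses only the JDK. It is not the connector's code: the class `BlockingPollSketch`, its `offer` helper, and the settings `maxPollWaitMs` and `maxBatchSize` are hypothetical names, and a `BlockingQueue<String>` stands in for the stream of exchanges. In the connector itself, the timed `receive(long timeout)` variant of Camel's `PollingConsumer` might play the role of the timed queue read in place of `receiveNoWait()`, but that substitution is an assumption and not a tested fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a source task; this is NOT the real CamelSourceTask.
final class BlockingPollSketch {
    // Stands in for the exchanges arriving from RabbitMQ (assumption for this sketch).
    private final BlockingQueue<String> exchanges = new LinkedBlockingQueue<>();
    private final long maxPollWaitMs; // hypothetical setting: longest time poll() may block
    private final int maxBatchSize;   // hypothetical setting: upper bound on records per poll()

    BlockingPollSketch(long maxPollWaitMs, int maxBatchSize) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.maxPollWaitMs = maxPollWaitMs;
        this.maxBatchSize = maxBatchSize;
    }

    // Simulates an exchange arriving from the broker.
    void offer(String exchange) {
        exchanges.add(exchange);
    }

    // Blocks for at most maxPollWaitMs while idle, then returns null so the
    // caller regains control; returns a batch as soon as data is available.
    List<String> poll() {
        String first;
        try {
            first = exchanges.poll(maxPollWaitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Unblock promptly when interrupted and keep the interrupt flag set.
            Thread.currentThread().interrupt();
            return null;
        }
        if (first == null) {
            return null; // nothing arrived within the timeout
        }
        List<String> records = new ArrayList<>();
        records.add(first);
        // Collect whatever else is already queued, without waiting any further.
        exchanges.drainTo(records, maxBatchSize - 1);
        return records;
    }

    public static void main(String[] args) {
        BlockingPollSketch task = new BlockingPollSketch(200, 10);

        long start = System.nanoTime();
        List<String> idle = task.poll();
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("idle poll returned " + idle + " after about " + waitedMs + " ms");

        task.offer("a");
        task.offer("b");
        System.out.println("busy poll returned " + task.poll());
    }
}
```

With a timed wait like this, a caller that invokes `poll` in a tight loop is throttled to roughly one call per `maxPollWaitMs` while the source is idle, instead of spinning on a CPU core, and it still gets control back regularly as the JavaDoc requires.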