This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a23faa045fd7ddd15070591db6c5744f19bfc6d4
Author: Göran Pöhner <10630407+groundho...@users.noreply.github.com>
AuthorDate: Fri Jan 20 14:50:58 2023 +0100

    Prevent high channel churn in a queue full scenario

    In a scenario with a high message publishing rate and a full queue
    the publisher will get "nack" (not acknowledged) as feedback from
    RabbitMQ. The method Channel#waitForConfirmsOrDie(long) in
    https://github.com/rabbitmq/rabbitmq-java-client/blob/main/src/main/java/com/rabbitmq/client/impl/ChannelN.java#L241
    will close the channel. This closed channel is then given back to
    the channel pool. Next time the channel is taken out of the pool it
    is detected as closed and will be removed. So a new channel needs to
    be opened and this too will be closed directly after usage when the
    same queue is still full. There will be a high rate [...] This is
    described as high channel churn
    (https://www.rabbitmq.com/channels.html#high-channel-churn) and can
    have side effects on RabbitMQ (memory and cpu resource usage) and
    performance too.

    It was also discussed with folks from rabbitmq-java-client
    (rabbitmq/rabbitmq-java-client#942) and the usage of
    Channel#waitForConfirmsOrDie(long) is "highly NOT recommended" by
    them.

    This behavior affects ALL versions of the camel-rabbitmq component!
---
 .../component/rabbitmq/RabbitMQMessagePublisher.java | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index af5a17bd7b6..8e3ed776393 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -162,11 +162,25 @@ public class RabbitMQMessagePublisher {
     private void waitForConfirmation() throws IOException {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout());
-            channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
-            if (basicReturnReceived) {
-                throw new RuntimeCamelException("Failed to deliver message; basic.return received");
+            // Instead of calling waitForConfirmsOrDie() which is itself using the internal waitForConfirms() method
+            // waitForConfirms() is directly used and errors are handled exactly like before
+            // with one exception: underlaying channel will not be closed anymore when a "nack" is received
+            // This will prevent high-channel-churn in a queue full scenario
+            if (!channel.waitForConfirms(endpoint.getPublisherAcknowledgementsTimeout())) {
+                throw new IOException("nacks received");
+            } else {
+                if (basicReturnReceived) {
+                    throw new RuntimeCamelException("Failed to deliver message; basic.return received");
+                }
             }
         } catch (InterruptedException | TimeoutException e) {
+            try {
+                // Only close the channel in case of timeout
+                // Because we don't know why timeout happend (Maybe a communication problem)
+                channel.close(AMQP.PRECONDITION_FAILED, "TIMEOUT WAITING FOR ACK");
+            } catch (Exception ce) {
+                LOG.warn("Caught exception during closing of channel", ce);
+            }
             LOG.warn("Acknowledgement error for {}", camelExchange);
             throw new RuntimeCamelException(e);
         }
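
The churn described in the commit message can be illustrated with a small
standalone simulation. This is only a sketch under stated assumptions:
FakeChannel, FakePool and channelsOpened are hypothetical stand-ins invented
for illustration, not classes from camel-rabbitmq or rabbitmq-java-client,
and the simulated queue is assumed to be permanently full so that every
publish is nacked. It counts how many channels a pool has to open when the
confirm wait closes the channel on a nack, compared with leaving it open.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

/** Simulation only: every type here is a hypothetical stand-in, not a RabbitMQ or Camel class. */
final class ChurnSketch {

    /** Stand-in for a broker channel. */
    static final class FakeChannel {
        private boolean open = true;

        boolean isOpen() { return open; }

        void close() { open = false; }

        /** Assumption: the queue is always full, so the confirm wait always reports a nack. */
        boolean waitForConfirms() { return false; }

        /** Mimics the close-on-nack behaviour the commit message describes. */
        void waitForConfirmsOrDie() throws IOException {
            if (!waitForConfirms()) {
                close();
                throw new IOException("nacks received");
            }
        }
    }

    /** Stand-in for a channel pool that discards closed channels when they are borrowed. */
    static final class FakePool {
        private final Deque<FakeChannel> idle = new ArrayDeque<>();
        private int opened = 0;

        FakeChannel borrow() {
            FakeChannel ch = idle.poll();
            if (ch != null && ch.isOpen()) {
                return ch; // healthy channel, reuse it
            }
            opened++;      // pool was empty or the channel was closed: open a replacement
            return new FakeChannel();
        }

        void giveBack(FakeChannel ch) { idle.push(ch); }

        int opened() { return opened; }
    }

    /** Runs the given number of nacked publishes and returns how many channels were opened. */
    static int channelsOpened(int publishes, boolean closeOnNack) {
        FakePool pool = new FakePool();
        for (int i = 0; i < publishes; i++) {
            FakeChannel ch = pool.borrow();
            try {
                if (closeOnNack) {
                    ch.waitForConfirmsOrDie();
                } else if (!ch.waitForConfirms()) {
                    throw new IOException("nacks received");
                }
            } catch (IOException nacked) {
                // The publish failed either way; only the state of the channel differs.
            } finally {
                pool.giveBack(ch);
            }
        }
        return pool.opened();
    }

    public static void main(String[] args) {
        System.out.println("close on nack: " + channelsOpened(1000, true));  // prints 1000
        System.out.println("keep open:     " + channelsOpened(1000, false)); // prints 1
    }
}
```

In this model the caller sees the same IOException("nacks received") in both
modes; the difference is that the close-on-nack variant opens one channel per
publish, while the variant that leaves the channel open reuses a single one.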