This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7add4846938ab86e5e1268d1b23b1cf31b82f6d4
Author: Göran Pöhner <10630407+groundho...@users.noreply.github.com>
AuthorDate: Fri Jan 20 14:37:17 2023 +0100

    Prevent high channel churn in a queue full scenario
    
    In a scenario with a high message publishing rate and a full queue the 
publisher will get "nack" (not acknowledged) as feedback from RabbitMQ.
    
    The method Channel#waitForConfirmsOrDie(long) in 
https://github.com/rabbitmq/rabbitmq-java-client/blob/main/src/main/java/com/rabbitmq/client/impl/ChannelN.java#L241
 will close the channel. This closed channel is then given back to the channel 
pool.
    Next time the channel is taken out of the pool it is detected as closed and 
will be removed. So a new channel needs to be opened and this too will be 
closed directly after usage when the same queue is still full. There will be a 
high rate of opened and closed communication channels and the channel pool gets 
unusable.
    
    This is described as high channel churn 
(https://www.rabbitmq.com/channels.html#high-channel-churn) and can have side 
effects on RabbitMQ (memory and cpu resource usage) and performance too.
    
    It was also discussed with folks from rabbitmq-java-client 
(rabbitmq/rabbitmq-java-client#942) and the usage of 
Channel#waitForConfirmsOrDie(long) is "highly NOT recommended" by them.
    
    This behavior affects ALL versions of the camel-rabbitmq component!
---
 .../component/rabbitmq/RabbitMQMessagePublisher.java | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index af5a17bd7b6..1ddcdbbb8b9 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -162,11 +162,25 @@ public class RabbitMQMessagePublisher {
     private void waitForConfirmation() throws IOException {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", 
endpoint.getPublisherAcknowledgementsTimeout());
-            
channel.waitForConfirmsOrDie(endpoint.getPublisherAcknowledgementsTimeout());
-            if (basicReturnReceived) {
-                throw new RuntimeCamelException("Failed to deliver message; 
basic.return received");
+            // Instead of calling waitForConfirmsOrDie() which is itself using 
the internal waitForConfirms() method
+            // waitForConfirms() is directly used and errors are handled 
exactly like before
+            // with one exception: underlaying channel will not be closed 
anymore when a "nack" is received
+            // This will prevent high-channel-churn in a queue full scenario
+            if 
(!channel.waitForConfirms(endpoint.getPublisherAcknowledgementsTimeout())) {
+                throw new IOException("nacks received");
+            } else {
+                if (basicReturnReceived) {
+                    throw new RuntimeCamelException("Failed to deliver 
message; basic.return received");
+                }
             }
         } catch (InterruptedException | TimeoutException e) {
+            try {
+                // Only close the channel in case of timeout
+                // Because we don't know why timeout happend (Maybe a 
communication problem)
+                channel.close(AMQP.PRECONDITION_FAILED, "TIMEOUT WAITING FOR 
ACK");
+            } catch (Exception ce) {
+              LOG.warn("Caught exception during closing of channel", ce);
+            }            
             LOG.warn("Acknowledgement error for {}", camelExchange);
             throw new RuntimeCamelException(e);
         }

Reply via email to