This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d364875  AutomaticRecovery from RabbitMQ Connection Factory doesn't 
recover from everything (#4384)
d364875 is described below

commit d364875e2200e82a0f203cd0d7e130ccced2cbc9
Author: Jonathan Schoreels <jonathan.schore...@gmail.com>
AuthorDate: Thu Oct 8 18:46:03 2020 +0200

    AutomaticRecovery from RabbitMQ Connection Factory doesn't recover from 
everything (#4384)
    
    * AutomaticRecovery from RabbitMQ Connection Factory doesn't recover from 
everything and we could just return the same closed connection over and over ...
    
    * If the consumer is still started, it won't do anything to call start() 
again, my solution would be to stop it before
    
    * In RPC calls, if we received a message for which the channel goes down, 
we have to reconnect to it before publishing our response to it, since the 
current Reconnection mechanism is made only in Producer, not in Publisher
    
    * To ensure a retry for failed reply works, we can't remove the headers 
from the messages during the conversion to AMQP properties. Thus, I use a 
temporar Map to keep the logic as close as it is, but without touching the 
Message Headers
    
    * Revert back to normal the format
---
 .../camel/component/rabbitmq/RabbitConsumer.java   | 24 +++++++++++--
 .../camel/component/rabbitmq/RabbitMQConsumer.java |  6 ++--
 .../rabbitmq/RabbitMQMessageConverter.java         | 42 +++++++++++-----------
 .../component/rabbitmq/RabbitMQProducerTest.java   |  1 +
 4 files changed, 44 insertions(+), 29 deletions(-)

diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 208bcfe..71e64fa 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Consumer;
@@ -127,6 +128,19 @@ class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consu
             if (sendReply && exchange.getPattern().isOutCapable()) {
                 try {
                     consumer.getEndpoint().publishExchangeToChannel(exchange, 
channel, properties.getReplyTo());
+                } catch (AlreadyClosedException alreadyClosedException) {
+                    LOG.warn("Connection or channel closed during reply to 
exchange {} for correlationId {}. Will reconnect and try again.", 
exchange.getExchangeId(), properties.getCorrelationId());
+                    // RPC call could not be responded because channel (or 
connection has been closed during the processing ...
+                    // will try to reconnect
+                    try {
+                        reconnect();
+                        LOG.debug("Sending again the reply to exchange {} for 
correlationId {}", exchange.getExchangeId(), properties.getCorrelationId());
+                        
consumer.getEndpoint().publishExchangeToChannel(exchange, channel, 
properties.getReplyTo());
+                    } catch (Exception e) {
+                        LOG.error("Couldn't sending again the reply to 
exchange {} for correlationId {}", exchange.getExchangeId(), 
properties.getCorrelationId());
+                        exchange.setException(e);
+                        consumer.getExceptionHandler().handleException("Error 
processing exchange", exchange, e);
+                    }
                 } catch (RuntimeCamelException e) {
                     // set the exception on the exchange so it can send the
                     // exception back to the producer
@@ -326,9 +340,13 @@ class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consu
         } else if (channel == null || !isAutomaticRecoveryEnabled()) {
             LOG.info("Attempting to open a new rabbitMQ channel");
             Connection conn = consumer.getConnection();
-            channel = openChannel(conn);
-            // Register the channel to the tag
-            start();
+            try {
+                stop();
+            } finally {
+                channel = openChannel(conn);
+                // Register the channel to the tag
+                start();
+            }
         }
     }
 
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 3d63079..6441deb 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -74,13 +74,11 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
      * Returns the exiting open connection or opens a new one
      */
     protected synchronized Connection getConnection() throws IOException, 
TimeoutException {
-        if (this.conn == null) {
+        if (this.conn == null || !this.conn.isOpen()) {
+            LOG.debug("The existing connection is closed or not opened yet.");
             openConnection();
             return this.conn;
-        } else if (this.conn.isOpen() || (!this.conn.isOpen() && 
isAutomaticRecoveryEnabled())) {
-            return this.conn;
         } else {
-            LOG.debug("The existing connection is closed");
             openConnection();
             return this.conn;
         }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 83d638f..c261183 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -93,16 +93,11 @@ public class RabbitMQMessageConverter {
     }
 
     public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) {
-        Message msg;
-        if (exchange.hasOut()) {
-            msg = exchange.getOut();
-        } else {
-            msg = exchange.getIn();
-        }
+        Message msg = exchange.getMessage();
 
         AMQP.BasicProperties.Builder properties = 
buildBasicAmqpProperties(exchange.getProperties(), msg);
 
-        final Map<String, Object> headers = msg.getHeaders();
+        final Map<String, Object> headers = properties.build().getHeaders();
         // Add additional headers (if any)
         if (additionalHeaders != null) {
             headers.putAll(additionalHeaders);
@@ -149,67 +144,68 @@ public class RabbitMQMessageConverter {
             Map<String, Object> exchangeProperties, Message msg) {
         AMQP.BasicProperties.Builder properties = new 
AMQP.BasicProperties.Builder();
 
-        final Object contentType = getBasicAmqpProperty(exchangeProperties, 
msg, RabbitMQConstants.CONTENT_TYPE);
+        final Map<String, Object> headers = new HashMap<>(msg.getHeaders()); 
// We don't want to mutate the message headers
+        final Object contentType = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.CONTENT_TYPE);
         if (contentType != null) {
             properties.contentType(contentType.toString());
         }
 
-        final Object priority = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.PRIORITY);
+        final Object priority = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.PRIORITY);
         if (priority != null) {
             properties.priority(Integer.parseInt(priority.toString()));
         }
 
-        final Object messageId = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.MESSAGE_ID);
+        final Object messageId = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.MESSAGE_ID);
         if (messageId != null) {
             properties.messageId(messageId.toString());
         }
 
-        final Object clusterId = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.CLUSTERID);
+        final Object clusterId = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.CLUSTERID);
         if (clusterId != null) {
             properties.clusterId(clusterId.toString());
         }
 
-        final Object replyTo = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.REPLY_TO);
+        final Object replyTo = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.REPLY_TO);
         if (replyTo != null) {
             properties.replyTo(replyTo.toString());
         }
 
-        final Object correlationId = getBasicAmqpProperty(exchangeProperties, 
msg, RabbitMQConstants.CORRELATIONID);
+        final Object correlationId = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.CORRELATIONID);
         if (correlationId != null) {
             properties.correlationId(correlationId.toString());
         }
 
-        final Object deliveryMode = getBasicAmqpProperty(exchangeProperties, 
msg, RabbitMQConstants.DELIVERY_MODE);
+        final Object deliveryMode = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.DELIVERY_MODE);
         if (deliveryMode != null) {
             properties.deliveryMode(Integer.parseInt(deliveryMode.toString()));
         }
 
-        final Object userId = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.USERID);
+        final Object userId = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.USERID);
         if (userId != null) {
             properties.userId(userId.toString());
         }
 
-        final Object type = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.TYPE);
+        final Object type = getBasicAmqpProperty(exchangeProperties, headers, 
RabbitMQConstants.TYPE);
         if (type != null) {
             properties.type(type.toString());
         }
 
-        final Object contentEncoding = 
getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.CONTENT_ENCODING);
+        final Object contentEncoding = 
getBasicAmqpProperty(exchangeProperties, headers, 
RabbitMQConstants.CONTENT_ENCODING);
         if (contentEncoding != null) {
             properties.contentEncoding(contentEncoding.toString());
         }
 
-        final Object expiration = getBasicAmqpProperty(exchangeProperties, 
msg, RabbitMQConstants.EXPIRATION);
+        final Object expiration = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.EXPIRATION);
         if (expiration != null) {
             properties.expiration(expiration.toString());
         }
 
-        final Object appId = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.APP_ID);
+        final Object appId = getBasicAmqpProperty(exchangeProperties, headers, 
RabbitMQConstants.APP_ID);
         if (appId != null) {
             properties.appId(appId.toString());
         }
 
-        final Object timestamp = getBasicAmqpProperty(exchangeProperties, msg, 
RabbitMQConstants.TIMESTAMP);
+        final Object timestamp = getBasicAmqpProperty(exchangeProperties, 
headers, RabbitMQConstants.TIMESTAMP);
         if (timestamp != null) {
             properties.timestamp(convertTimestamp(timestamp));
         }
@@ -222,15 +218,17 @@ public class RabbitMQMessageConverter {
             LOG.debug("Ignoring non-AMQP basic properties: {}", 
ignoredProperties);
         }
 
+        properties.headers(headers);
+
         return properties;
     }
 
     private Object getBasicAmqpProperty(
-            Map<String, Object> exchangeProperties, Message msg,
+            Map<String, Object> exchangeProperties, Map<String, Object> 
headers,
             String propertyKey) {
         boolean hasAdditionalProps = additionalProperties != null && 
!additionalProperties
                 .isEmpty();
-        Object object = msg.removeHeader(propertyKey);
+        Object object = headers.remove(propertyKey);
 
         if (exchangeProperties.containsKey(propertyKey)) {
             object = exchangeProperties.get(propertyKey);
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 2d20b12..0bf14df 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -51,6 +51,7 @@ public class RabbitMQProducerTest {
         RabbitMQMessageConverter converter = new RabbitMQMessageConverter();
         converter.setAllowCustomHeaders(true);
         Mockito.when(exchange.getIn()).thenReturn(message);
+        Mockito.when(exchange.getMessage()).thenReturn(message);
         
Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(null);
         Mockito.when(endpoint.getMessageConverter()).thenReturn(converter);

Reply via email to