This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new d364875 AutomaticRecovery from RabbitMQ Connection Factory doesn't recover from everything (#4384) d364875 is described below commit d364875e2200e82a0f203cd0d7e130ccced2cbc9 Author: Jonathan Schoreels <jonathan.schore...@gmail.com> AuthorDate: Thu Oct 8 18:46:03 2020 +0200 AutomaticRecovery from RabbitMQ Connection Factory doesn't recover from everything (#4384) * AutomaticRecovery from RabbitMQ Connection Factory doesn't recover from everything and we could just return the same closed connection over and over ... * If the consumer is still started, it won't do anything to call start() again, my solution would be to stop it before * In RPC calls, if we received a message for which the channel goes down, we have to reconnect to it before publishing our response to it, since the current Reconnection mechanism is made only in Producer, not in Publisher * To ensure a retry for failed reply works, we can't remove the headers from the messages during the conversion to AMQP properties. Thus, I use a temporar Map to keep the logic as close as it is, but without touching the Message Headers * Revert back to normal the format --- .../camel/component/rabbitmq/RabbitConsumer.java | 24 +++++++++++-- .../camel/component/rabbitmq/RabbitMQConsumer.java | 6 ++-- .../rabbitmq/RabbitMQMessageConverter.java | 42 +++++++++++----------- .../component/rabbitmq/RabbitMQProducerTest.java | 1 + 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index 208bcfe..71e64fa 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -21,6 +21,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; @@ -127,6 +128,19 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu if (sendReply && exchange.getPattern().isOutCapable()) { try { consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (AlreadyClosedException alreadyClosedException) { + LOG.warn("Connection or channel closed during reply to exchange {} for correlationId {}. Will reconnect and try again.", exchange.getExchangeId(), properties.getCorrelationId()); + // RPC call could not be responded because channel (or connection has been closed during the processing ... + // will try to reconnect + try { + reconnect(); + LOG.debug("Sending again the reply to exchange {} for correlationId {}", exchange.getExchangeId(), properties.getCorrelationId()); + consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } catch (Exception e) { + LOG.error("Couldn't sending again the reply to exchange {} for correlationId {}", exchange.getExchangeId(), properties.getCorrelationId()); + exchange.setException(e); + consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e); + } } catch (RuntimeCamelException e) { // set the exception on the exchange so it can send the // exception back to the producer @@ -326,9 +340,13 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu } else if (channel == null || !isAutomaticRecoveryEnabled()) { LOG.info("Attempting to open a new rabbitMQ channel"); Connection conn = consumer.getConnection(); - channel = openChannel(conn); - // Register the channel to the tag - start(); + try { + stop(); + } finally { + channel = openChannel(conn); + // Register the channel to the tag + start(); + } } } diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 3d63079..6441deb 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -74,13 +74,11 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { * Returns the exiting open connection or opens a new one */ protected synchronized Connection getConnection() throws IOException, TimeoutException { - if (this.conn == null) { + if (this.conn == null || !this.conn.isOpen()) { + LOG.debug("The existing connection is closed or not opened yet."); openConnection(); return this.conn; - } else if (this.conn.isOpen() || (!this.conn.isOpen() && isAutomaticRecoveryEnabled())) { - return this.conn; } else { - LOG.debug("The existing connection is closed"); openConnection(); return this.conn; } diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java index 83d638f..c261183 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java @@ -93,16 +93,11 @@ public class RabbitMQMessageConverter { } public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { - Message msg; - if (exchange.hasOut()) { - msg = exchange.getOut(); - } else { - msg = exchange.getIn(); - } + Message msg = exchange.getMessage(); AMQP.BasicProperties.Builder properties = buildBasicAmqpProperties(exchange.getProperties(), msg); - final Map<String, Object> headers = msg.getHeaders(); + final Map<String, Object> headers = properties.build().getHeaders(); // Add additional headers (if any) if (additionalHeaders != null) { headers.putAll(additionalHeaders); @@ -149,67 +144,68 @@ public class RabbitMQMessageConverter { Map<String, Object> exchangeProperties, Message msg) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); - final Object contentType = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.CONTENT_TYPE); + final Map<String, Object> headers = new HashMap<>(msg.getHeaders()); // We don't want to mutate the message headers + final Object contentType = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.CONTENT_TYPE); if (contentType != null) { properties.contentType(contentType.toString()); } - final Object priority = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.PRIORITY); + final Object priority = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.PRIORITY); if (priority != null) { properties.priority(Integer.parseInt(priority.toString())); } - final Object messageId = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.MESSAGE_ID); + final Object messageId = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.MESSAGE_ID); if (messageId != null) { properties.messageId(messageId.toString()); } - final Object clusterId = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.CLUSTERID); + final Object clusterId = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.CLUSTERID); if (clusterId != null) { properties.clusterId(clusterId.toString()); } - final Object replyTo = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.REPLY_TO); + final Object replyTo = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.REPLY_TO); if (replyTo != null) { properties.replyTo(replyTo.toString()); } - final Object correlationId = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.CORRELATIONID); + final Object correlationId = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.CORRELATIONID); if (correlationId != null) { properties.correlationId(correlationId.toString()); } - final Object deliveryMode = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.DELIVERY_MODE); + final Object deliveryMode = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.DELIVERY_MODE); if (deliveryMode != null) { properties.deliveryMode(Integer.parseInt(deliveryMode.toString())); } - final Object userId = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.USERID); + final Object userId = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.USERID); if (userId != null) { properties.userId(userId.toString()); } - final Object type = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.TYPE); + final Object type = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.TYPE); if (type != null) { properties.type(type.toString()); } - final Object contentEncoding = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.CONTENT_ENCODING); + final Object contentEncoding = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.CONTENT_ENCODING); if (contentEncoding != null) { properties.contentEncoding(contentEncoding.toString()); } - final Object expiration = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.EXPIRATION); + final Object expiration = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.EXPIRATION); if (expiration != null) { properties.expiration(expiration.toString()); } - final Object appId = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.APP_ID); + final Object appId = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.APP_ID); if (appId != null) { properties.appId(appId.toString()); } - final Object timestamp = getBasicAmqpProperty(exchangeProperties, msg, RabbitMQConstants.TIMESTAMP); + final Object timestamp = getBasicAmqpProperty(exchangeProperties, headers, RabbitMQConstants.TIMESTAMP); if (timestamp != null) { properties.timestamp(convertTimestamp(timestamp)); } @@ -222,15 +218,17 @@ public class RabbitMQMessageConverter { LOG.debug("Ignoring non-AMQP basic properties: {}", ignoredProperties); } + properties.headers(headers); + return properties; } private Object getBasicAmqpProperty( - Map<String, Object> exchangeProperties, Message msg, + Map<String, Object> exchangeProperties, Map<String, Object> headers, String propertyKey) { boolean hasAdditionalProps = additionalProperties != null && !additionalProperties .isEmpty(); - Object object = msg.removeHeader(propertyKey); + Object object = headers.remove(propertyKey); if (exchangeProperties.containsKey(propertyKey)) { object = exchangeProperties.get(propertyKey); diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 2d20b12..0bf14df 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -51,6 +51,7 @@ public class RabbitMQProducerTest { RabbitMQMessageConverter converter = new RabbitMQMessageConverter(); converter.setAllowCustomHeaders(true); Mockito.when(exchange.getIn()).thenReturn(message); + Mockito.when(exchange.getMessage()).thenReturn(message); Mockito.when(endpoint.connect(any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(null); Mockito.when(endpoint.getMessageConverter()).thenReturn(converter);