Repository: camel
Updated Branches:
  refs/heads/master 753100b4c -> 664637fce


CAMEL-10239: Provide implementation for publisher acknowledgement together with 
basic.return


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff96d5b2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff96d5b2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff96d5b2

Branch: refs/heads/master
Commit: ff96d5b22a023119f72d91c598509302a9f77f3a
Parents: 753100b
Author: Florian Gessner <flo.gess...@gmail.com>
Authored: Thu Aug 11 20:54:48 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Aug 12 08:49:22 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQEndpoint.java    | 24 +++++++++--
 .../rabbitmq/RabbitMQMessagePublisher.java      | 33 ++++++++++++---
 .../rabbitmq/RabbitMQProducerIntTest.java       | 42 +++++++++++++++++++-
 3 files changed, 89 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index bf40766..cfd9a06 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -150,6 +150,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     private boolean publisherAcknowledgements;
     @UriParam(label = "producer")
     private long publisherAcknowledgementsTimeout;
+    @UriParam(label = "producer")
+    private boolean guaranteedDeliveries;
     // camel-jms supports this setting but it is not currently configurable in 
camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
     // camel-jms supports this setting but it is not currently configurable in 
camel-rabbitmq
@@ -411,7 +413,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
 
     /**
      * If true the queue will not be bound to the exchange after declaring it
-     * @return 
+     * @return
      */
     public boolean isSkipQueueBind() {
         return skipQueueBind;
@@ -420,7 +422,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     public void setSkipQueueBind(boolean skipQueueBind) {
         this.skipQueueBind = skipQueueBind;
     }
-     
+
     /**
      * This can be used if we need to declare the queue but not the exchange
      */
@@ -799,7 +801,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     }
 
     /**
-     * When true and an inOut Exchange failed on the consumer side send the 
caused Exception back in the response 
+     * When true and an inOut Exchange failed on the consumer side send the 
caused Exception back in the response
      */
     public void setTransferException(boolean transferException) {
         this.transferException = transferException;
@@ -832,6 +834,22 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     }
 
     /**
+     * When true, an exception will be thrown when the message cannot be 
delivered (basic.return) and the message is
+     * marked as mandatory.
+     * Publisher acknowledgements will also be activated in this case.
+     *
+     * See also <a href="https://www.rabbitmq.com/confirms.html">publisher 
acknowledgements</a> - When will messages be
+     * confirmed?
+     */
+    public boolean isGuaranteedDeliveries() {
+        return guaranteedDeliveries;
+    }
+
+    public void setGuaranteedDeliveries(boolean guaranteedDeliveries) {
+        this.guaranteedDeliveries = guaranteedDeliveries;
+    }
+
+    /**
      * Get replyToType for inOut exchange
      */
     public String getReplyToType() {

http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index bc78665..15f69ff 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ReturnListener;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -37,6 +38,13 @@ import org.slf4j.LoggerFactory;
  * A method object for publishing to RabbitMQ
  */
 public class RabbitMQMessagePublisher {
+    private static final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = 
new ReturnListener() {
+        @Override
+        public void handleReturn(int replyCode, String replyText, String 
exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) 
throws IOException {
+            throw new RuntimeCamelException("Delivery failed for exchange " + 
exchange + " and routing key " + routingKey + "; replyCode = " + replyCode + " 
replyText = " + replyText);
+        }
+    };
+
     private static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQMessagePublisher.class);
     private final Exchange camelExchange;
     private final Channel channel;
@@ -60,7 +68,7 @@ public class RabbitMQMessagePublisher {
             LOG.debug("Removing the {} header", 
RabbitMQEndpoint.SERIALIZE_HEADER);
             message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
         }
-        
+
         return message;
     }
 
@@ -86,7 +94,7 @@ public class RabbitMQMessagePublisher {
                 throw new RuntimeCamelException(e);
             }
         }
-        
+
         publishToRabbitMQ(properties, body);
     }
 
@@ -98,17 +106,30 @@ public class RabbitMQMessagePublisher {
 
         LOG.debug("Sending message to exchange: {} with CorrelationId = {}", 
rabbitExchange, properties.getCorrelationId());
 
-        if (endpoint.isPublisherAcknowledgements()) {
+        if (isPublisherAcknowledgements()) {
             channel.confirmSelect();
         }
+        if (endpoint.isGuaranteedDeliveries()) {
+            channel.addReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
 
-        channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, 
properties, body);
+        }
 
-        if (endpoint.isPublisherAcknowledgements()) {
-            waitForConfirmation();
+        try {
+            channel.basicPublish(rabbitExchange, routingKey, mandatory, 
immediate, properties, body);
+            if (isPublisherAcknowledgements()) {
+                waitForConfirmation();
+            }
+        } finally {
+            if (endpoint.isGuaranteedDeliveries()) {
+                
channel.removeReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
+            }
         }
     }
 
+    private boolean isPublisherAcknowledgements() {
+        return endpoint.isPublisherAcknowledgements() || 
endpoint.isGuaranteedDeliveries();
+    }
+
     private void waitForConfirmation() throws IOException {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", 
endpoint.getPublisherAcknowledgementsTimeout());

http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index fded387..5be1be1 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -29,6 +29,7 @@ import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.After;
@@ -42,6 +43,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport 
{
     private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, 
EXCHANGE, ROUTE);
     private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + 
"&mandatory=true&publisherAcknowledgements=true";
     private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = 
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + 
"&publisherAcknowledgements=true";
+    private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + 
"&mandatory=true&guaranteedDeliveries=true";
+    private static final String 
GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI = 
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + 
"&guaranteedDeliveries=true";
+    private static final String GUARANTEED_DELIVERY_BAD_ROUTE_URI = 
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + 
"&mandatory=true&guaranteedDeliveries=true";
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
@@ -52,6 +56,15 @@ public class RabbitMQProducerIntTest extends 
CamelTestSupport {
     @Produce(uri = "direct:start-with-confirms-bad-route")
     protected ProducerTemplate templateWithConfirmsAndBadRoute;
 
+    @Produce(uri = "direct:start-with-guaranteed-delivery")
+    protected ProducerTemplate templateWithGuranteedDelivery;
+
+    @Produce(uri = "direct:start-with-guaranteed-delivery-bad-route")
+    protected ProducerTemplate templateWithGuranteedDeliveryAndBadRoute;
+
+    @Produce(uri = 
"direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory")
+    protected ProducerTemplate 
templateWithGuranteedDeliveryBadRouteButNotMandatory;
+
     private Connection connection;
     private Channel channel;
 
@@ -64,6 +77,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport 
{
                 from("direct:start").to(BASIC_URI);
                 
from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
                 
from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
+                
from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
+                
from("direct:start-with-guaranteed-delivery-bad-route").to(GUARANTEED_DELIVERY_BAD_ROUTE_URI);
+                
from("direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory").to(GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI);
             }
         };
     }
@@ -121,6 +137,31 @@ public class RabbitMQProducerIntTest extends 
CamelTestSupport {
         assertThatBodiesReceivedIn(received);
     }
 
+    @Test
+    public void 
shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedAndMessageIsMarkedAsMandatory()
 throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new 
ArrayPopulatingConsumer(received));
+
+        templateWithGuranteedDelivery.sendBodyAndHeader("publisher ack 
message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received, "publisher ack message");
+    }
+
+    @Test(expected = RuntimeCamelException.class)
+    public void 
shouldFailIfMessageIsMarkedAsMandatoryAndGuaranteedDeliveryIsActiveButNoQueueIsBound()
 {
+        templateWithGuranteedDeliveryAndBadRoute.sendBody("publish with ack 
and return message");
+    }
+
+    @Test
+    public void 
shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedOnABadRouteButMessageIsNotMandatory()
 throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new 
ArrayPopulatingConsumer(received));
+
+        
templateWithGuranteedDeliveryBadRouteButNotMandatory.sendBodyAndHeader("publisher
 ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received);
+    }
+
     private Connection createTestConnection() throws IOException, 
TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
@@ -148,4 +189,3 @@ public class RabbitMQProducerIntTest extends 
CamelTestSupport {
         }
     }
 }
-

Reply via email to