Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 199263a1a -> 19f619eee
  refs/heads/master 3da0654ef -> f39b83eeb


Fix CAMEL-10229

Use a semaphore to wait for the message to be processed when
autoAck=false

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c693842
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c693842
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c693842

Branch: refs/heads/master
Commit: 6c693842b65ac587bc40c8f9dd7cc829d21c82dd
Parents: 3da0654
Author: miti <pric...@textkernel.nl>
Authored: Fri Aug 12 16:41:55 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Tue Aug 16 16:22:12 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6c693842/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 21560f8..d143d9b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
@@ -41,6 +42,8 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
     /** Consumer tag for this consumer. */
     private volatile String consumerTag;
     private volatile boolean stopping;
+    
+    private final Semaphore lock = new Semaphore(1);
 
     /**
      * Constructs a new instance and records its association to the passed-in
@@ -56,9 +59,26 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
             log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
         }
     }
-
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+       try {
+            if (!consumer.getEndpoint().isAutoAck()) {
+               lock.acquire();
+            }
+            doHandleDelivery(consumerTag, envelope, properties, body);
+            if (!consumer.getEndpoint().isAutoAck()) {
+               lock.release();
+            }
+               
+       } catch (InterruptedException e) {
+               log.error("Thread Interrupted!");
+               
+       }
+        
+                       
+    }
+
+    public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
         consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
 
@@ -160,12 +180,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
             channel.basicCancel(tag);
         }
         try {
+                       lock.acquire();
             if (isChannelOpen()) {
                 channel.close();
             }
-        } catch (TimeoutException e) {
+            lock.release();
+               } catch (TimeoutException e) {
             log.error("Timeout occured");
             throw e;
+        } catch (InterruptedException e1) {
+               log.error("Thread Interrupted!");
         }
     }
 

Reply via email to