This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new a69e747  CAMEL-16222: PooledExchangeFactory experiment
a69e747 is described below

commit a69e747a2c42d720c9c447b778720ad58b36aba9
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 08:37:34 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../org/apache/camel/component/rabbitmq/RabbitConsumer.java   | 11 ++++++-----
 .../org/apache/camel/component/rabbitmq/RabbitMQConsumer.java |  9 +++++++++
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index f1bd416..4ffb480 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -69,7 +69,7 @@ class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consu
                 lock.acquire();
             }
             // Channel might be open because while we were waiting for the 
lock,
-            // stop() has been succesfully called.
+            // stop() has been successfully called.
             if (!channel.isOpen()) {
                 // we could not open the channel so release the lock
                 if (!consumer.getEndpoint().isAutoAck()) {
@@ -78,12 +78,15 @@ class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consu
                 return;
             }
 
+            Exchange exchange = consumer.createExchange(envelope, properties, 
body);
             try {
-                doHandleDelivery(consumerTag, envelope, properties, body);
+                
consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, 
properties);
+                doHandleDelivery(exchange, envelope, properties);
             } finally {
                 if (!consumer.getEndpoint().isAutoAck()) {
                     lock.release();
                 }
+                consumer.releaseExchange(exchange, false);
             }
 
         } catch (InterruptedException e) {
@@ -91,10 +94,8 @@ class RabbitConsumer extends ServiceSupport implements 
com.rabbitmq.client.Consu
         }
     }
 
-    public void doHandleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body)
+    public void doHandleDelivery(Exchange exchange, Envelope envelope, 
AMQP.BasicProperties properties)
             throws IOException {
-        Exchange exchange = 
consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
-        
consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, 
properties);
 
         boolean sendReply = properties.getReplyTo() != null;
         if (sendReply && !exchange.getPattern().isOutCapable()) {
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index bda9900..1e85c18 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -24,7 +24,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
@@ -126,6 +129,12 @@ public class RabbitMQConsumer extends DefaultConsumer 
implements Suspendable {
         this.consumers.add(consumer);
     }
 
+    public Exchange createExchange(Envelope envelope, AMQP.BasicProperties 
properties, byte[] body) {
+        Exchange exchange = createExchange(false);
+        endpoint.getMessageConverter().populateRabbitExchange(exchange, 
envelope, properties, body, false, endpoint.isAllowMessageBodySerialization());
+        return exchange;
+    }
+
     private synchronized void reconnect() {
         if (startConsumerCallable != null) {
             return;

Reply via email to