This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new a69e747 CAMEL-16222: PooledExchangeFactory experiment a69e747 is described below commit a69e747a2c42d720c9c447b778720ad58b36aba9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 08:37:34 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 11 ++++++----- .../org/apache/camel/component/rabbitmq/RabbitMQConsumer.java | 9 +++++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java index f1bd416..4ffb480 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java @@ -69,7 +69,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu lock.acquire(); } // Channel might be open because while we were waiting for the lock, - // stop() has been succesfully called. + // stop() has been successfully called. if (!channel.isOpen()) { // we could not open the channel so release the lock if (!consumer.getEndpoint().isAutoAck()) { @@ -78,12 +78,15 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu return; } + Exchange exchange = consumer.createExchange(envelope, properties, body); try { - doHandleDelivery(consumerTag, envelope, properties, body); + consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties); + doHandleDelivery(exchange, envelope, properties); } finally { if (!consumer.getEndpoint().isAutoAck()) { lock.release(); } + consumer.releaseExchange(exchange, false); } } catch (InterruptedException e) { @@ -91,10 +94,8 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu } } - public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + public void doHandleDelivery(Exchange exchange, Envelope envelope, AMQP.BasicProperties properties) throws IOException { - Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body); - consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties); boolean sendReply = properties.getReplyTo() != null; if (sendReply && !exchange.getPattern().isOutCapable()) { diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index bda9900..1e85c18 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -24,7 +24,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Suspendable; import org.apache.camel.support.DefaultConsumer; @@ -126,6 +129,12 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { this.consumers.add(consumer); } + public Exchange createExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + Exchange exchange = createExchange(false); + endpoint.getMessageConverter().populateRabbitExchange(exchange, envelope, properties, body, false, endpoint.isAllowMessageBodySerialization()); + return exchange; + } + private synchronized void reconnect() { if (startConsumerCallable != null) { return;