This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0b8ef854f6399c06b1cfbb04b983ac512bf4c2e7 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 14:52:16 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../springrabbit/EndpointMessageListener.java | 33 +++++++++- ...MQConsumer.java => SpringRabbitMQConsumer.java} | 8 +-- .../springrabbit/SpringRabbitMQEndpoint.java | 2 +- .../RabbitMQConsumerPooledExchangeIntTest.java | 70 ++++++++++++++++++++++ 4 files changed, 106 insertions(+), 7 deletions(-) diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java index 186bde3..39a7f02 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.springrabbit; +import java.util.Map; + import com.rabbitmq.client.Channel; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -32,6 +34,7 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.support.converter.MessageConverter; import static org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException; @@ -39,15 +42,21 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class); + private final SpringRabbitMQConsumer consumer; private final SpringRabbitMQEndpoint endpoint; private final AsyncProcessor processor; + private final MessagePropertiesConverter messagePropertiesConverter; + private final MessageConverter messageConverter; private RabbitTemplate template; private boolean disableReplyTo; private boolean async; - public EndpointMessageListener(SpringRabbitMQEndpoint endpoint, Processor processor) { + public EndpointMessageListener(SpringRabbitMQConsumer consumer, SpringRabbitMQEndpoint endpoint, Processor processor) { + this.consumer = consumer; this.endpoint = endpoint; this.processor = AsyncProcessorConverterHelper.convert(processor); + this.messagePropertiesConverter = endpoint.getMessagePropertiesConverter(); + this.messageConverter = endpoint.getMessageConverter(); } public boolean isAsync() { @@ -132,6 +141,9 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); + // the exchange is now done so release it + consumer.releaseExchange(exchange, false); + } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -148,7 +160,18 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { } protected Exchange createExchange(Message message, Channel channel, Object replyDestination) { - Exchange exchange = endpoint.createExchange(message); + Exchange exchange = consumer.createExchange(false); + + Object body = messageConverter.fromMessage(message); + exchange.getMessage().setBody(body); + + // TODO: optimize to use existing headers map + Map<String, Object> headers + = messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), exchange); + if (!headers.isEmpty()) { + exchange.getMessage().setHeaders(headers); + } + exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel); // lets set to an InOut if we have some kind of reply-to destination @@ -238,6 +261,12 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { } } } + + // if we completed from async processing then we should release the exchange + // the sync processing will release the exchange outside this callback + if (!doneSync) { + consumer.releaseExchange(exchange, false); + } } private void sendReply(Address replyDestination, Message message, Exchange exchange, org.apache.camel.Message out) { diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConsumer.java similarity index 94% rename from components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java rename to components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConsumer.java index 773a861..a1fa041 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/RabbitMQConsumer.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQConsumer.java @@ -27,15 +27,15 @@ import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.RabbitUtils; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; -public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { +public class SpringRabbitMQConsumer extends DefaultConsumer implements Suspendable { - private static final Logger LOG = LoggerFactory.getLogger(RabbitMQConsumer.class); + private static final Logger LOG = LoggerFactory.getLogger(SpringRabbitMQConsumer.class); private AbstractMessageListenerContainer listenerContainer; private volatile EndpointMessageListener messageListener; private volatile boolean initialized; - public RabbitMQConsumer(Endpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) { + public SpringRabbitMQConsumer(Endpoint endpoint, Processor processor, AbstractMessageListenerContainer listenerContainer) { super(endpoint, processor); this.listenerContainer = listenerContainer; this.listenerContainer.setMessageListener(getEndpointMessageListener()); @@ -54,7 +54,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable { } protected void createMessageListener(SpringRabbitMQEndpoint endpoint, Processor processor) { - messageListener = new EndpointMessageListener(endpoint, processor); + messageListener = new EndpointMessageListener(this, endpoint, processor); endpoint.configureMessageListener(messageListener); } diff --git a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java index 2c7e1ea..70b441b 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/SpringRabbitMQEndpoint.java @@ -348,7 +348,7 @@ public class SpringRabbitMQEndpoint extends DefaultEndpoint implements AsyncEndp @Override public Consumer createConsumer(Processor processor) throws Exception { AbstractMessageListenerContainer listenerContainer = createMessageListenerContainer(); - RabbitMQConsumer consumer = new RabbitMQConsumer(this, processor, listenerContainer); + SpringRabbitMQConsumer consumer = new SpringRabbitMQConsumer(this, processor, listenerContainer); configureConsumer(consumer); return consumer; } diff --git a/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerPooledExchangeIntTest.java b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerPooledExchangeIntTest.java new file mode 100644 index 0000000..596547e --- /dev/null +++ b/components/camel-spring-rabbitmq/src/test/java/org/apache/camel/component/springrabbit/integration/RabbitMQConsumerPooledExchangeIntTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.springrabbit.integration; + +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.engine.PooledExchangeFactory; +import org.junit.jupiter.api.Test; + +public class RabbitMQConsumerPooledExchangeIntTest extends AbstractRabbitMQIntTest { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + camelContext.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory()); + return camelContext; + } + + @Test + public void testConsumer() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + } + + @Test + public void testConsumerTwo() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody("direct:start", "Hello World"); + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("spring-rabbitmq:foo"); + + from("spring-rabbitmq:foo") + .to("log:result") + .to("mock:result"); + } + }; + } +}