This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 31ba781 CAMEL-16279: camel-core - Optimize core to reduce object allocations … (#5183) 31ba781 is described below commit 31ba781997e413bf4e5a57653f47c8b6af6271b3 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Mar 8 16:48:38 2021 +0100 CAMEL-16279: camel-core - Optimize core to reduce object allocations … (#5183) CAMEL-16279: camel-core - Optimize core to reduce object allocations by pooling reusable tasks in the routing engine. CAMEL-16314: camel-core - Some components do not work well with pooled exchanges --- .../KinesisConsumerClosedShardWithSilentTest.java | 5 +- .../camel/component/hdfs/HdfsConsumerTest.java | 6 + .../component/jetty/CamelContinuationServlet.java | 4 +- .../component/jms/EndpointMessageListener.java | 24 +-- .../camel/component/jpa/AbstractJpaMethodTest.java | 7 +- .../camel/component/jpa/JpaWithNamedQueryTest.java | 3 +- .../component/master/EndpointUriEncodingTest.java | 3 +- .../http/handlers/HttpServerChannelHandler.java | 3 +- .../camel/component/netty/http/BaseNettyTest.java | 1 + .../netty/handlers/ServerChannelHandler.java | 5 +- .../camel/component/netty/BaseNettyTest.java | 1 + .../component/quickfixj/QuickfixjEndpoint.java | 3 + .../reactive/streams/ReactiveStreamsConsumer.java | 4 +- .../engine/ReactorStreamsServiceTestSupport.java | 5 + .../engine/RxJavaStreamsServiceTestSupport.java | 5 + .../sjms/consumer/EndpointMessageListener.java | 25 +-- .../springrabbit/EndpointMessageListener.java | 24 +-- .../yammer/YammerMessageAndUserRouteTest.java | 7 - .../java/org/apache/camel/spi/ExchangeFactory.java | 86 +--------- .../apache/camel/spi/InternalProcessorFactory.java | 4 +- .../org/apache/camel/spi/PooledObjectFactory.java | 125 +++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 3 + .../camel/impl/engine/CamelInternalProcessor.java | 1 + .../camel/impl/engine/PooledExchangeFactory.java | 17 +- 
.../impl/engine/PrototypeExchangeFactory.java | 111 ++----------- .../camel/impl/engine/SimpleCamelContext.java | 6 +- .../InterceptSendToEndpointProcessor.java | 55 ++++--- .../java/org/apache/camel/processor/Pipeline.java | 61 +++++-- .../apache/camel/processor/PooledExchangeTask.java | 41 +++++ .../camel/processor/PooledExchangeTaskFactory.java | 56 +++++++ .../apache/camel/processor/PooledTaskFactory.java | 72 +++++++++ .../camel/processor/PrototypeTaskFactory.java | 48 ++++++ .../errorhandler/RedeliveryErrorHandler.java | 175 ++++++++++++++++----- .../camel/support/PooledObjectFactorySupport.java | 152 ++++++++++++++++++ .../support/PrototypeObjectFactorySupport.java | 139 ++++++++++++++++ 35 files changed, 931 insertions(+), 356 deletions(-) diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java index d43f23f..ce1ebac 100644 --- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java +++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java @@ -67,7 +67,7 @@ public class KinesisConsumerClosedShardWithSilentTest { configuration.setIteratorType(ShardIteratorType.LATEST); configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent); configuration.setStreamName("streamName"); - Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component); + Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component); endpoint.start(); undertest = new Kinesis2Consumer(endpoint, processor); @@ -84,6 +84,9 @@ public class KinesisConsumerClosedShardWithSilentTest { .streamDescription(StreamDescription.builder().shards(shardList).build()).build()); 
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))) .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build()); + + context.start(); + undertest.start(); } @Test diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index b1ca851..1681487 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -21,8 +21,10 @@ import java.io.InputStream; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.engine.PrototypeExchangeFactory; import org.apache.camel.support.DefaultExchange; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -83,6 +85,8 @@ public class HdfsConsumerTest { when(endpointConfig.getOpenedSuffix()).thenReturn(DEFAULT_OPENED_SUFFIX); context = new DefaultCamelContext(); + // this test is mocking and its easier to test with prototype scoped + context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory()); } @Test @@ -159,6 +163,7 @@ public class HdfsConsumerTest { ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath)); + underTest.start(); // when int actual = underTest.doPoll(); @@ -211,6 +216,7 @@ public class HdfsConsumerTest { ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath)); + underTest.start(); 
// when int actual = underTest.doPoll(); diff --git a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java index fdd4974..65db856 100644 --- a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java +++ b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java @@ -181,7 +181,8 @@ public class CamelContinuationServlet extends CamelServlet { } // a new request so create an exchange - final Exchange exchange = consumer.createExchange(false); + // must be prototype scoped (not pooled) so we create the exchange via endpoint + final Exchange exchange = endpoint.createExchange(); exchange.setPattern(ExchangePattern.InOut); if (consumer.getEndpoint().isBridgeEndpoint()) { @@ -269,7 +270,6 @@ public class CamelContinuationServlet extends CamelServlet { throw new ServletException(e); } finally { consumer.doneUoW(result); - consumer.releaseExchange(result, false); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java index 872b2af..5bb1f9f 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java @@ -26,7 +26,6 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.RuntimeCamelException; @@ -150,9 +149,6 @@ public class EndpointMessageListener implements 
SessionAwareMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); - // the exchange is now done so release it - consumer.releaseExchange(exchange, false); - } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -256,31 +252,15 @@ public class EndpointMessageListener implements SessionAwareMessageListener { } } } - - // if we completed from async processing then we should release the exchange - // the sync processing will release the exchange outside this callback - if (!doneSync) { - consumer.releaseExchange(exchange, false); - } } } public Exchange createExchange(Message message, Session session, Object replyDestination) { - Exchange exchange = consumer.createExchange(false); + // must be prototype scoped (not pooled) so we create the exchange via endpoint + Exchange exchange = endpoint.createExchange(message, session); JmsBinding binding = getBinding(); exchange.setProperty(Exchange.BINDING, binding); - // optimize: either create a new JmsMessage or reuse existing if exists - JmsMessage msg = exchange.adapt(ExtendedExchange.class).getInOrNull(JmsMessage.class); - if (msg == null) { - msg = new JmsMessage(exchange, message, session, binding); - exchange.setIn(msg); - } else { - msg.setJmsMessage(message); - msg.setJmsSession(session); - msg.setBinding(binding); - } - // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { // only change pattern if not already out capable diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java index 5bcf4f6..154dd41 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java +++ 
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java @@ -45,7 +45,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport { protected EntityManager entityManager; protected TransactionTemplate transactionTemplate; protected Consumer consumer; - protected Exchange receivedExchange; + protected Customer receivedCustomer; abstract boolean usePersist(); @@ -123,7 +123,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport { consumer = endpoint.createConsumer(new Processor() { public void process(Exchange e) { - receivedExchange = e; + receivedCustomer = e.getIn().getBody(Customer.class); assertNotNull(e.getIn().getHeader(JpaConstants.ENTITY_MANAGER, EntityManager.class)); latch.countDown(); } @@ -135,8 +135,7 @@ public abstract class AbstractJpaMethodTest extends CamelTestSupport { consumer.stop(); Thread.sleep(1000); - assertNotNull(receivedExchange); - Customer receivedCustomer = receivedExchange.getIn().getBody(Customer.class); + assertNotNull(receivedCustomer); assertEquals(customer.getName(), receivedCustomer.getName()); assertEquals(customer.getId(), receivedCustomer.getId()); assertEquals(customer.getAddress().getAddressLine1(), receivedCustomer.getAddress().getAddressLine1()); diff --git a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java index 55c35db..04bc452 100644 --- a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java +++ b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java @@ -95,7 +95,8 @@ public class JpaWithNamedQueryTest { consumer = endpoint.createConsumer(new Processor() { public void process(Exchange e) { LOG.info("Received exchange: " + e.getIn()); - receivedExchange = e; + // make defensive copy + receivedExchange = e.copy(); latch.countDown(); } }); 
diff --git a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java index c78a06c..9ec78a6 100644 --- a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java +++ b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java @@ -85,7 +85,8 @@ public class EndpointUriEncodingTest extends CamelTestSupport { public Consumer createConsumer(Processor processor) { return new DefaultConsumer(this, processor) { @Override - public void start() { + protected void doStart() throws Exception { + super.doStart(); Exchange exchange = createExchange(true); exchange.getMessage().setHeader("foo", foo); exchange.getMessage().setHeader("bar", bar); diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java index 1831d64..d6873a6 100644 --- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java +++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java @@ -343,7 +343,8 @@ public class HttpServerChannelHandler extends ServerChannelHandler { @Override protected Exchange createExchange(ChannelHandlerContext ctx, Object message) throws Exception { - Exchange exchange = consumer.createExchange(false); + // must be prototype scoped (not pooled) so we create the exchange via endpoint + Exchange exchange = consumer.getEndpoint().createExchange(); // create a new IN message as we cannot reuse with netty Message in; diff --git a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java 
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java index adafadd..d2b41c4 100644 --- a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java +++ b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java @@ -48,6 +48,7 @@ public class BaseNettyTest extends CamelTestSupport { public static void startLeakDetection() { System.setProperty("io.netty.leakDetection.maxRecords", "100"); System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", "true"); + System.setProperty("io.netty.leakDetection.targetRecords", "100"); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); } diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java index 7c0eaed..3a6a796 100644 --- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java +++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java @@ -113,7 +113,8 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { } protected Exchange createExchange(ChannelHandlerContext ctx, Object message) throws Exception { - Exchange exchange = consumer.createExchange(false); + // must be prototype scoped (not pooled) so we create the exchange via endpoint + Exchange exchange = consumer.getEndpoint().createExchange(); consumer.getEndpoint().updateMessageHeader(exchange.getIn(), ctx); NettyPayloadHelper.setIn(exchange, message); return exchange; @@ -140,7 +141,6 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { consumer.getExceptionHandler().handleException(e); } finally { consumer.doneUoW(exchange); - consumer.releaseExchange(exchange, false); } } @@ -155,7 +155,6 @@ 
public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { consumer.getExceptionHandler().handleException(e); } finally { consumer.doneUoW(exchange); - consumer.releaseExchange(exchange, false); } }); } diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java index b98e501..8d40832 100644 --- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java +++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java @@ -49,6 +49,7 @@ public class BaseNettyTest extends CamelTestSupport { public static void startLeakDetection() { System.setProperty("io.netty.leakDetection.maxRecords", "100"); System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", "true"); + System.setProperty("io.netty.leakDetection.targetRecords", "100"); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); } diff --git a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java index 0f45d55..b83f0bc 100644 --- a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java +++ b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java @@ -33,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.service.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import quickfix.Message; @@ -132,6 +133,8 @@ public class QuickfixjEndpoint extends DefaultEndpoint implements QuickfixjEvent public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message 
message) throws Exception { if (this.sessionID == null || isMatching(sessionID)) { for (QuickfixjConsumer consumer : consumers) { + // ensure consumer is started + ServiceHelper.startService(consumer); Exchange exchange = QuickfixjConverters.toExchange(consumer, sessionID, message, eventCategory, getExchangePattern()); try { diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java index 815c3e2..989b901 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java @@ -75,7 +75,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer { public void onComplete() { if (endpoint.isForwardOnComplete()) { - Exchange exchange = createExchange(true); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onComplete"); doSend(exchange, done -> { @@ -85,7 +85,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer { public void onError(Throwable error) { if (endpoint.isForwardOnError()) { - Exchange exchange = createExchange(true); + Exchange exchange = endpoint.createExchange(); exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE, "onError"); exchange.getIn().setBody(error); diff --git a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java index 2a86a93..f8723b3 100644 --- 
a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java +++ b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java @@ -17,10 +17,12 @@ package org.apache.camel.component.reactor.engine; import org.apache.camel.CamelContext; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.impl.engine.PrototypeExchangeFactory; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.util.ObjectHelper; @@ -31,6 +33,9 @@ class ReactorStreamsServiceTestSupport extends CamelTestSupport { protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); + // camel-reactor does not work with pooled exchanges + context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory()); + context.addComponent( ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.withServiceType(ReactorStreamsConstants.SERVICE_NAME)); diff --git a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java index 93519fd..81d8549 100644 --- a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java +++ b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java @@ -17,10 +17,12 @@ package org.apache.camel.component.rxjava.engine; import org.apache.camel.CamelContext; 
+import org.apache.camel.ExtendedCamelContext; import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.impl.engine.PrototypeExchangeFactory; import org.apache.camel.test.junit5.CamelTestSupport; import org.apache.camel.util.ObjectHelper; @@ -31,6 +33,9 @@ class RxJavaStreamsServiceTestSupport extends CamelTestSupport { protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); + // camel-rxjava does not work with pooled exchanges + context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PrototypeExchangeFactory()); + context.addComponent( ReactiveStreamsConstants.SCHEME, ReactiveStreamsComponent.withServiceType(RxJavaStreamsConstants.SERVICE_NAME)); diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java index b2c9515..99ae442 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java @@ -29,7 +29,6 @@ import org.apache.camel.AsyncProcessor; import org.apache.camel.CamelExchangeException; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; import org.apache.camel.RollbackExchangeException; import org.apache.camel.RuntimeCamelException; @@ -38,7 +37,6 @@ import org.apache.camel.component.sjms.SessionMessageListener; import org.apache.camel.component.sjms.SjmsConstants; import 
org.apache.camel.component.sjms.SjmsConsumer; import org.apache.camel.component.sjms.SjmsEndpoint; -import org.apache.camel.component.sjms.SjmsMessage; import org.apache.camel.component.sjms.SjmsTemplate; import org.apache.camel.component.sjms.jms.JmsMessageHelper; import org.apache.camel.support.AsyncProcessorConverterHelper; @@ -212,9 +210,6 @@ public class EndpointMessageListener implements SessionMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); - // the exchange is now done so release it - consumer.releaseExchange(exchange, false); - } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -242,18 +237,8 @@ public class EndpointMessageListener implements SessionMessageListener { } public Exchange createExchange(Message message, Session session, Object replyDestination) { - Exchange exchange = consumer.createExchange(false); - - // optimize: either create a new SjmsMessage or reuse existing if exists - SjmsMessage msg = exchange.adapt(ExtendedExchange.class).getInOrNull(SjmsMessage.class); - if (msg == null) { - msg = new SjmsMessage(exchange, message, session, endpoint.getBinding()); - exchange.setIn(msg); - } else { - msg.setJmsMessage(message); - msg.setJmsSession(session); - msg.setBinding(endpoint.getBinding()); - } + // must be prototype scoped (not pooled) so we create the exchange via endpoint + Exchange exchange = endpoint.createExchange(message, session); // lets set to an InOut if we have some kind of reply-to destination if (replyDestination != null && !disableReplyTo) { @@ -474,12 +459,6 @@ public class EndpointMessageListener implements SessionMessageListener { } } } - - // if we completed from async processing then we should release the exchange - // the sync processing will release the exchange outside this callback - if (!doneSync) { - consumer.releaseExchange(exchange, false); - } } } diff --git 
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java index 39a7f02..0766bc1 100644 --- a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java +++ b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java @@ -16,8 +16,6 @@ */ package org.apache.camel.component.springrabbit; -import java.util.Map; - import com.rabbitmq.client.Channel; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -141,9 +139,6 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { // if we failed processed the exchange from the async callback task, then grab the exception rce = exchange.getException(RuntimeCamelException.class); - // the exchange is now done so release it - consumer.releaseExchange(exchange, false); - } catch (Exception e) { rce = wrapRuntimeCamelException(e); } @@ -160,18 +155,7 @@ public class EndpointMessageListener implements ChannelAwareMessageListener { } protected Exchange createExchange(Message message, Channel channel, Object replyDestination) { - Exchange exchange = consumer.createExchange(false); - - Object body = messageConverter.fromMessage(message); - exchange.getMessage().setBody(body); - - // TODO: optimize to use existing headers map - Map<String, Object> headers - = messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), exchange); - if (!headers.isEmpty()) { - exchange.getMessage().setHeaders(headers); - } - + Exchange exchange = endpoint.createExchange(message); exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel); // lets set to an InOut if we have some kind of reply-to destination @@ -261,12 +245,6 @@ public class EndpointMessageListener implements 
ChannelAwareMessageListener { } } } - - // if we completed from async processing then we should release the exchange - // the sync processing will release the exchange outside this callback - if (!doneSync) { - consumer.releaseExchange(exchange, false); - } } private void sendReply(Address replyDestination, Message message, Exchange exchange, org.apache.camel.Message out) { diff --git a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java index eadb46d..056f558 100644 --- a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java +++ b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java @@ -21,7 +21,6 @@ import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.yammer.model.Messages; -import org.apache.camel.component.yammer.model.User; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; @@ -70,12 +69,6 @@ public class YammerMessageAndUserRouteTest extends CamelTestSupport { template.sendBody("direct:start", "overwrite me"); userMock.assertIsSatisfied(); - - exchange = userMock.getExchanges().get(0); - User user = exchange.getIn().getBody(User.class); - - assertEquals("Joe Camel", user.getFullName()); - assertEquals("jca...@redhat.com", user.getContact().getEmailAddresses().get(0).getAddress()); } @Override diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java index fcedd54..29b3fdb 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -16,12 +16,10 
@@ */ package org.apache.camel.spi; -import org.apache.camel.CamelContextAware; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.NonManagedService; -import org.apache.camel.Service; /** * Factory used by {@link Consumer} to create Camel {@link Exchange} holding the incoming message received by the @@ -36,50 +34,7 @@ import org.apache.camel.Service; * The factory is pluggable which allows to use different strategies. The default factory will create a new * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. */ -public interface ExchangeFactory extends Service, CamelContextAware, NonManagedService, RouteIdAware { - - /** - * Utilization statistics of the this factory. - */ - interface Statistics { - - /** - * Number of new exchanges created. - */ - long getCreatedCounter(); - - /** - * Number of exchanges acquired (reused) when using pooled factory. - */ - long getAcquiredCounter(); - - /** - * Number of exchanges released back to pool - */ - long getReleasedCounter(); - - /** - * Number of exchanges discarded (thrown away) such as if no space in cache pool. - */ - long getDiscardedCounter(); - - /** - * Reset the counters - */ - void reset(); - - /** - * Whether statistics is enabled. - */ - boolean isStatisticsEnabled(); - - /** - * Sets whether statistics is enabled. - * - * @param statisticsEnabled <tt>true</tt> to enable - */ - void setStatisticsEnabled(boolean statisticsEnabled); - } +public interface ExchangeFactory extends PooledObjectFactory<Exchange>, NonManagedService, RouteIdAware { /** * Service factory key. @@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service, CamelContextAware, NonManagedS } /** - * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. 
- */ - int getCapacity(); - - /** - * The current number of exchanges in the pool - */ - int getSize(); - - /** - * The capacity the pool (for each consumer) uses for storing exchanges. The default capacity is 100. - */ - void setCapacity(int capacity); - - /** - * Whether statistics is enabled. - */ - boolean isStatisticsEnabled(); - - /** - * Whether statistics is enabled. - */ - void setStatisticsEnabled(boolean statisticsEnabled); - - /** - * Reset the statistics - */ - void resetStatistics(); - - /** - * Purges the internal cache (if pooled) - */ - void purge(); - - /** - * Gets the usage statistics + * Whether the factory is pooled. */ - Statistics getStatistics(); + boolean isPooled(); } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java index b3f8868..8a56f21 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java @@ -27,7 +27,9 @@ import org.apache.camel.Route; /** * A factory used internally by Camel to create {@link Processor} and other internal building blocks. This factory is - * used to have loose coupling between the modules in core. Camel user user should only use {@link ProcessorFactory}. + * used to have loose coupling between the modules in core. + * + * Camel end user should NOT use this, but use {@link ProcessorFactory} instead. * * @see ProcessorFactory */ diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java new file mode 100644 index 0000000..db4c0d1 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Service; + +/** + * Factory for pooled objects or tasks. + */ +public interface PooledObjectFactory<T> extends Service, CamelContextAware { + + /** + * Utilization statistics of this factory. + */ + interface Statistics { + + /** + * Number of new exchanges created. + */ + long getCreatedCounter(); + + /** + * Number of exchanges acquired (reused) when using pooled factory. + */ + long getAcquiredCounter(); + + /** + * Number of exchanges released back to pool + */ + long getReleasedCounter(); + + /** + * Number of exchanges discarded (thrown away) such as if no space in cache pool. + */ + long getDiscardedCounter(); + + /** + * Reset the counters + */ + void reset(); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Sets whether statistics is enabled. + * + * @param statisticsEnabled <tt>true</tt> to enable + */ + void setStatisticsEnabled(boolean statisticsEnabled); + } + + /** + * The current number of objects in the pool + */ + int getSize(); + + /** + * The capacity the pool uses for storing objects. The default capacity is 100. + */ + int getCapacity(); + + /** + * The capacity the pool uses for storing objects. 
The default capacity is 100. + */ + void setCapacity(int capacity); + + /** + * Whether statistics is enabled. + */ + boolean isStatisticsEnabled(); + + /** + * Whether statistics is enabled. + */ + void setStatisticsEnabled(boolean statisticsEnabled); + + /** + * Reset the statistics + */ + void resetStatistics(); + + /** + * Purges the internal cache (if pooled) + */ + void purge(); + + /** + * Gets the usage statistics + */ + Statistics getStatistics(); + + /** + * Acquires an object from the pool (if any) + * + * @return the object or <tt>null</tt> if the pool is empty + */ + T acquire(); + + /** + * Releases the object back to the pool + * + * @param t the object + * @return true if released into the pool, or false if something went wrong and the object was discarded + */ + boolean release(T t); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index d7e6dd4..45fcf6d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends BaseService } bootstraps.clear(); + if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled()) { + LOG.info("Pooled mode enabled. Camel pools and reuses objects to reduce JVM object allocations."); + } if (isLightweight()) { LOG.info("Lightweight mode enabled. 
Performing optimizations and memory reduction."); ReifierStrategy.clearReifiers(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java index 9696d1b..9e847fe 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java @@ -271,6 +271,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In } // create internal callback which will execute the advices in reverse order when done + // TODO: pool this task, and the states array AsyncCallback callback = new AsyncAfterTask(states, exchange, originalCallback); if (exchange.isTransacted()) { diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 4079a46..bd40516 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -168,11 +168,20 @@ public final class PooledExchangeFactory extends PrototypeExchangeFactory { } @Override + public boolean isPooled() { + return true; + } + + @Override protected void doStop() throws Exception { - exchangeFactoryManager.removeExchangeFactory(this); - logUsageSummary(LOG, "PooledExchangeFactory", pool.size()); - statistics.reset(); - pool.clear(); + if (exchangeFactoryManager != null) { + exchangeFactoryManager.removeExchangeFactory(this); + } + if (pool != null) { + logUsageSummary(LOG, "PooledExchangeFactory", pool.size()); + statistics.reset(); + pool.clear(); + } // do not call super } diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java index fb17cb0..eef1ea7 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java @@ -16,9 +16,6 @@ */ package org.apache.camel.impl.engine; -import java.util.concurrent.atomic.LongAdder; - -import org.apache.camel.CamelContext; import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExchangeFactoryManager; import org.apache.camel.support.DefaultExchange; -import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.PooledObjectFactorySupport; import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory; /** * {@link ExchangeFactory} that creates a new {@link Exchange} instance. 
*/ -public class PrototypeExchangeFactory extends ServiceSupport implements ExchangeFactory { +public class PrototypeExchangeFactory extends PooledObjectFactorySupport<Exchange> implements ExchangeFactory { private static final Logger LOG = LoggerFactory.getLogger(PrototypeExchangeFactory.class); - final UtilizationStatistics statistics = new UtilizationStatistics(); final Consumer consumer; - CamelContext camelContext; ExchangeFactoryManager exchangeFactoryManager; String routeId; @@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange @Override protected void doBuild() throws Exception { + super.doBuild(); this.exchangeFactoryManager = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager(); } @@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override - public CamelContext getCamelContext() { - return camelContext; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } - - @Override public ExchangeFactory newExchangeFactory(Consumer consumer) { PrototypeExchangeFactory answer = new PrototypeExchangeFactory(consumer); answer.setStatisticsEnabled(statistics.isStatisticsEnabled()); @@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override + public Exchange acquire() { + throw new UnsupportedOperationException("Not in use"); + } + + @Override public Exchange create(boolean autoRelease) { if (statistics.isStatisticsEnabled()) { statistics.created.increment(); @@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } @Override - public boolean isStatisticsEnabled() { - return statistics.isStatisticsEnabled(); - } - - @Override - public void setStatisticsEnabled(boolean statisticsEnabled) { - statistics.setStatisticsEnabled(statisticsEnabled); - } - - @Override - public int getCapacity() { - 
return 0; - } - - @Override - public int getSize() { - return 0; - } - - @Override - public void setCapacity(int capacity) { - // not in use - } - - @Override public void resetStatistics() { statistics.reset(); } @Override - public void purge() { - // not in use - } - - @Override - public Statistics getStatistics() { - return statistics; + public boolean isPooled() { + return false; } @Override protected void doStart() throws Exception { + super.doStart(); if (exchangeFactoryManager != null) { exchangeFactoryManager.addExchangeFactory(this); } @@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange @Override protected void doStop() throws Exception { + super.doStop(); if (exchangeFactoryManager != null) { exchangeFactoryManager.removeExchangeFactory(this); } @@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends ServiceSupport implements Exchange } } - /** - * Represents utilization statistics - */ - final class UtilizationStatistics implements ExchangeFactory.Statistics { - - boolean statisticsEnabled; - final LongAdder created = new LongAdder(); - final LongAdder acquired = new LongAdder(); - final LongAdder released = new LongAdder(); - final LongAdder discarded = new LongAdder(); - - @Override - public void reset() { - created.reset(); - acquired.reset(); - released.reset(); - discarded.reset(); - } - - @Override - public long getCreatedCounter() { - return created.longValue(); - } - - @Override - public long getAcquiredCounter() { - return acquired.longValue(); - } - - @Override - public long getReleasedCounter() { - return released.longValue(); - } - - @Override - public long getDiscardedCounter() { - return discarded.longValue(); - } - - @Override - public boolean isStatisticsEnabled() { - return statisticsEnabled; - } - - @Override - public void setStatisticsEnabled(boolean statisticsEnabled) { - this.statisticsEnabled = statisticsEnabled; - } - } - } diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 057cd5a..efe5d39 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -16,9 +16,6 @@ */ package org.apache.camel.impl.engine; -import java.util.Map; -import java.util.Optional; - import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Processor; @@ -85,6 +82,9 @@ import org.apache.camel.support.ResolverHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; +import java.util.Optional; + /** * Represents the context used to configure routes and the policies to use. */ diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java index 859a48d..275fa6a 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java @@ -46,6 +46,7 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { private final Endpoint delegate; private final AsyncProducer producer; private final boolean skip; + private AsyncProcessor pipeline; public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, Endpoint delegate, AsyncProducer producer, boolean skip) { @@ -71,24 +72,9 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { // add header with the real endpoint uri exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); - if (endpoint.getBefore() != null || 
endpoint.getAfter() != null) { - // detour the exchange using synchronous processing - AsyncProcessor before = null; - if (endpoint.getBefore() != null) { - before = AsyncProcessorConverterHelper.convert(endpoint.getBefore()); - } - AsyncProcessor ascb = new AsyncProcessorSupport() { - @Override - public boolean process(Exchange exchange, AsyncCallback callback) { - return callback(exchange, callback, true); - } - }; - AsyncProcessor after = null; - if (endpoint.getAfter() != null) { - after = AsyncProcessorConverterHelper.convert(endpoint.getAfter()); - } - - return new Pipeline(exchange.getContext(), Arrays.asList(before, ascb, after)).process(exchange, callback); + if (pipeline != null) { + // detour the exchange with the pipeline that has before and after included + return pipeline.process(exchange, callback); } return callback(exchange, callback, true); @@ -140,19 +126,38 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { @Override protected void doBuild() throws Exception { - ServiceHelper.buildService(producer); + // build pipeline with before/after processors + if (endpoint.getBefore() != null || endpoint.getAfter() != null) { + // detour the exchange using synchronous processing + AsyncProcessor before = null; + if (endpoint.getBefore() != null) { + before = AsyncProcessorConverterHelper.convert(endpoint.getBefore()); + } + AsyncProcessor ascb = new AsyncProcessorSupport() { + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + return callback(exchange, callback, true); + } + }; + AsyncProcessor after = null; + if (endpoint.getAfter() != null) { + after = AsyncProcessorConverterHelper.convert(endpoint.getAfter()); + } + + pipeline = new Pipeline(getEndpoint().getCamelContext(), Arrays.asList(before, ascb, after)); + } + + ServiceHelper.buildService(producer, pipeline); } @Override protected void doInit() throws Exception { - ServiceHelper.initService(producer); + ServiceHelper.initService(producer, 
pipeline); } @Override protected void doStart() throws Exception { - ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter()); - // here we also need to start the producer - ServiceHelper.startService(producer); + ServiceHelper.startService(producer, pipeline); } @Override @@ -162,4 +167,8 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { ServiceHelper.stopService(producer); } + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(producer, pipeline); + } } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java index 3bbbf14..6d6b3d1 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java @@ -53,18 +53,32 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo private final ReactiveExecutor reactiveExecutor; private final List<AsyncProcessor> processors; private final int size; + private PooledExchangeTaskFactory taskFactory; + private String id; private String routeId; - private final class PipelineTask implements Runnable, AsyncCallback { + private final class PipelineTask implements PooledExchangeTask, AsyncCallback { - private final Exchange exchange; - private final AsyncCallback callback; + private Exchange exchange; + private AsyncCallback callback; private int index; - PipelineTask(Exchange exchange, AsyncCallback callback) { + PipelineTask() { + } + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = exchange; this.callback = callback; + this.index = 0; + } + + @Override + public void reset() { + this.exchange = null; + this.callback = null; + this.index = 0; } @Override @@ -101,7 +115,9 @@ public class Pipeline extends AsyncProcessorSupport implements 
Navigate<Processo LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); } - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } } } @@ -142,7 +158,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override public boolean process(Exchange exchange, AsyncCallback callback) { // create task which has state used during routing - PipelineTask task = new PipelineTask(exchange, callback); + PooledExchangeTask task = taskFactory.acquire(exchange, callback); if (exchange.isTransacted()) { reactiveExecutor.scheduleSync(task); @@ -154,22 +170,47 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo @Override protected void doBuild() throws Exception { - ServiceHelper.buildService(processors); + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + taskFactory = new PooledTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new PipelineTask(); + } + }; + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return new PipelineTask(); + } + }; + } + LOG.trace("Using TaskFactory: {}", taskFactory); + + ServiceHelper.buildService(taskFactory, processors); } @Override protected void doInit() throws Exception { - ServiceHelper.initService(processors); + ServiceHelper.initService(taskFactory, processors); } @Override protected void doStart() throws Exception { - ServiceHelper.startService(processors); + ServiceHelper.startService(taskFactory, processors); } @Override protected void doStop() throws Exception { - 
ServiceHelper.stopService(processors); + ServiceHelper.stopService(taskFactory, processors); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(taskFactory, processors); } @Override diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java new file mode 100644 index 0000000..d4e0226 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; + +/** + * A task that EIPs and internal routing engine uses to store state when processing an {@link Exchange}. 
+ * + * @see org.apache.camel.processor.PooledExchangeTaskFactory + */ +public interface PooledExchangeTask extends Runnable { + + /** + * Prepares the task for the given exchange and its callback + * + * @param exchange the exchange + * @param callback the callback + */ + void prepare(Exchange exchange, AsyncCallback callback); + + /** + * Resets the task after it is done, so it can be reused for another exchange. + */ + void reset(); +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java new file mode 100644 index 0000000..b90e9f5 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.spi.PooledObjectFactory; + +/** + * Factory to create {@link PooledExchangeTask}. 
+ * + * @see PooledExchangeTask + */ +public interface PooledExchangeTaskFactory extends PooledObjectFactory<PooledExchangeTask> { + + /** + * Creates a new task to use for processing the exchange. + * + * @param exchange the current exchange + * @param callback the callback for the exchange + * @return the task + */ + PooledExchangeTask create(Exchange exchange, AsyncCallback callback); + + /** + * Attempts to acquire a pooled task to use for processing the exchange, if not possible then a new task is created. + * + * @param exchange the current exchange + * @param callback the callback for the exchange + * @return the task + */ + PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback); + + /** + * Releases the task after its done being used + * + * @param task the task + * @return true if the task was released, and false if the task failed to be released or no space in pool, and + * the task was discarded. + */ + boolean release(PooledExchangeTask task); +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java new file mode 100644 index 0000000..3775032 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.support.PooledObjectFactorySupport; + +public abstract class PooledTaskFactory extends PooledObjectFactorySupport<PooledExchangeTask> + implements PooledExchangeTaskFactory { + + @Override + public PooledExchangeTask acquire() { + return pool.poll(); + } + + public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) { + PooledExchangeTask task = acquire(); + if (task == null) { + if (statistics.isStatisticsEnabled()) { + statistics.created.increment(); + } + task = create(exchange, callback); + } else { + if (statistics.isStatisticsEnabled()) { + statistics.acquired.increment(); + } + } + task.prepare(exchange, callback); + return task; + } + + @Override + public boolean release(PooledExchangeTask task) { + try { + task.reset(); + boolean inserted = pool.offer(task); + if (statistics.isStatisticsEnabled()) { + if (inserted) { + statistics.released.increment(); + } else { + statistics.discarded.increment(); + } + } + return inserted; + } catch (Throwable e) { + if (statistics.isStatisticsEnabled()) { + statistics.discarded.increment(); + } + return false; + } + } + + @Override + public String toString() { + return "PooledTaskFactory[capacity: " + getCapacity() + "]"; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java new file mode 100644 
index 0000000..bd113d5 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.support.PrototypeObjectFactorySupport; + +public abstract class PrototypeTaskFactory extends PrototypeObjectFactorySupport<PooledExchangeTask> + implements PooledExchangeTaskFactory { + + @Override + public PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback) { + PooledExchangeTask task = create(exchange, callback); + task.prepare(exchange, callback); + return task; + } + + @Override + public PooledExchangeTask acquire() { + throw new UnsupportedOperationException("Not in use"); + } + + @Override + public boolean release(PooledExchangeTask task) { + // not pooled so no need to reset task + return true; + } + + @Override + public String toString() { + return "PrototypeTaskFactory"; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java index 3c0eaf3..ad0ac3f 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java @@ -38,6 +38,10 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.processor.PooledExchangeTask; +import org.apache.camel.processor.PooledExchangeTaskFactory; +import org.apache.camel.processor.PooledTaskFactory; +import org.apache.camel.processor.PrototypeTaskFactory; import org.apache.camel.spi.AsyncProcessorAwaitManager; import org.apache.camel.spi.CamelLogger; import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer; @@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport private static final Logger LOG = LoggerFactory.getLogger(RedeliveryErrorHandler.class); + // factory + protected PooledExchangeTaskFactory taskFactory; + // state protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); protected ScheduledExecutorService executorService; @@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override public boolean process(final Exchange exchange, final AsyncCallback callback) { // Create the redelivery task object for this exchange (optimize to only create task can do redelivery or not) - Runnable task; - if (simpleTask) { - task = new SimpleTask(exchange, callback); - } else { - task = new RedeliveryTask(exchange, callback); - } + Runnable task = taskFactory.acquire(exchange, callback); + // Run it if (exchange.isTransacted()) { reactiveExecutor.scheduleSync(task); @@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport /** * Simple task 
to perform calling the processor with no redelivery support */ - protected class SimpleTask implements Runnable, AsyncCallback { - private final ExtendedExchange exchange; - private final AsyncCallback callback; - private boolean first = true; + protected class SimpleTask implements PooledExchangeTask, Runnable, AsyncCallback { + private ExtendedExchange exchange; + private AsyncCallback callback; + private boolean first; - SimpleTask(Exchange exchange, AsyncCallback callback) { + public SimpleTask() { + } + + public void prepare(Exchange exchange, AsyncCallback callback) { this.exchange = (ExtendedExchange) exchange; this.callback = callback; + this.first = true; } @Override @@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport return "SimpleTask"; } + public void reset() { + this.exchange = null; + this.callback = null; + this.first = true; + } + @Override public void done(boolean doneSync) { // the run method decides what to do when we are done @@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport if (exchange.getException() == null) { exchange.setException(new RejectedExecutionException()); } - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } if (exchange.isInterrupted()) { @@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } exchange.setRouteStop(true); // we should not continue routing so call callback - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } @@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport onExceptionOccurred(); prepareExchangeAfterFailure(exchange); // we do not support redelivery so continue callback - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } else if (first) { // first time 
call the target processor first = false; outputAsync.process(exchange, this); } else { // we are done so continue callback - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } } @@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport /** * Task to perform calling the processor and handling redelivery if it fails (more advanced than ProcessTask) */ - protected class RedeliveryTask implements Runnable { - private final Exchange original; - private final ExtendedExchange exchange; - private final AsyncCallback callback; + protected class RedeliveryTask implements PooledExchangeTask, Runnable { + // state + private Exchange original; + private ExtendedExchange exchange; + private AsyncCallback callback; private int redeliveryCounter; private long redeliveryDelay; - private Predicate retryWhilePredicate; // default behavior which can be overloaded on a per exception basis + private Predicate retryWhilePredicate; private RedeliveryPolicy currentRedeliveryPolicy; private Processor failureProcessor; private Processor onRedeliveryProcessor; @@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport private boolean useOriginalInMessage; private boolean useOriginalInBody; - public RedeliveryTask(Exchange exchange, AsyncCallback callback) { + public RedeliveryTask() { + } + + @Override + public String toString() { + return "RedeliveryTask"; + } + + @Override + public void prepare(Exchange exchange, AsyncCallback callback) { this.retryWhilePredicate = retryWhilePolicy; this.currentRedeliveryPolicy = redeliveryPolicy; this.handledPredicate = getDefaultHandledPredicate(); @@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport this.useOriginalInBody = useOriginalBodyPolicy; this.onRedeliveryProcessor = redeliveryProcessor; this.onExceptionProcessor = 
RedeliveryErrorHandler.this.onExceptionProcessor; - // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the // original Exchange is being redelivered, and not a mutated Exchange this.original = redeliveryEnabled ? defensiveCopyExchangeIfNeeded(exchange) : null; @@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } @Override - public String toString() { - return "RedeliveryTask"; + public void reset() { + this.retryWhilePredicate = null; + this.currentRedeliveryPolicy = null; + this.handledPredicate = null; + this.continuedPredicate = null; + this.useOriginalInMessage = false; + this.useOriginalInBody = false; + this.onRedeliveryProcessor = null; + this.onExceptionProcessor = null; + this.original = null; + this.exchange = null; + this.callback = null; + this.redeliveryCounter = 0; + this.redeliveryDelay = 0; } /** @@ -635,14 +677,17 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport if (exchange.getException() == null) { exchange.setException(new RejectedExecutionException()); } - callback.done(false); + AsyncCallback cb = callback; + taskFactory.release(this); + cb.done(false); return; } try { doRun(); } catch (Throwable e) { - // unexpected exception during running so break out + // unexpected exception during running so set exception and trigger callback + // (do not do taskFactory.release as that happens later) exchange.setException(e); callback.done(false); } @@ -804,7 +849,9 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // only process if the exchange hasn't failed // and it has not been handled by the error processor if (isDone(exchange)) { - reactiveExecutor.schedule(callback); + AsyncCallback cb = callback; + taskFactory.release(this); + reactiveExecutor.schedule(cb); } else { // error occurred so loop back around which we do by invoking the processAsyncErrorHandler reactiveExecutor.schedule(this); @@ -1043,6 +1090,7 
@@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // because you can continue and still let the failure processor do some routing // before continue in the main route. boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel; + final boolean fHandled = handled; if (allowFailureProcessor && processor != null) { @@ -1108,6 +1156,23 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } finally { // if the fault was handled asynchronously, this should be reflected in the callback as well reactiveExecutor.schedule(callback); + + // create log message + String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); + msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught; + if (processor != null) { + if (isDeadLetterChannel && deadLetterUri != null) { + msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; + } else { + msg = msg + ". Processed by failure processor: " + processor; + } + } + + // log that we failed delivery as we are exhausted + logFailedDelivery(false, false, fHandled, false, isDeadLetterChannel, exchange, msg, null); + + // we are done so we can release the task + taskFactory.release(this); } }); } else { @@ -1127,22 +1192,25 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport } finally { // callback we are done reactiveExecutor.schedule(callback); - } - } - // create log message - String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); - msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught; - if (processor != null) { - if (isDeadLetterChannel && deadLetterUri != null) { - msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; - } else { - msg = msg + ". 
Processed by failure processor: " + processor; + // create log message + String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); + msg = msg + ". Exhausted after delivery attempt: " + redeliveryCounter + " caught: " + caught; + if (processor != null) { + if (isDeadLetterChannel && deadLetterUri != null) { + msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; + } else { + msg = msg + ". Processed by failure processor: " + processor; + } + } + + // log that we failed delivery as we are exhausted + logFailedDelivery(false, false, fHandled, false, isDeadLetterChannel, exchange, msg, null); + + // we are done so we can release the task + taskFactory.release(this); } } - - // log that we failed delivery as we are exhausted - logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, null); } protected void prepareExchangeAfterFailure( @@ -1503,8 +1571,6 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override protected void doStart() throws Exception { - ServiceHelper.startService(output, outputAsync, deadLetter); - // determine if redeliver is enabled or not redeliveryEnabled = determineIfRedeliveryIsEnabled(); if (LOG.isTraceEnabled()) { @@ -1531,6 +1597,28 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport // however if we dont then its less memory overhead (and a bit less cpu) of using the simple task simpleTask = deadLetter == null && !redeliveryEnabled && (exceptionPolicies == null || exceptionPolicies.isEmpty()) && onPrepareProcessor == null; + + boolean pooled = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled(); + if (pooled) { + taskFactory = new PooledTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return simpleTask ? 
new SimpleTask() : new RedeliveryTask(); + } + }; + int capacity = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity(); + taskFactory.setCapacity(capacity); + } else { + taskFactory = new PrototypeTaskFactory() { + @Override + public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) { + return simpleTask ? new SimpleTask() : new RedeliveryTask(); + } + }; + } + LOG.trace("Using TaskFactory: {}", taskFactory); + + ServiceHelper.startService(taskFactory, output, outputAsync, deadLetter); } @Override @@ -1542,6 +1630,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport @Override protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); + ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync, taskFactory); } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java new file mode 100644 index 0000000..64867b3 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.PooledObjectFactory; +import org.apache.camel.support.service.ServiceSupport; + +/** + * Support class for {@link PooledObjectFactory} implementations. It holds the {@link CamelContext}, the pool queue, + * the pool capacity and the utilization statistics. Acquiring and releasing objects is not implemented here. + */ +public abstract class PooledObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> { + + protected final UtilizationStatistics statistics = new UtilizationStatistics(); + + protected CamelContext camelContext; + /** Backing queue; this class does not assign it until {@link #doBuild()} runs. */ + protected BlockingQueue<T> pool; + /** Size given to the queue when {@link #doBuild()} creates it; 100 unless changed through {@link #setCapacity(int)}. */ + protected int capacity = 100; + + /** + * Creates the pool as an {@link ArrayBlockingQueue} sized with the capacity configured at the time of the call. + */ + @Override + protected void doBuild() throws Exception { + super.doBuild(); + this.pool = new ArrayBlockingQueue<>(capacity); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public boolean isStatisticsEnabled() { + return statistics.isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + statistics.setStatisticsEnabled(statisticsEnabled); + } + + /** + * @return the number of objects currently in the pool, or 0 when the pool has not been created yet + */ + @Override + public int getSize() { + if (pool != null) { + return pool.size(); + } else { + return 0; + } + } + + @Override + public int getCapacity() { + return capacity; + } + + /** + * Stores the capacity that {@link #doBuild()} uses when it creates the pool. This method only updates the field; + * it does not touch a pool that already exists. + */ + @Override + public void setCapacity(int capacity) { + this.capacity = capacity; + } + + @Override + public void resetStatistics() { + statistics.reset(); + } + + /** + * Removes every object from the pool. + * + * NOTE(review): unlike {@link #getSize()} there is no null check here, and the pool is only assigned in + * {@link #doBuild()}; this looks like it would throw NullPointerException if called before the service is built. + * Confirm that callers guarantee that ordering. + */ + @Override + public void purge() { + pool.clear(); + } + + @Override + public Statistics getStatistics() { + return statistics; + } + + /** + * Resets the statistics and empties the pool. + * + * NOTE(review): same unguarded access to the pool as in {@link #purge()}; verify the pool always exists at shutdown. + */ + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + statistics.reset(); + pool.clear(); + } + + /** + * Represents utilization statistics + */ + protected final class
UtilizationStatistics implements PooledObjectFactory.Statistics { + + public final LongAdder created = new LongAdder(); + public final LongAdder acquired = new LongAdder(); + public final LongAdder released = new LongAdder(); + public final LongAdder discarded = new LongAdder(); + /** Stored through {@link #setStatisticsEnabled(boolean)}; not consulted here when the counters are read or reset. */ + private boolean statisticsEnabled; + + /** Resets the created, acquired, released and discarded counters. */ + @Override + public void reset() { + created.reset(); + acquired.reset(); + released.reset(); + discarded.reset(); + } + + @Override + public long getCreatedCounter() { + return created.longValue(); + } + + @Override + public long getAcquiredCounter() { + return acquired.longValue(); + } + + @Override + public long getReleasedCounter() { + return released.longValue(); + } + + @Override + public long getDiscardedCounter() { + return discarded.longValue(); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java new file mode 100644 index 0000000..8c038e4 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.camel.CamelContext; +import org.apache.camel.spi.PooledObjectFactory; +import org.apache.camel.support.service.ServiceSupport; + +/** + * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new instance (does not pool). + * + * This class holds no pool: size and capacity are reported as 0, and {@link #setCapacity(int)} and {@link #purge()} + * do nothing. Only the {@link CamelContext} and the utilization statistics are kept. + */ +public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport implements PooledObjectFactory<T> { + + protected final UtilizationStatistics statistics = new UtilizationStatistics(); + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public boolean isStatisticsEnabled() { + return statistics.isStatisticsEnabled(); + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + statistics.setStatisticsEnabled(statisticsEnabled); + } + + /** + * @return always 0 + */ + @Override + public int getSize() { + return 0; + } + + /** + * @return always 0 + */ + @Override + public int getCapacity() { + return 0; + } + + /** + * Ignores the given value. + */ + @Override + public void setCapacity(int capacity) { + // not in use + } + + @Override + public void resetStatistics() { + statistics.reset(); + } + + /** + * Does nothing. + */ + @Override + public void purge() { + // not in use + } + + @Override + public Statistics getStatistics() { + return statistics; + } + + /** + * Resets the statistics after delegating to the superclass shutdown. + */ + @Override + protected void doShutdown() throws Exception { + super.doShutdown(); + statistics.reset(); + } + + /** + * Represents utilization statistics + */
+ protected final class UtilizationStatistics implements Statistics { + + public final LongAdder created = new LongAdder(); + public final LongAdder acquired = new LongAdder(); + public final LongAdder released = new LongAdder(); + public final LongAdder discarded = new LongAdder(); + /** Stored through {@link #setStatisticsEnabled(boolean)}; not consulted here when the counters are read or reset. */ + private boolean statisticsEnabled; + + /** Resets the created, acquired, released and discarded counters. */ + @Override + public void reset() { + created.reset(); + acquired.reset(); + released.reset(); + discarded.reset(); + } + + @Override + public long getCreatedCounter() { + return created.longValue(); + } + + @Override + public long getAcquiredCounter() { + return acquired.longValue(); + } + + @Override + public long getReleasedCounter() { + return released.longValue(); + } + + @Override + public long getDiscardedCounter() { + return discarded.longValue(); + } + + @Override + public boolean isStatisticsEnabled() { + return statisticsEnabled; + } + + @Override + public void setStatisticsEnabled(boolean statisticsEnabled) { + this.statisticsEnabled = statisticsEnabled; + } + } + +}