This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new bbdbafe CAMEL-14586: Fixed EIPs with disable producer cache was not working. Added back EmptyProducerCache from 2.x. bbdbafe is described below commit bbdbafe1cf1be83bb47041a0cbd286f1415f3347 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 18 11:21:49 2020 +0100 CAMEL-14586: Fixed EIPs with disable producer cache was not working. Added back EmptyProducerCache from 2.x. --- .../camel/impl/engine/DefaultProducerCache.java | 7 ++- .../camel/impl/engine/EmptyProducerCache.java | 65 ++++++++++++++++++++++ .../java/org/apache/camel/processor/Enricher.java | 10 +++- .../org/apache/camel/processor/RecipientList.java | 10 +++- .../org/apache/camel/processor/RoutingSlip.java | 10 +++- .../camel/processor/SendDynamicProcessor.java | 10 +++- .../camel/impl/engine/EmptyProducerCacheTest.java | 65 ++++++++++++++++++++++ .../camel/processor/RecipientListNoCacheTest.java | 20 ++++++- 8 files changed, 187 insertions(+), 10 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java index e381e10..a145d5b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java @@ -65,7 +65,12 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach this.source = source; this.camelContext = (ExtendedCamelContext) camelContext; this.maxCacheSize = cacheSize <= 0 ? CamelContextHelper.getMaximumCachePoolSize(camelContext) : cacheSize; - this.producers = createServicePool(camelContext, maxCacheSize); + if (cacheSize >= 0) { + this.producers = createServicePool(camelContext, maxCacheSize); + } else { + // no cache then empty + this.producers = null; + } // only if JMX is enabled if (camelContext.getManagementStrategy() != null && camelContext.getManagementStrategy().getManagementAgent() != null) { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EmptyProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EmptyProducerCache.java new file mode 100644 index 0000000..e0154be --- /dev/null +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EmptyProducerCache.java @@ -0,0 +1,65 @@ +package org.apache.camel.impl.engine; + +import org.apache.camel.AsyncProducer; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.support.service.ServiceHelper; + +public class EmptyProducerCache extends DefaultProducerCache { + + private final Object source; + private final ExtendedCamelContext ecc; + + public EmptyProducerCache(Object source, CamelContext camelContext) { + super(source, camelContext, -1); + this.source = source; + this.ecc = camelContext.adapt(ExtendedCamelContext.class); + setExtendedStatistics(false); + } + + @Override + public AsyncProducer acquireProducer(Endpoint endpoint) { + // always create a new producer + AsyncProducer answer; + try { + answer = endpoint.createAsyncProducer(); + boolean startingRoutes = ecc.isSetupRoutes() || ecc.getRouteController().isStartingRoutes(); + if (startingRoutes && answer.isSingleton()) { + // if we are currently starting a route, then add as service and enlist in JMX + // - but do not enlist non-singletons in JMX + // - note addService will also start the service + getCamelContext().addService(answer); + } else { + // must then start service so producer is ready to be used + ServiceHelper.startService(answer); + } + } catch (Exception e) { + throw new FailedToCreateProducerException(endpoint, e); + } + return answer; + } + + @Override + public void releaseProducer(Endpoint endpoint, AsyncProducer producer) { + // stop and shutdown the producer as its not cache or reused + ServiceHelper.stopAndShutdownService(producer); + } + + @Override + public int size() { + return 0; + } + + @Override + public int getCapacity() { + return 0; + } + + @Override + public String toString() { + return "EmptyProducerCache for source: " + source; + } + +} diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java index bbbca1b..c038f98 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/Enricher.java @@ -29,6 +29,7 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; import org.apache.camel.ExtendedExchange; import org.apache.camel.impl.engine.DefaultProducerCache; +import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; @@ -366,8 +367,13 @@ public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdA } if (producerCache == null) { - producerCache = new DefaultProducerCache(this, camelContext, cacheSize); - LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity()); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("Enricher {} is not using ProducerCache", this); + } else { + producerCache = new DefaultProducerCache(this, camelContext, cacheSize); + LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startService(producerCache, aggregationStrategy); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java index a0526d1..b6f173f 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientList.java @@ -27,6 +27,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.impl.engine.DefaultProducerCache; +import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; @@ -197,8 +198,13 @@ public class RecipientList extends AsyncProcessorSupport implements IdAware, Rou @Override protected void doStart() throws Exception { if (producerCache == null) { - producerCache = new DefaultProducerCache(this, camelContext, cacheSize); - LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity()); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("RecipientList {} is not using ProducerCache", this); + } else { + producerCache = new DefaultProducerCache(this, camelContext, cacheSize); + LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startService(aggregationStrategy, producerCache); } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java index d7c8b04..cb31c0c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -28,6 +28,7 @@ import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.Message; import org.apache.camel.Traceable; import org.apache.camel.impl.engine.DefaultProducerCache; +import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; @@ -464,8 +465,13 @@ public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdA @Override protected void doStart() throws Exception { if (producerCache == null) { - producerCache = new DefaultProducerCache(this, camelContext, cacheSize); - LOG.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity()); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("RoutingSlip {} is not using ProducerCache", this); + } else { + producerCache = new DefaultProducerCache(this, camelContext, cacheSize); + LOG.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startService(producerCache, errorHandler); diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 849d4b9..dabc2f9 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -27,6 +27,7 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.impl.engine.DefaultProducerCache; +import org.apache.camel.impl.engine.EmptyProducerCache; import org.apache.camel.spi.EndpointUtilizationStatistics; import org.apache.camel.spi.IdAware; import org.apache.camel.spi.ProducerCache; @@ -257,8 +258,13 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa @Override protected void doStart() throws Exception { if (producerCache == null) { - producerCache = new DefaultProducerCache(this, camelContext, cacheSize); - LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, producerCache.getCapacity()); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("DynamicSendTo {} is not using ProducerCache", this); + } else { + producerCache = new DefaultProducerCache(this, camelContext, cacheSize); + LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); + } } if (isAllowOptimisedComponents() && uri != null) { diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyProducerCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyProducerCacheTest.java new file mode 100644 index 0000000..b4cc9f9 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/impl/engine/EmptyProducerCacheTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.AsyncProducer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.spi.ProducerCache; +import org.junit.Test; + +public class EmptyProducerCacheTest extends ContextTestSupport { + + @Test + public void testEmptyCache() throws Exception { + ProducerCache cache = new EmptyProducerCache(this, context); + cache.start(); + + assertEquals("Size should be 0", 0, cache.size()); + + // we never cache any producers + Endpoint e = context.getEndpoint("direct:queue:1"); + AsyncProducer p = cache.acquireProducer(e); + + assertEquals("Size should be 0", 0, cache.size()); + + cache.releaseProducer(e, p); + + assertEquals("Size should be 0", 0, cache.size()); + + cache.stop(); + } + + @Test + public void testCacheProducerAcquireAndRelease() throws Exception { + ProducerCache cache = new EmptyProducerCache(this, context); + cache.start(); + + assertEquals("Size should be 0", 0, cache.size()); + + // we never cache any producers + for (int i = 0; i < 1003; i++) { + Endpoint e = context.getEndpoint("direct:queue:" + i); + AsyncProducer p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + } + + assertEquals("Size should be 1000", 0, cache.size()); + cache.stop(); + } + +} \ No newline at end of file diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java index 3b216ae..a65fed8 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java @@ -16,9 +16,14 @@ */ package org.apache.camel.processor; +import java.util.List; + import org.apache.camel.ContextTestSupport; +import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.engine.EmptyProducerCache; +import org.apache.camel.util.ReflectionHelper; import org.junit.Test; public class RecipientListNoCacheTest extends ContextTestSupport { @@ -37,6 +42,18 @@ public class RecipientListNoCacheTest extends ContextTestSupport { sendBody("bar"); assertMockEndpointsSatisfied(); + + // make sure its using an empty producer cache as the cache is disabled + List<Processor> list = context.getRoute("route1").filter("foo"); + // the id is set on the pipeline as recipient list is wrapped + Pipeline pipe = (Pipeline) list.get(0); + RecipientList rl = (RecipientList) pipe.next().get(1); + assertNotNull(rl); + assertEquals(-1, rl.getCacheSize()); + + Object pc = ReflectionHelper.getField(rl.getClass().getDeclaredField("producerCache"), rl); + assertNotNull(pc); + assertIsInstanceOf(EmptyProducerCache.class, pc); } protected void sendBody(String body) { @@ -47,7 +64,8 @@ public class RecipientListNoCacheTest extends ContextTestSupport { protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - from("direct:a").recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1); + from("direct:a") + .recipientList(header("recipientListHeader").tokenize(",")).cacheSize(-1).id("foo"); } };