CAMEL-7393: The recipient list and routing slip EIPs now allow setting the cache size of the producer cache, so you can control its size or turn the cache off.
Conflicts: camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/12483508 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/12483508 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/12483508 Branch: refs/heads/camel-2.12.x Commit: 124835080e8e0e80341a7d7673608a366ba3e26e Parents: 8857bd0 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Apr 24 18:17:26 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Apr 24 18:19:56 2014 +0200 ---------------------------------------------------------------------- .../apache/camel/impl/EmptyProducerCache.java | 59 +++++++++++++++++++ .../camel/model/RecipientListDefinition.java | 25 +++++++- .../camel/model/RoutingSlipDefinition.java | 28 ++++++++- .../apache/camel/processor/RecipientList.java | 24 +++++++- .../org/apache/camel/processor/RoutingSlip.java | 21 ++++++- .../camel/impl/EmptyProducerCacheTest.java | 61 ++++++++++++++++++++ .../processor/RecipientListNoCacheTest.java | 58 +++++++++++++++++++ .../camel/processor/RoutingSlipNoCacheTest.java | 58 +++++++++++++++++++ 8 files changed, 330 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java new file mode 100644 index 0000000..823bf35 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyProducerCache.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.FailedToCreateProducerException; +import org.apache.camel.Producer; +import org.apache.camel.util.ServiceHelper; + +/** + * A {@link org.apache.camel.impl.ProducerCache} which is always empty and does not cache any {@link org.apache.camel.Producer}s. 
+ */ +public class EmptyProducerCache extends ProducerCache { + + public EmptyProducerCache(Object source, CamelContext camelContext) { + super(source, camelContext, 0); + } + + @Override + public Producer acquireProducer(Endpoint endpoint) { + // always create a new producer + Producer answer = null; + try { + answer = endpoint.createProducer(); + // must then start service so producer is ready to be used + ServiceHelper.startService(answer); + } catch (Exception e) { + throw new FailedToCreateProducerException(endpoint, e); + } + return answer; + } + + @Override + public void releaseProducer(Endpoint endpoint, Producer producer) throws Exception { + // stop and shutdown the producer as it is not cached or reused + ServiceHelper.stopAndShutdownService(producer); + } + + @Override + public String toString() { + return "EmptyProducerCache for source: " + getSource(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java index fc12fb1..c5b5796 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java @@ -19,7 +19,6 @@ package org.apache.camel.model; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; - import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; @@ -77,6 +76,8 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext private Processor onPrepare; @XmlAttribute private Boolean shareUnitOfWork; + @XmlAttribute + private Integer cacheSize; public 
RecipientListDefinition() { } @@ -118,6 +119,9 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext answer.setParallelProcessing(isParallelProcessing()); answer.setStreaming(isStreaming()); answer.setShareUnitOfWork(isShareUnitOfWork()); + if (getCacheSize() != null) { + answer.setCacheSize(getCacheSize()); + } if (onPrepareRef != null) { onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); } @@ -367,6 +371,18 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext return this; } + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used + * to cache and reuse producers when using this recipient list, when uris are reused. + * + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. + * @return the builder + */ + public RecipientListDefinition<Type> cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; + } + // Properties //------------------------------------------------------------------------- @@ -510,4 +526,11 @@ public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> ext return shareUnitOfWork != null && shareUnitOfWork; } + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java index ff9d3c2..bfc3173 100644 --- a/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/RoutingSlipDefinition.java 
@@ -41,6 +41,8 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten private String uriDelimiter; @XmlAttribute private Boolean ignoreInvalidEndpoints; + @XmlAttribute + private Integer cacheSize; public RoutingSlipDefinition() { this((String)null, DEFAULT_DELIMITER); @@ -88,6 +90,9 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten if (getIgnoreInvalidEndpoints() != null) { routingSlip.setIgnoreInvalidEndpoints(getIgnoreInvalidEndpoints()); } + if (getCacheSize() != null) { + routingSlip.setCacheSize(getCacheSize()); + } return routingSlip; } @@ -111,7 +116,15 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten public Boolean getIgnoreInvalidEndpoints() { return ignoreInvalidEndpoints; } - + + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } + // Fluent API // ------------------------------------------------------------------------- @@ -142,4 +155,17 @@ public class RoutingSlipDefinition<Type extends ProcessorDefinition<Type>> exten setUriDelimiter(uriDelimiter); return this; } + + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used + * to cache and reuse producers when using this routing slip, when uris are reused. + * + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. 
+ * @return the builder + */ + public RoutingSlipDefinition<Type> cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java index 4b700a9..94652f3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java @@ -26,6 +26,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Processor; +import org.apache.camel.impl.EmptyProducerCache; import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; @@ -34,6 +35,8 @@ import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.camel.util.ObjectHelper.notNull; @@ -46,6 +49,7 @@ import static org.apache.camel.util.ObjectHelper.notNull; * @version */ public class RecipientList extends ServiceSupport implements AsyncProcessor { + private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class); private final CamelContext camelContext; private ProducerCache producerCache; private Expression expression; @@ -55,6 +59,7 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { private boolean ignoreInvalidEndpoints; private boolean streaming; private long timeout; + private int cacheSize; private Processor onPrepare; private 
boolean shareUnitOfWork; private ExecutorService executorService; @@ -163,7 +168,16 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { protected void doStart() throws Exception { if (producerCache == null) { - producerCache = new ProducerCache(this, camelContext); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("RecipientList {} is not using ProducerCache", this); + } else if (cacheSize == 0) { + producerCache = new ProducerCache(this, camelContext); + LOG.debug("RecipientList {} using ProducerCache with default cache size", this); + } else { + producerCache = new ProducerCache(this, camelContext, cacheSize); + LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startServices(aggregationStrategy, producerCache); } @@ -259,4 +273,12 @@ public class RecipientList extends ServiceSupport implements AsyncProcessor { public void setShareUnitOfWork(boolean shareUnitOfWork) { this.shareUnitOfWork = shareUnitOfWork; } + + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java index 99b9069..88e57db 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java +++ b/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java @@ -32,6 +32,7 @@ import org.apache.camel.Producer; import org.apache.camel.Traceable; import org.apache.camel.builder.ExpressionBuilder; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.EmptyProducerCache; import 
org.apache.camel.impl.ProducerCache; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorHelper; @@ -57,6 +58,7 @@ import static org.apache.camel.util.ObjectHelper.notNull; public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable { protected final Logger log = LoggerFactory.getLogger(getClass()); protected ProducerCache producerCache; + protected int cacheSize; protected boolean ignoreInvalidEndpoints; protected String header; protected Expression expression; @@ -113,6 +115,14 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; } + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } + @Override public String toString() { return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]"; @@ -359,7 +369,16 @@ public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Trace protected void doStart() throws Exception { if (producerCache == null) { - producerCache = new ProducerCache(this, camelContext); + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + log.debug("RoutingSlip {} is not using ProducerCache", this); + } else if (cacheSize == 0) { + producerCache = new ProducerCache(this, camelContext); + log.debug("RoutingSlip {} using ProducerCache with default cache size", this); + } else { + producerCache = new ProducerCache(this, camelContext, cacheSize); + log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startService(producerCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java new file mode 100644 index 0000000..0f43d32 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/EmptyProducerCacheTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Endpoint; +import org.apache.camel.Producer; + +public class EmptyProducerCacheTest extends ContextTestSupport { + + public void testEmptyCache() throws Exception { + ProducerCache cache = new EmptyProducerCache(this, context); + cache.start(); + + assertEquals("Size should be 0", 0, cache.size()); + + // we never cache any producers + Endpoint e = context.getEndpoint("direct:queue:1"); + Producer p = cache.acquireProducer(e); + + assertEquals("Size should be 0", 0, cache.size()); + + cache.releaseProducer(e, p); + + assertEquals("Size should be 0", 0, cache.size()); + + cache.stop(); + } + + public void testCacheProducerAcquireAndRelease() throws Exception { + ProducerCache cache = new EmptyProducerCache(this, context); + cache.start(); + + assertEquals("Size should be 0", 0, cache.size()); + + // we never cache any producers + for (int i = 0; i < 1003; i++) { + Endpoint e = context.getEndpoint("direct:queue:" + i); + Producer p = cache.acquireProducer(e); + cache.releaseProducer(e, p); + } + + assertEquals("Size should be 1000", 0, cache.size()); + cache.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java new file mode 100644 index 0000000..122e72b --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class RecipientListNoCacheTest extends ContextTestSupport { + + public void testNoCache() throws Exception { + MockEndpoint x = getMockEndpoint("mock:x"); + MockEndpoint y = getMockEndpoint("mock:y"); + MockEndpoint z = getMockEndpoint("mock:z"); + + x.expectedBodiesReceived("foo", "bar"); + y.expectedBodiesReceived("foo", "bar"); + z.expectedBodiesReceived("foo", "bar"); + + sendBody("foo"); + sendBody("bar"); + + assertMockEndpointsSatisfied(); + } + + protected void sendBody(String body) { + template.sendBodyAndHeader("direct:a", body, "recipientListHeader", + "mock:x,mock:y,mock:z"); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:a").recipientList( + header("recipientListHeader").tokenize(",")).cacheSize(0); + } + }; + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/12483508/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java new file mode 100644 index 0000000..e7bf94f --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipNoCacheTest.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class RoutingSlipNoCacheTest extends ContextTestSupport { + + public void testNoCache() throws Exception { + MockEndpoint x = getMockEndpoint("mock:x"); + MockEndpoint y = getMockEndpoint("mock:y"); + MockEndpoint z = getMockEndpoint("mock:z"); + + x.expectedBodiesReceived("foo", "bar"); + y.expectedBodiesReceived("foo", "bar"); + z.expectedBodiesReceived("foo", "bar"); + + sendBody("foo"); + sendBody("bar"); + + assertMockEndpointsSatisfied(); + } + + protected void sendBody(String body) { + template.sendBodyAndHeader("direct:a", body, "recipientListHeader", + "mock:x,mock:y,mock:z"); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:a").routingSlip( + header("recipientListHeader").tokenize(",")).cacheSize(0); + } + }; + + } + +}