Repository: camel Updated Branches: refs/heads/master 38bc5a91a -> 26cbd1839
CAMEL-11266: The ehcache component creates a separate CacheManager per producer route Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/26cbd183 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/26cbd183 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/26cbd183 Branch: refs/heads/master Commit: 26cbd1839eee2079857369e98295dbfa5cb06690 Parents: 38bc5a9 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed May 24 12:11:50 2017 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Wed May 24 12:12:10 2017 +0200 ---------------------------------------------------------------------- .../org/apache/camel/util/ReferenceCount.java | 90 +++++++++++ .../apache/camel/util/ReferenceCountTest.java | 48 ++++++ .../component/ehcache/EhcacheComponent.java | 43 +++-- .../component/ehcache/EhcacheConsumer.java | 22 +-- .../camel/component/ehcache/EhcacheManager.java | 31 ++-- .../component/ehcache/EhcacheManagerTest.java | 161 +++++++++++++++++++ .../ehcache/EhcacheSpringConfigurationTest.java | 2 +- 7 files changed, 358 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/camel-core/src/main/java/org/apache/camel/util/ReferenceCount.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ReferenceCount.java b/camel-core/src/main/java/org/apache/camel/util/ReferenceCount.java new file mode 100644 index 0000000..c0bb9ea --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/ReferenceCount.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import java.util.concurrent.atomic.AtomicLong; + +public final class ReferenceCount { + private final AtomicLong count; + private final Runnable onFirst; + private final Runnable onRelease; + + private ReferenceCount(Runnable onFirst, Runnable onRelease) { + this.count = new AtomicLong(0); + this.onFirst = ObjectHelper.notNull(onFirst, "onFirst"); + this.onRelease = ObjectHelper.notNull(onRelease, "onRelease"); + } + + /** + * Returns the reference count. + */ + public long get() { + return count.get(); + } + + /** + * Increases the reference count invoke onFirst on the first increment; + */ + public void retain() throws IllegalStateException { + while (true) { + long v = count.get(); + if (v < 0) { + throw new IllegalStateException("Released"); + } + + if (count.compareAndSet(v, v + 1)) { + if (v == 0) { + this.onFirst.run(); + } + + break; + } + } + } + + /** + * Decreases the reference count and invoke onRelease if the reference count reaches {@code 0}. + */ + public void release() throws IllegalStateException { + while (true) { + long v = count.get(); + if (v <= 0) { + throw new IllegalStateException("ReferenceCount already released"); + } + + if (count.compareAndSet(v, v - 1)) { + if (v == 1) { + onRelease.run(); + } + + break; + } + } + } + + // ******************************* + // Helpers + // ******************************* + + public static ReferenceCount on(Runnable onFirst, Runnable onRelease) { + return new ReferenceCount(onFirst, onRelease); + } + + public static ReferenceCount onRelease(Runnable onRelease) { + return new ReferenceCount(() -> { }, onRelease); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/camel-core/src/test/java/org/apache/camel/util/ReferenceCountTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/util/ReferenceCountTest.java b/camel-core/src/test/java/org/apache/camel/util/ReferenceCountTest.java new file mode 100644 index 0000000..f3ba778 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/util/ReferenceCountTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.util; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class ReferenceCountTest { + + @Test + public void testReferenceCount() { + AtomicInteger cnt = new AtomicInteger(0); + + ReferenceCount ref = ReferenceCount.on(cnt::incrementAndGet, cnt::decrementAndGet); + + ref.retain(); + Assert.assertEquals(1, ref.get()); + Assert.assertEquals(1, cnt.get()); + + ref.retain(); + Assert.assertEquals(2, ref.get()); + Assert.assertEquals(1, cnt.get()); + + ref.release(); + Assert.assertEquals(1, ref.get()); + Assert.assertEquals(1, cnt.get()); + + ref.release(); + Assert.assertEquals(0, ref.get()); + Assert.assertEquals(0, cnt.get()); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheComponent.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheComponent.java index 614f6cb..cb00afc 100644 --- a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheComponent.java +++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheComponent.java @@ -19,10 +19,13 @@ package org.apache.camel.component.ehcache; import java.io.IOException; import java.net.URL; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultComponent; +import org.apache.camel.spi.ClassResolver; import org.apache.camel.spi.Metadata; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ResourceHelper; @@ -40,6 +43,8 @@ import org.slf4j.LoggerFactory; public class EhcacheComponent extends DefaultComponent { private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheComponent.class); + private final ConcurrentMap<Object, EhcacheManager> managers = new ConcurrentHashMap<>(); + @Metadata(label = "advanced") private EhcacheConfiguration configuration = new EhcacheConfiguration(); @@ -66,34 +71,48 @@ public class EhcacheComponent extends DefaultComponent { ObjectHelper.notNull(configuration, "Camel Ehcache configuration"); // Check if a cache manager has been configured - CacheManager manager = configuration.getCacheManager(); - if (manager != null) { + if (configuration.hasCacheManager()) { LOGGER.info("EhcacheManager configured with supplied CacheManager"); - return new EhcacheManager(manager, false, configuration); + + return managers.computeIfAbsent( + configuration.getCacheManager(), + m -> new EhcacheManager( + CacheManager.class.cast(m), + false, + configuration) + ); } // Check if a cache manager configuration has been provided if (configuration.hasCacheManagerConfiguration()) { LOGGER.info("EhcacheManager configured with supplied CacheManagerConfiguration"); - return new EhcacheManager( - CacheManagerBuilder.newCacheManager(configuration.getCacheManagerConfiguration()), - true, - configuration + return managers.computeIfAbsent( + configuration.getCacheManagerConfiguration(), + c -> new EhcacheManager( + CacheManagerBuilder.newCacheManager(Configuration.class.cast(c)), + true, + configuration + ) ); } // Check if a configuration file has been provided if (configuration.hasConfigurationUri()) { String configurationUri = configuration.getConfigurationUri(); - URL url = ResourceHelper.resolveMandatoryResourceAsUrl(getCamelContext().getClassResolver(), configurationUri); + ClassResolver classResolver = getCamelContext().getClassResolver(); + + URL url = ResourceHelper.resolveMandatoryResourceAsUrl(classResolver, configurationUri); LOGGER.info("EhcacheManager configured with supplied URI {}", url); - return new EhcacheManager( - CacheManagerBuilder.newCacheManager(new XmlConfiguration(url)), - true, - configuration + return managers.computeIfAbsent( + url, + u -> new EhcacheManager( + CacheManagerBuilder.newCacheManager(new XmlConfiguration(URL.class.cast(u))), + true, + configuration + ) ); } http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java index b85bf19..b8db0c4 100644 --- a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java +++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java @@ -58,18 +58,20 @@ public class EhcacheConsumer extends DefaultConsumer implements CacheEventListen @Override public void onEvent(CacheEvent<? extends Object, ? extends Object> event) { - final Exchange exchange = getEndpoint().createExchange(); - final Message message = exchange.getIn(); + if (isRunAllowed()) { + final Exchange exchange = getEndpoint().createExchange(); + final Message message = exchange.getIn(); - message.setHeader(EhcacheConstants.KEY, event.getKey()); - message.setHeader(EhcacheConstants.EVENT_TYPE, event.getType()); - message.setHeader(EhcacheConstants.OLD_VALUE, event.getOldValue()); - message.setBody(event.getNewValue()); + message.setHeader(EhcacheConstants.KEY, event.getKey()); + message.setHeader(EhcacheConstants.EVENT_TYPE, event.getType()); + message.setHeader(EhcacheConstants.OLD_VALUE, event.getOldValue()); + message.setBody(event.getNewValue()); - try { - getProcessor().process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error processing exchange", exchange, e); + try { + getProcessor().process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing exchange", exchange, e); + } } } } http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheManager.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheManager.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheManager.java index 90ebd60..bc7b8c3 100644 --- a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheManager.java +++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheManager.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.camel.RuntimeCamelException; import org.apache.camel.Service; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ReferenceCount; import org.ehcache.Cache; import org.ehcache.CacheManager; import org.ehcache.UserManagedCache; @@ -32,35 +33,29 @@ public class EhcacheManager implements Service { private final EhcacheConfiguration configuration; private final CacheManager cacheManager; private final ConcurrentMap<String, UserManagedCache<?, ?>> userCaches; - private final boolean managed; + private final ReferenceCount refCount; public EhcacheManager(CacheManager cacheManager, boolean managed, EhcacheConfiguration configuration) { - this.cacheManager = cacheManager; + this.cacheManager = ObjectHelper.notNull(cacheManager, "cacheManager"); this.userCaches = new ConcurrentHashMap<>(); - this.managed = managed; this.configuration = configuration; - - ObjectHelper.notNull(cacheManager, "cacheManager"); + this.refCount = ReferenceCount.on( + managed ? cacheManager::init : () -> { }, + managed ? cacheManager::close : () -> { } + ); } @Override public synchronized void start() throws Exception { - if (managed) { - cacheManager.init(); - } + refCount.retain(); } @Override public synchronized void stop() throws Exception { - if (managed) { - cacheManager.close(); - } - - // Clean up any User managed cache + refCount.release(); userCaches.values().forEach(UserManagedCache::close); } - @SuppressWarnings("unchecked") public <K, V> Cache<K, V> getCache(String name, Class<K> keyType, Class<V> valueType) throws Exception { Cache<K, V> cache = cacheManager.getCache(name, keyType, valueType); if (cache == null && configuration != null && configuration.isCreateCacheIfNotExist()) { @@ -71,10 +66,10 @@ public class EhcacheManager implements Service { } else { // If a cache configuration is not provided, create a User Managed // Cache - cache = (Cache<K, V>)userCaches.computeIfAbsent( + cache = Cache.class.cast(userCaches.computeIfAbsent( name, key -> UserManagedCacheBuilder.newUserManagedCacheBuilder(keyType, valueType).build(true) - ); + )); } } @@ -88,4 +83,8 @@ public class EhcacheManager implements Service { CacheManager getCacheManager() { return this.cacheManager; } + + ReferenceCount getReferenceCount() { + return refCount; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheManagerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheManagerTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheManagerTest.java new file mode 100644 index 0000000..9326e15 --- /dev/null +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheManagerTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.ehcache; + +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.SimpleRegistry; +import org.ehcache.CacheManager; +import org.ehcache.Status; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.xml.XmlConfiguration; +import org.junit.Assert; +import org.junit.Test; + +public class EhcacheManagerTest { + + @Test + public void testCacheManagerFromFile() throws Exception { + CamelContext context = null; + + try { + context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:ehcache") + .to("ehcache:myCache1?configurationUri=classpath:ehcache/ehcache-file-config.xml") + .to("ehcache:myCache2?configurationUri=classpath:ehcache/ehcache-file-config.xml"); + } + }); + + context.start(); + + EhcacheEndpoint e1 = context.getEndpoint("ehcache:myCache1?configurationUri=classpath:ehcache/ehcache-file-config.xml", EhcacheEndpoint.class); + EhcacheEndpoint e2 = context.getEndpoint("ehcache:myCache2?configurationUri=classpath:ehcache/ehcache-file-config.xml", EhcacheEndpoint.class); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(2, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.AVAILABLE, e1.getManager().getCacheManager().getStatus()); + + context.stop(); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(0, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.UNINITIALIZED, e1.getManager().getCacheManager().getStatus()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testCacheManagerFromConfiguration() throws Exception { + CamelContext context = null; + + try { + SimpleRegistry registry = new SimpleRegistry(); + registry.put("myConf", new XmlConfiguration(getClass().getResource("/ehcache/ehcache-file-config.xml"))); + + context = new DefaultCamelContext(registry); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:ehcache") + .to("ehcache:myCache1?cacheManagerConfiguration=#myConf") + .to("ehcache:myCache2?cacheManagerConfiguration=#myConf"); + } + }); + + context.start(); + + EhcacheEndpoint e1 = context.getEndpoint("ehcache:myCache1?cacheManagerConfiguration=#myConf", EhcacheEndpoint.class); + EhcacheEndpoint e2 = context.getEndpoint("ehcache:myCache2?cacheManagerConfiguration=#myConf", EhcacheEndpoint.class); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(2, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.AVAILABLE, e1.getManager().getCacheManager().getStatus()); + + context.stop(); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(0, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.UNINITIALIZED, e1.getManager().getCacheManager().getStatus()); + + } finally { + if (context != null) { + context.stop(); + } + } + } + + @Test + public void testCacheManager() throws Exception { + CamelContext context = null; + CacheManager cacheManager = null; + + try { + cacheManager = CacheManagerBuilder.newCacheManagerBuilder().build(true); + + SimpleRegistry registry = new SimpleRegistry(); + registry.put("myManager", cacheManager); + + context = new DefaultCamelContext(registry); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:ehcache") + .to("ehcache:myCache1?cacheManager=#myManager") + .to("ehcache:myCache2?cacheManager=#myManager"); + } + }); + + context.start(); + + EhcacheEndpoint e1 = context.getEndpoint("ehcache:myCache1?cacheManager=#myManager", EhcacheEndpoint.class); + EhcacheEndpoint e2 = context.getEndpoint("ehcache:myCache2?cacheManager=#myManager", EhcacheEndpoint.class); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(2, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.AVAILABLE, e1.getManager().getCacheManager().getStatus()); + + context.stop(); + + Assert.assertEquals(e1.getManager(), e2.getManager()); + Assert.assertEquals(e1.getManager().getCacheManager(), e2.getManager().getCacheManager()); + Assert.assertEquals(0, e1.getManager().getReferenceCount().get()); + Assert.assertEquals(Status.AVAILABLE, e1.getManager().getCacheManager().getStatus()); + + } finally { + if (context != null) { + context.stop(); + } + if (cacheManager != null) { + cacheManager.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/26cbd183/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheSpringConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheSpringConfigurationTest.java b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheSpringConfigurationTest.java index b76cf42..1524b4c 100644 --- a/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheSpringConfigurationTest.java +++ b/components/camel-ehcache/src/test/java/org/apache/camel/component/ehcache/EhcacheSpringConfigurationTest.java @@ -32,7 +32,7 @@ public class EhcacheSpringConfigurationTest extends CamelSpringTestSupport { @EndpointInject(uri = "ehcache://myProgrammaticCacheConf?configuration=#myProgrammaticConfiguration") private EhcacheEndpoint ehcacheConf; - @EndpointInject(uri = "ehcache://myFileCacheConf?keyType=java.lang.String&valueType=java.lang.String&configUri=classpath:ehcache/ehcache-file-config.xml") + @EndpointInject(uri = "ehcache://myFileCacheConf?keyType=java.lang.String&valueType=java.lang.String&configurationUri=classpath:ehcache/ehcache-file-config.xml") private EhcacheEndpoint ehcacheFileConf; @Override