This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 6870774 CAMEL-12969 : Map based Service Usage counting to remove memory leak (#2695) 6870774 is described below commit 6870774d274b1ed835a17904f89fbdcc581bbda4 Author: Bob Paulin <b...@bobpaulin.com> AuthorDate: Sun Dec 30 11:57:36 2018 -0600 CAMEL-12969 : Map based Service Usage counting to remove memory leak (#2695) --- .../camel/core/osgi/OsgiServiceRegistry.java | 59 +++++++++++++++------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java b/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java index c10eb22..72a60e6 100644 --- a/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java +++ b/components/camel-core-osgi/src/main/java/org/apache/camel/core/osgi/OsgiServiceRegistry.java @@ -19,11 +19,10 @@ package org.apache.camel.core.osgi; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException; import org.apache.camel.Service; import org.apache.camel.spi.Registry; @@ -31,6 +30,8 @@ import org.apache.camel.support.LifecycleStrategySupport; import org.osgi.framework.BundleContext; import org.osgi.framework.Constants; import org.osgi.framework.InvalidSyntaxException; +import org.osgi.framework.ServiceEvent; +import org.osgi.framework.ServiceListener; import org.osgi.framework.ServiceReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,13 +39,14 @@ import org.slf4j.LoggerFactory; /** * The OsgiServiceRegistry support to get the service object from the bundle context */ -public class OsgiServiceRegistry extends LifecycleStrategySupport implements Registry, Service { - private static final Logger LOG = LoggerFactory.getLogger(OsgiCamelContextHelper.class); +public class OsgiServiceRegistry extends LifecycleStrategySupport implements Registry, Service, ServiceListener { + private static final Logger LOG = LoggerFactory.getLogger(OsgiServiceRegistry.class); private final BundleContext bundleContext; - private final Queue<ServiceReference<?>> serviceReferenceQueue = new ConcurrentLinkedQueue<>(); + private final Map<ServiceReference<?>, AtomicLong> serviceReferenceUsageMap = new ConcurrentHashMap<>(); public OsgiServiceRegistry(BundleContext bc) { bundleContext = bc; + bundleContext.addServiceListener(this); } /** @@ -58,7 +60,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg if (refs != null && refs.length > 0) { // just return the first one sr = refs[0]; - serviceReferenceQueue.add(sr); + incrementServiceUsage(sr); service = bundleContext.getService(sr); } } catch (Exception ex) { @@ -89,9 +91,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg } } if (sr != null) { - // Need to keep the track of Service - // and call ungetService when the camel context is closed - serviceReferenceQueue.add(sr); + incrementServiceUsage(sr); service = bundleContext.getService(sr); } return service; @@ -106,7 +106,7 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg for (ServiceReference<?> sr : refs) { if (sr != null) { Object service = bundleContext.getService(sr); - serviceReferenceQueue.add(sr); + incrementServiceUsage(sr); if (service != null) { String name = (String)sr.getProperty("name"); if (name != null) { @@ -140,12 +140,37 @@ public class OsgiServiceRegistry extends LifecycleStrategySupport implements Reg public void stop() throws Exception { // Unget the OSGi service as OSGi uses reference counting // and we should do this as one of the last actions when stopping Camel - ServiceReference<?> sr = serviceReferenceQueue.poll(); - while (sr != null) { - bundleContext.ungetService(sr); - sr = serviceReferenceQueue.poll(); + this.serviceReferenceUsageMap.forEach(this::drainServiceUsage); + this.serviceReferenceUsageMap.clear(); + } + + void drainServiceUsage(ServiceReference<?> serviceReference, AtomicLong serviceUsageCount) { + if (serviceUsageCount != null && serviceReference != null) { + while(serviceUsageCount.decrementAndGet() >= 0) { + this.bundleContext.ungetService(serviceReference); + } + } + } + + void incrementServiceUsage(ServiceReference<?> sr) { + AtomicLong serviceUsageCount = this.serviceReferenceUsageMap.get(sr); + if (serviceUsageCount != null) { + serviceUsageCount.incrementAndGet(); + } else { + this.serviceReferenceUsageMap.merge(sr, new AtomicLong(1), + (existingServiceUsageCount, newServiceUsageCount)->{ + existingServiceUsageCount.getAndAdd(newServiceUsageCount.get()); + return existingServiceUsageCount; + }); + } + } + + @Override + public void serviceChanged(ServiceEvent event) { + if (event.getType() == ServiceEvent.UNREGISTERING) { + ServiceReference<?> serviceReference = event.getServiceReference(); + AtomicLong serviceUsageCount = this.serviceReferenceUsageMap.remove(serviceReference); + drainServiceUsage(serviceReference, serviceUsageCount); } - // Clean up the OSGi Service Cache - serviceReferenceQueue.clear(); } }