CAMEL-9569: Fixed idempontent consumer - to not leak memory when add/remove routes with it many times.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4b131d7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4b131d7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4b131d7 Branch: refs/heads/camel-2.15.x Commit: c4b131d73c43603a02f7247efd77ea11ae4ce043 Parents: 508c1df Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Feb 5 17:20:58 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Feb 5 17:53:11 2016 +0100 ---------------------------------------------------------------------- .../model/IdempotentConsumerDefinition.java | 3 --- .../idempotent/IdempotentConsumer.java | 28 +++++++++++++++++--- 2 files changed, 25 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c4b131d7/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java index 5c64e04..ce9618b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java @@ -194,9 +194,6 @@ public class IdempotentConsumerDefinition extends ExpressionNode { (IdempotentRepository<String>) resolveMessageIdRepository(routeContext); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); - // add as service to CamelContext so we can managed it and it ensures it will be shutdown when camel shutdowns - routeContext.getCamelContext().addService(idempotentRepository); - Expression expression = getExpression().createExpression(routeContext); // these boolean should be true by default http://git-wip-us.apache.org/repos/asf/camel/blob/c4b131d7/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java index f534991..c470b1a 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/IdempotentConsumer.java @@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.Navigate; @@ -48,8 +50,9 @@ import org.slf4j.LoggerFactory; * @see org.apache.camel.spi.IdempotentRepository * @see org.apache.camel.spi.ExchangeIdempotentRepository */ -public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor, Navigate<Processor> { +public class IdempotentConsumer extends ServiceSupport implements CamelContextAware, AsyncProcessor, Navigate<Processor> { private static final Logger LOG = LoggerFactory.getLogger(IdempotentConsumer.class); + private CamelContext camelContext; private final Expression messageIdExpression; private final AsyncProcessor processor; private final IdempotentRepository<String> idempotentRepository; @@ -73,6 +76,16 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor return "IdempotentConsumer[" + messageIdExpression + " -> " + processor + "]"; } + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -172,11 +185,20 @@ public class IdempotentConsumer extends ServiceSupport implements AsyncProcessor // ------------------------------------------------------------------------- protected void doStart() throws Exception { - ServiceHelper.startServices(processor); + ServiceHelper.startServices(processor, idempotentRepository); + if (!camelContext.hasService(idempotentRepository)) { + camelContext.addService(idempotentRepository); + } } protected void doStop() throws Exception { - ServiceHelper.stopServices(processor); + ServiceHelper.stopServices(processor, idempotentRepository); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(processor, idempotentRepository); + camelContext.removeService(idempotentRepository); } /**