This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 7c6cbf8 [CAMEL-15899] Hazelcast consumer listeners not removed after use 7c6cbf8 is described below commit 7c6cbf89829123ffd2088daff43bd1d5c036c1fe Author: Florian Agsteiner <florian.agstei...@caperwhite.com> AuthorDate: Fri Nov 27 11:07:45 2020 +0100 [CAMEL-15899] Hazelcast consumer listeners not removed after use --- .../hazelcast/list/HazelcastListConsumer.java | 29 +++++++++++++++++++-- .../hazelcast/map/HazelcastMapConsumer.java | 30 ++++++++++++++++++++-- .../multimap/HazelcastMultimapConsumer.java | 28 ++++++++++++++++++-- .../HazelcastReplicatedmapConsumer.java | 29 +++++++++++++++++++-- .../hazelcast/set/HazelcastSetConsumer.java | 27 +++++++++++++++++-- .../hazelcast/topic/HazelcastTopicConsumer.java | 24 +++++++++++++++-- 6 files changed, 155 insertions(+), 12 deletions(-) diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java index 63d294e..f22a3bb 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.list; +import java.util.UUID; + import com.hazelcast.collection.IList; import com.hazelcast.core.HazelcastInstance; import org.apache.camel.Consumer; @@ -29,12 +31,35 @@ import org.apache.camel.component.hazelcast.listener.CamelItemListener; */ public class HazelcastListConsumer extends HazelcastDefaultConsumer { + private final IList<Object> queue; + + private UUID listener; + public HazelcastListConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { super(hazelcastInstance, endpoint, processor, cacheName); - IList<Object> queue = hazelcastInstance.getList(cacheName); - queue.addItemListener(new CamelItemListener(this, cacheName), true); + queue = hazelcastInstance.getList(cacheName); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = queue.addItemListener(new CamelItemListener(this, cacheName), true); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStop() + */ + @Override + protected void doStop() throws Exception { + queue.removeItemListener(listener); + + super.doStop(); } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java index 7076e89..2672e3c 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.map; +import java.util.UUID; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.map.IMap; import org.apache.camel.Endpoint; @@ -23,12 +25,36 @@ import org.apache.camel.Processor; import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; import org.apache.camel.component.hazelcast.listener.CamelMapListener; + public class HazelcastMapConsumer extends HazelcastDefaultConsumer { + private final IMap<Object, Object> cache; + + private UUID listener; + public HazelcastMapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { super(hazelcastInstance, endpoint, processor, cacheName); - IMap<Object, Object> cache = hazelcastInstance.getMap(cacheName); - cache.addEntryListener(new CamelMapListener(this, cacheName), true); + cache = hazelcastInstance.getMap(cacheName); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = cache.addEntryListener(new CamelMapListener(this, cacheName), true); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStop() + */ + @Override + protected void doStop() throws Exception { + cache.removeEntryListener(listener); + + super.doStop(); } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java index 0fe4c4b..29c0d6a 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.multimap; +import java.util.UUID; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.multimap.MultiMap; import org.apache.camel.Endpoint; @@ -25,12 +27,34 @@ import org.apache.camel.component.hazelcast.listener.CamelEntryListener; public class HazelcastMultimapConsumer extends HazelcastDefaultConsumer { + private final MultiMap<Object, Object> cache; + + private UUID listener; + public HazelcastMultimapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { super(hazelcastInstance, endpoint, processor, cacheName); - MultiMap<Object, Object> cache = hazelcastInstance.getMultiMap(cacheName); - cache.addEntryListener(new CamelEntryListener(this, cacheName), true); + cache = hazelcastInstance.getMultiMap(cacheName); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true); } + /** + * @see org.apache.camel.support.DefaultConsumer#doStop() + */ + @Override + protected void doStop() throws Exception { + cache.removeEntryListener(listener); + + super.doStop(); + } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java index fd50fff..9251134 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.replicatedmap; +import java.util.UUID; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.replicatedmap.ReplicatedMap; import org.apache.camel.Endpoint; @@ -23,14 +25,37 @@ import org.apache.camel.Processor; import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; import org.apache.camel.component.hazelcast.listener.CamelEntryListener; + public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer { + private final ReplicatedMap<Object, Object> cache; + + private UUID listener; + public HazelcastReplicatedmapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { super(hazelcastInstance, endpoint, processor, cacheName); - ReplicatedMap<Object, Object> cache = hazelcastInstance.getReplicatedMap(cacheName); - cache.addEntryListener(new CamelEntryListener(this, cacheName), true); + cache = hazelcastInstance.getReplicatedMap(cacheName); } + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStop() + */ + @Override + protected void doStop() throws Exception { + cache.removeEntryListener(listener); + + super.doStop(); + } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java index 8468562..8a17cf9 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.set; +import java.util.UUID; + import com.hazelcast.collection.ISet; import com.hazelcast.core.HazelcastInstance; import org.apache.camel.Consumer; @@ -28,12 +30,33 @@ import org.apache.camel.component.hazelcast.listener.CamelItemListener; * Implementation of Hazelcast Set {@link Consumer}. */ public class HazelcastSetConsumer extends HazelcastDefaultConsumer { + private final ISet<Object> set; + + private UUID listener; public HazelcastSetConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) { super(hazelcastInstance, endpoint, processor, cacheName); - ISet<Object> set = hazelcastInstance.getSet(cacheName); - set.addItemListener(new CamelItemListener(this, cacheName), true); + set = hazelcastInstance.getSet(cacheName); } + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = set.addItemListener(new CamelItemListener(this, cacheName), true); + } + + /** + * @see org.apache.camel.support.DefaultConsumer#doStop() + */ + @Override + protected void doStop() throws Exception { + set.removeItemListener(listener); + + super.doStop(); + } } diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java index e964988..71f6b56 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java @@ -16,6 +16,8 @@ */ package org.apache.camel.component.hazelcast.topic; +import java.util.UUID; + import com.hazelcast.core.HazelcastInstance; import com.hazelcast.topic.ITopic; import org.apache.camel.Endpoint; @@ -23,21 +25,39 @@ import org.apache.camel.Processor; import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer; import org.apache.camel.component.hazelcast.listener.CamelMessageListener; + /** * */ public class HazelcastTopicConsumer extends HazelcastDefaultConsumer { + private ITopic<Object> topic; + + private UUID listener; + public HazelcastTopicConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName, boolean reliable) { super(hazelcastInstance, endpoint, processor, cacheName); - ITopic<Object> topic; if (!reliable) { topic = hazelcastInstance.getTopic(cacheName); } else { topic = hazelcastInstance.getReliableTopic(cacheName); } - topic.addMessageListener(new CamelMessageListener(this, cacheName)); } + /** + * @see org.apache.camel.support.DefaultConsumer#doStart() + */ + @Override + protected void doStart() throws Exception { + super.doStart(); + + listener = topic.addMessageListener(new CamelMessageListener(this, cacheName)); + } + + protected void doStop() throws Exception { + topic.removeMessageListener(listener); + + super.doStop(); + } }