This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c6cbf8  [CAMEL-15899] Hazelcast consumer listeners not removed after use
7c6cbf8 is described below

commit 7c6cbf89829123ffd2088daff43bd1d5c036c1fe
Author: Florian Agsteiner <florian.agstei...@caperwhite.com>
AuthorDate: Fri Nov 27 11:07:45 2020 +0100

    [CAMEL-15899] Hazelcast consumer listeners not removed after use
---
 .../hazelcast/list/HazelcastListConsumer.java      | 29 +++++++++++++++++++--
 .../hazelcast/map/HazelcastMapConsumer.java        | 30 ++++++++++++++++++++--
 .../multimap/HazelcastMultimapConsumer.java        | 28 ++++++++++++++++++--
 .../HazelcastReplicatedmapConsumer.java            | 29 +++++++++++++++++++--
 .../hazelcast/set/HazelcastSetConsumer.java        | 27 +++++++++++++++++--
 .../hazelcast/topic/HazelcastTopicConsumer.java    | 24 +++++++++++++++--
 6 files changed, 155 insertions(+), 12 deletions(-)

diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
index 63d294e..f22a3bb 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.list;
 
+import java.util.UUID;
+
 import com.hazelcast.collection.IList;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.camel.Consumer;
@@ -29,12 +31,35 @@ import org.apache.camel.component.hazelcast.listener.CamelItemListener;
  */
 public class HazelcastListConsumer extends HazelcastDefaultConsumer {
 
+    private final IList<Object> queue;
+
+    private UUID listener;
+
     public HazelcastListConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor,
                                  String cacheName) {
         super(hazelcastInstance, endpoint, processor, cacheName);
 
-        IList<Object> queue = hazelcastInstance.getList(cacheName);
-        queue.addItemListener(new CamelItemListener(this, cacheName), true);
+        queue = hazelcastInstance.getList(cacheName);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = queue.addItemListener(new CamelItemListener(this, cacheName), true);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        queue.removeItemListener(listener);
+
+        super.doStop();
     }
 
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
index 7076e89..2672e3c 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.map;
 
+import java.util.UUID;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.map.IMap;
 import org.apache.camel.Endpoint;
@@ -23,12 +25,36 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
 import org.apache.camel.component.hazelcast.listener.CamelMapListener;
 
+
 public class HazelcastMapConsumer extends HazelcastDefaultConsumer {
 
+    private final IMap<Object, Object> cache;
+
+    private UUID listener;
+
     public HazelcastMapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
         super(hazelcastInstance, endpoint, processor, cacheName);
 
-        IMap<Object, Object> cache = hazelcastInstance.getMap(cacheName);
-        cache.addEntryListener(new CamelMapListener(this, cacheName), true);
+        cache = hazelcastInstance.getMap(cacheName);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = cache.addEntryListener(new CamelMapListener(this, cacheName), true);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        cache.removeEntryListener(listener);
+
+        super.doStop();
     }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
index 0fe4c4b..29c0d6a 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.multimap;
 
+import java.util.UUID;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.multimap.MultiMap;
 import org.apache.camel.Endpoint;
@@ -25,12 +27,34 @@ import org.apache.camel.component.hazelcast.listener.CamelEntryListener;
 
 public class HazelcastMultimapConsumer extends HazelcastDefaultConsumer {
 
+    private final MultiMap<Object, Object> cache;
+
+    private UUID listener;
+
     public HazelcastMultimapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor,
                                      String cacheName) {
         super(hazelcastInstance, endpoint, processor, cacheName);
 
-        MultiMap<Object, Object> cache = hazelcastInstance.getMultiMap(cacheName);
-        cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+        cache = hazelcastInstance.getMultiMap(cacheName);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
     }
 
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        cache.removeEntryListener(listener);
+
+        super.doStop();
+    }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index fd50fff..9251134 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.replicatedmap;
 
+import java.util.UUID;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.replicatedmap.ReplicatedMap;
 import org.apache.camel.Endpoint;
@@ -23,14 +25,37 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
 import org.apache.camel.component.hazelcast.listener.CamelEntryListener;
 
+
 public class HazelcastReplicatedmapConsumer extends HazelcastDefaultConsumer {
 
+    private final ReplicatedMap<Object, Object> cache;
+
+    private UUID listener;
+
     public HazelcastReplicatedmapConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor,
                                           String cacheName) {
         super(hazelcastInstance, endpoint, processor, cacheName);
 
-        ReplicatedMap<Object, Object> cache = hazelcastInstance.getReplicatedMap(cacheName);
-        cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+        cache = hazelcastInstance.getReplicatedMap(cacheName);
     }
 
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = cache.addEntryListener(new CamelEntryListener(this, cacheName), true);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        cache.removeEntryListener(listener);
+
+        super.doStop();
+    }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
index 8468562..8a17cf9 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.set;
 
+import java.util.UUID;
+
 import com.hazelcast.collection.ISet;
 import com.hazelcast.core.HazelcastInstance;
 import org.apache.camel.Consumer;
@@ -28,12 +30,33 @@ import org.apache.camel.component.hazelcast.listener.CamelItemListener;
  * Implementation of Hazelcast Set {@link Consumer}.
  */
 public class HazelcastSetConsumer extends HazelcastDefaultConsumer {
+    private final ISet<Object> set;
+
+    private UUID listener;
 
     public HazelcastSetConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName) {
         super(hazelcastInstance, endpoint, processor, cacheName);
 
-        ISet<Object> set = hazelcastInstance.getSet(cacheName);
-        set.addItemListener(new CamelItemListener(this, cacheName), true);
+        set = hazelcastInstance.getSet(cacheName);
     }
 
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = set.addItemListener(new CamelItemListener(this, cacheName), true);
+    }
+
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStop()
+     */
+    @Override
+    protected void doStop() throws Exception {
+        set.removeItemListener(listener);
+
+        super.doStop();
+    }
 }
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
index e964988..71f6b56 100644
--- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.hazelcast.topic;
 
+import java.util.UUID;
+
 import com.hazelcast.core.HazelcastInstance;
 import com.hazelcast.topic.ITopic;
 import org.apache.camel.Endpoint;
@@ -23,21 +25,39 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
 import org.apache.camel.component.hazelcast.listener.CamelMessageListener;
 
+
 /**
  *
  */
 public class HazelcastTopicConsumer extends HazelcastDefaultConsumer {
 
+    private ITopic<Object> topic;
+
+    private UUID listener;
+
     public HazelcastTopicConsumer(HazelcastInstance hazelcastInstance, Endpoint endpoint, Processor processor, String cacheName,
                                   boolean reliable) {
         super(hazelcastInstance, endpoint, processor, cacheName);
-        ITopic<Object> topic;
         if (!reliable) {
             topic = hazelcastInstance.getTopic(cacheName);
         } else {
             topic = hazelcastInstance.getReliableTopic(cacheName);
         }
-        topic.addMessageListener(new CamelMessageListener(this, cacheName));
     }
 
+    /**
+     * @see org.apache.camel.support.DefaultConsumer#doStart()
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        listener = topic.addMessageListener(new CamelMessageListener(this, cacheName));
+    }
+
+    protected void doStop() throws Exception {
+        topic.removeMessageListener(listener);
+
+        super.doStop();
+    }
 }

Reply via email to