CAMEL-9595: Camel-Kubernetes consumer should use DefaultConsumer and not ScheduledPollConsumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f6b909c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f6b909c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f6b909c

Branch: refs/heads/master
Commit: 3f6b909c3d4eddd1dc282ac065e90233809873f5
Parents: 5e83a39
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Feb 12 13:52:32 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Feb 12 14:15:15 2016 +0100

----------------------------------------------------------------------
 .../kubernetes/KubernetesConfiguration.java     |  16 +-
 .../kubernetes/KubernetesEndpoint.java          |   6 +
 .../consumer/KubernetesNamespacesConsumer.java  | 152 +++++++++++--------
 .../consumer/KubernetesPodsConsumer.java        | 144 ++++++++++--------
 ...ubernetesReplicationControllersConsumer.java | 150 ++++++++++--------
 .../consumer/KubernetesSecretsConsumer.java     | 142 +++++++++--------
 .../consumer/KubernetesServicesConsumer.java    | 142 +++++++++--------
 .../KubernetesNamespacesConsumerTest.java       |  10 +-
 .../consumer/KubernetesPodsConsumerTest.java    |   4 +-
 ...netesReplicationControllersConsumerTest.java |   3 +-
 .../consumer/KubernetesSecretsConsumerTest.java |   2 +-
 .../KubernetesServicesConsumerTest.java         |   2 +-
 12 files changed, 445 insertions(+), 328 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
index 0d3cda4..f1a0c33 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
@@ -86,6 +86,9 @@ public class KubernetesConfiguration {
 
     @UriParam(label = "consumer")
     private String namespaceName;
+    
+    @UriParam(label = "consumer", defaultValue = "1")
+    private int poolSize = 1;
 
     /**
      * Kubernetes Master url
@@ -284,8 +287,19 @@ public class KubernetesConfiguration {
     public void setNamespaceName(String namespaceName) {
         this.namespaceName = namespaceName;
     }
+    
+    /**
+     * The Consumer pool size
+     */
+    public int getPoolSize() {
+               return poolSize;
+       }
+
+       public void setPoolSize(int poolSize) {
+               this.poolSize = poolSize;
+       }
 
-    @Override
+       @Override
     public String toString() {
         return "KubernetesConfiguration [masterUrl=" + masterUrl + ", 
category=" + category + ", kubernetesClient="
                 + kubernetesClient + ", username=" + username + ", password=" 
+ password + ", operation=" + operation

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
index 0f95f71..8e521a2 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java
@@ -20,6 +20,8 @@ import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.ConfigBuilder;
 import io.fabric8.kubernetes.client.DefaultKubernetesClient;
 
+import java.util.concurrent.ExecutorService;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -164,6 +166,10 @@ public class KubernetesEndpoint extends DefaultEndpoint {
         super.doStop();
         client.close();
     }
+    
+    public ExecutorService createExecutor() {
+        return 
getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
"KubernetesConsumer", configuration.getPoolSize());
+    }
 
     public DefaultKubernetesClient getKubernetesClient() {
         return client;

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
index f08c4dd..097b3a6 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kubernetes.consumer;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.fabric8.kubernetes.api.model.Namespace;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watcher;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesEndpoint;
 import org.apache.camel.component.kubernetes.consumer.common.NamespaceEvent;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KubernetesNamespacesConsumer extends ScheduledPollConsumer {
+import io.fabric8.kubernetes.api.model.Namespace;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class KubernetesNamespacesConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesNamespacesConsumer.class);
-
-    private ConcurrentMap<Long, NamespaceEvent> map;
+    
+    private final Processor processor;
+    private ExecutorService executor;
 
     public KubernetesNamespacesConsumer(KubernetesEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
+        this.processor = processor;
     }
 
     @Override
@@ -51,70 +52,87 @@ public class KubernetesNamespacesConsumer extends ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        map = new ConcurrentHashMap<Long, NamespaceEvent>();
-
-        if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
-            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
-                getEndpoint().getKubernetesClient().namespaces()
-                        
.withName(getEndpoint().getKubernetesConfiguration().getNamespaceName())
-                        .watch(new Watcher<Namespace>() {
-
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    Namespace resource) {
-                                NamespaceEvent ne = new NamespaceEvent(action, 
resource);
-                                map.put(System.currentTimeMillis(), ne);
-                                
-                            }
-
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
-                                }                            
-                            }
-                        });
-            } else {
-                getEndpoint().getKubernetesClient().namespaces().watch(new 
Watcher<Namespace>() {
-
-                    @Override
-                    public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                            Namespace resource) {
-                        NamespaceEvent ne = new NamespaceEvent(action, 
resource);
-                        map.put(System.currentTimeMillis(), ne);
-                        
-                    }
-
-                    @Override
-                    public void onClose(KubernetesClientException cause) {
-                        if (cause != null) {
-                            LOG.error(cause.getMessage(), cause);
-                        }                            
-                    }
-                });
-            }
-        }
+        executor = getEndpoint().createExecutor();
+        
+        executor.submit(new NamespacesConsumerTask());
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        map.clear();
+        
+        LOG.debug("Stopping Kubernetes Namespace Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
     }
-
-    @Override
-    protected int poll() throws Exception {
-        int mapSize = map.size();
-        for (ConcurrentMap.Entry<Long, NamespaceEvent> entry : map.entrySet()) 
{
-            NamespaceEvent namespaceEvent = entry.getValue();
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(namespaceEvent.getNamespace());
-            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
namespaceEvent.getAction());
-            
e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
entry.getKey());
-            getProcessor().process(e);
-            map.remove(entry.getKey());
+    
+    class NamespacesConsumerTask implements Runnable {
+       
+        @Override
+        public void run() {
+               if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
+                if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
+                    getEndpoint().getKubernetesClient().namespaces()
+                            
.withName(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                            .watch(new Watcher<Namespace>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        Namespace resource) {
+                                    NamespaceEvent ne = new 
NamespaceEvent(action, resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    
exchange.getIn().setBody(ne.getNamespace());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
ne.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                               
processor.process(exchange);
+                                                                       } catch 
(Exception e) {
+                                                   
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                               }                               
    
+                                }
+
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }                            
+                                }
+                            });
+                } else {
+                    getEndpoint().getKubernetesClient().namespaces().watch(new 
Watcher<Namespace>() {
+
+                        @Override
+                        public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                Namespace resource) {
+                            NamespaceEvent ne = new NamespaceEvent(action, 
resource);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(ne.getNamespace());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
ne.getAction());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                            try {
+                                                               
processor.process(exchange);
+                                                       } catch (Exception e) {
+                                   
getExceptionHandler().handleException("Error during processing", exchange, e);
+                               }      
+                            
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException cause) {
+                            if (cause != null) {
+                                LOG.error(cause.getMessage(), cause);
+                            }                            
+                        }
+                    });
+                }
+            }
         }
-        return mapSize;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java
index bfa025e..186f824 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kubernetes.consumer;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watcher;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesEndpoint;
 import org.apache.camel.component.kubernetes.consumer.common.PodEvent;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KubernetesPodsConsumer extends ScheduledPollConsumer {
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class KubernetesPodsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesPodsConsumer.class);
 
-    private ConcurrentMap<Long, PodEvent> map;
+    private final Processor processor;
+    private ExecutorService executor;
 
     public KubernetesPodsConsumer(KubernetesEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
+        this.processor = processor;
     }
 
     @Override
@@ -51,68 +52,85 @@ public class KubernetesPodsConsumer extends ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        map = new ConcurrentHashMap<Long, PodEvent>();
-
-        if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
-            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
-                getEndpoint().getKubernetesClient().pods()
-                        
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
-                        .watch(new Watcher<Pod>() {
-
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    Pod resource) {
-                                PodEvent pe = new PodEvent(action, resource);
-                                map.put(System.currentTimeMillis(), pe);
-                            }
-
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
-                                }
+        executor = getEndpoint().createExecutor();
 
-                            }
-                        });
-            } else {
-                getEndpoint().getKubernetesClient().pods().watch(new 
Watcher<Pod>() {
-
-                    @Override
-                    public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) 
{
-                        PodEvent pe = new PodEvent(action, resource);
-                        map.put(System.currentTimeMillis(), pe);
-                    }
-
-                    @Override
-                    public void onClose(KubernetesClientException cause) {
-                        if (cause != null) {
-                            LOG.error(cause.getMessage(), cause);
-                        }
-                    }
-                });
-            }
-        }
+        executor.submit(new PodsConsumerTask());
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        map.clear();
-    }
 
-    @Override
-    protected int poll() throws Exception {
-        int mapSize = map.size();
-        for (ConcurrentMap.Entry<Long, PodEvent> entry : map.entrySet()) {
-            PodEvent podEvent = entry.getValue();
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(podEvent.getPod());
-            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
podEvent.getAction());
-            
e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
entry.getKey());
-            getProcessor().process(e);
-            map.remove(entry.getKey());
+        LOG.debug("Stopping Kubernetes Pods Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
         }
-        return mapSize;
+        executor = null;
     }
 
+    class PodsConsumerTask implements Runnable {
+       
+        @Override
+        public void run() {
+            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
+                if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
+                    getEndpoint().getKubernetesClient().pods()
+                            
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                            .watch(new Watcher<Pod>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        Pod resource) {
+                                    PodEvent pe = new PodEvent(action, 
resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    exchange.getIn().setBody(pe.getPod());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
pe.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                               
processor.process(exchange);
+                                                                       } catch 
(Exception e) {
+                                                                               
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                                       }
+                                }
+
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }
+
+                                }
+                            });
+                } else {
+                    getEndpoint().getKubernetesClient().pods().watch(new 
Watcher<Pod>() {
+
+                        @Override
+                        public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) 
{
+                            PodEvent pe = new PodEvent(action, resource);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(pe.getPod());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
pe.getAction());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                            try {
+                                                               
processor.process(exchange);
+                                                       } catch (Exception e) {
+                                                               
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                       }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException cause) {
+                            if (cause != null) {
+                                LOG.error(cause.getMessage(), cause);
+                            }
+                        }
+                    });
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
index e7fd348..2fd6847 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kubernetes.consumer;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.fabric8.kubernetes.api.model.ReplicationController;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watcher;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesEndpoint;
 import 
org.apache.camel.component.kubernetes.consumer.common.ReplicationControllerEvent;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KubernetesReplicationControllersConsumer extends 
ScheduledPollConsumer {
+import io.fabric8.kubernetes.api.model.ReplicationController;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class KubernetesReplicationControllersConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesReplicationControllersConsumer.class);
 
-    private ConcurrentMap<Long, ReplicationControllerEvent> map;
+    private final Processor processor;
+    private ExecutorService executor;
 
     public KubernetesReplicationControllersConsumer(KubernetesEndpoint 
endpoint, Processor processor) {
         super(endpoint, processor);
+        this.processor = processor;
     }
 
     @Override
@@ -51,72 +52,91 @@ public class KubernetesReplicationControllersConsumer extends ScheduledPollConsu
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        map = new ConcurrentHashMap<Long, ReplicationControllerEvent>();
-
-        if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
-            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
-                getEndpoint().getKubernetesClient().replicationControllers()
-                        
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
-                        .watch(new Watcher<ReplicationController>() {
-
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    ReplicationController resource) {
-                                ReplicationControllerEvent rce = new 
ReplicationControllerEvent(action, resource);
-                                map.put(System.currentTimeMillis(), rce);
-
-                            }
-
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
-                                }
-                            }
-
-                        });
-            } else {
-                getEndpoint().getKubernetesClient().replicationControllers()
-                        .watch(new Watcher<ReplicationController>() {
 
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    ReplicationController resource) {
-                                ReplicationControllerEvent se = new 
ReplicationControllerEvent(action, resource);
-                                map.put(System.currentTimeMillis(), se);
+        executor = getEndpoint().createExecutor();
 
-                            }
-
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
-                                }
-                            }
-                        });
-            }
-        }
+        executor.submit(new ReplicationControllersConsumerTask());       
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        map.clear();
+        
+        LOG.debug("Stopping Kubernetes Replication Controllers Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
     }
+    
+    class ReplicationControllersConsumerTask implements Runnable {
+       
+        @Override
+        public void run() {
+               if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
+                if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
+                    
getEndpoint().getKubernetesClient().replicationControllers()
+                            
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                            .watch(new Watcher<ReplicationController>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        ReplicationController resource) {
+                                    ReplicationControllerEvent rce = new 
ReplicationControllerEvent(action, resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    
exchange.getIn().setBody(rce.getReplicationController());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
rce.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                       
processor.process(exchange);
+                                                               } catch 
(Exception e) {
+                                                                       
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                               }
 
-    @Override
-    protected int poll() throws Exception {
-        int mapSize = map.size();
-        for (ConcurrentMap.Entry<Long, ReplicationControllerEvent> entry : 
map.entrySet()) {
-            ReplicationControllerEvent serviceEvent = entry.getValue();
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(serviceEvent.getReplicationController());
-            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
serviceEvent.getAction());
-            
e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
entry.getKey());
-            getProcessor().process(e);
-            map.remove(entry.getKey());
+                                }
+
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }
+                                }
+
+                            });
+                } else {
+                    
getEndpoint().getKubernetesClient().replicationControllers()
+                            .watch(new Watcher<ReplicationController>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        ReplicationController resource) {
+                                    ReplicationControllerEvent se = new 
ReplicationControllerEvent(action, resource);
+                                    ReplicationControllerEvent rce = new 
ReplicationControllerEvent(action, resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    
exchange.getIn().setBody(rce.getReplicationController());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
rce.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                       
processor.process(exchange);
+                                                               } catch 
(Exception e) {
+                                                                       
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                               }
+                                }
+
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }
+                                }
+                            });
+                }
+            }
         }
-        return mapSize;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java
index 1a47321..c9172e7 100644
--- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java
+++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kubernetes.consumer;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.fabric8.kubernetes.api.model.Secret;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watcher;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesEndpoint;
 import org.apache.camel.component.kubernetes.consumer.common.SecretEvent;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KubernetesSecretsConsumer extends ScheduledPollConsumer {
+import io.fabric8.kubernetes.api.model.Secret;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class KubernetesSecretsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesSecretsConsumer.class);
 
-    private ConcurrentMap<Long, SecretEvent> map;
+    private final Processor processor;
+    private ExecutorService executor;
 
     public KubernetesSecretsConsumer(KubernetesEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
+        this.processor = processor;
     }
 
     @Override
@@ -51,68 +52,87 @@ public class KubernetesSecretsConsumer extends 
ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        map = new ConcurrentHashMap<Long, SecretEvent>();
-
-        if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
-            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
-                getEndpoint().getKubernetesClient().secrets()
-                        
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
-                        .watch(new Watcher<Secret>() {
-
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    Secret resource) {
-                                SecretEvent se = new SecretEvent(action, 
resource);
-                                map.put(System.currentTimeMillis(), se);
-                            }
+        executor = getEndpoint().createExecutor();
 
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
-                                }
+        executor.submit(new SecretsConsumerTask());       
 
-                            }
-                        });
-            } else {
-                getEndpoint().getKubernetesClient().secrets().watch(new 
Watcher<Secret>() {
-
-                    @Override
-                    public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Secret 
resource) {
-                        SecretEvent se = new SecretEvent(action, resource);
-                        map.put(System.currentTimeMillis(), se);
-                    }
-
-                    @Override
-                    public void onClose(KubernetesClientException cause) {
-                        if (cause != null) {
-                            LOG.error(cause.getMessage(), cause);
-                        }
-                    }
-                });
-            }
-        }
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        map.clear();
+        
+        LOG.debug("Stopping Kubernetes Secrets Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
     }
+    
+    class SecretsConsumerTask implements Runnable {
+       
+        @Override
+        public void run() {
+            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
+                if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
+                    getEndpoint().getKubernetesClient().secrets()
+                            
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                            .watch(new Watcher<Secret>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        Secret resource) {
+                                    SecretEvent se = new SecretEvent(action, 
resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    exchange.getIn().setBody(se.getSecret());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
se.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                       
processor.process(exchange);
+                                                               } catch 
(Exception e) {
+                                                                       
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                               }
+                                }
 
-    @Override
-    protected int poll() throws Exception {
-        int mapSize = map.size();
-        for (ConcurrentMap.Entry<Long, SecretEvent> entry : map.entrySet()) {
-            SecretEvent podEvent = entry.getValue();
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(podEvent.getSecret());
-            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
podEvent.getAction());
-            
e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
entry.getKey());
-            getProcessor().process(e);
-            map.remove(entry.getKey());
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }
+
+                                }
+                            });
+                } else {
+                    getEndpoint().getKubernetesClient().secrets().watch(new 
Watcher<Secret>() {
+
+                        @Override
+                        public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Secret 
resource) {
+                            SecretEvent se = new SecretEvent(action, resource);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(se.getSecret());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
se.getAction());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                            try {
+                                                               
processor.process(exchange);
+                                                       } catch (Exception e) {
+                                                               
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                       }
+                        }
+
+                        @Override
+                        public void onClose(KubernetesClientException cause) {
+                            if (cause != null) {
+                                LOG.error(cause.getMessage(), cause);
+                            }
+                        }
+                    });
+                }
+            }
         }
-        return mapSize;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
index b32a06e..8df7014 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
+++ 
b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java
@@ -16,31 +16,32 @@
  */
 package org.apache.camel.component.kubernetes.consumer;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import io.fabric8.kubernetes.api.model.Service;
-import io.fabric8.kubernetes.client.KubernetesClientException;
-import io.fabric8.kubernetes.client.Watcher;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kubernetes.KubernetesConstants;
 import org.apache.camel.component.kubernetes.KubernetesEndpoint;
 import org.apache.camel.component.kubernetes.consumer.common.ServiceEvent;
-import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KubernetesServicesConsumer extends ScheduledPollConsumer {
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+
+public class KubernetesServicesConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesServicesConsumer.class);
 
-    private ConcurrentMap<Long, ServiceEvent> map;
+    private final Processor processor;
+    private ExecutorService executor;
 
     public KubernetesServicesConsumer(KubernetesEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
+        this.processor = processor;
     }
 
     @Override
@@ -51,70 +52,87 @@ public class KubernetesServicesConsumer extends 
ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        map = new ConcurrentHashMap<Long, ServiceEvent>();
+        executor = getEndpoint().createExecutor();
 
-        if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
-            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
-                getEndpoint().getKubernetesClient().services()
-                        
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
-                        .watch(new Watcher<Service>() {
+        executor.submit(new ServicesConsumerTask());       
 
-                            @Override
-                            public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
-                                    Service resource) {
-                                ServiceEvent se = new ServiceEvent(action, 
resource);
-                                map.put(System.currentTimeMillis(), se);
+    }
 
-                            }
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        LOG.debug("Stopping Kubernetes Services Consumer");
+        if (executor != null) {
+            if (getEndpoint() != null && getEndpoint().getCamelContext() != 
null) {
+                
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            } else {
+                executor.shutdownNow();
+            }
+        }
+        executor = null;
+    }
+    
+    class ServicesConsumerTask implements Runnable {
+       
+        @Override
+        public void run() {
+            if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken()))
 {
+                if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName()))
 {
+                    getEndpoint().getKubernetesClient().services()
+                            
.inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName())
+                            .watch(new Watcher<Service>() {
+
+                                @Override
+                                public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action,
+                                        Service resource) {
+                                    ServiceEvent se = new ServiceEvent(action, 
resource);
+                                    Exchange exchange = 
getEndpoint().createExchange();
+                                    exchange.getIn().setBody(se.getService());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
se.getAction());
+                                    
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                                    try {
+                                                                       
processor.process(exchange);
+                                                               } catch 
(Exception e) {
+                                                                       
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                               }
 
-                            @Override
-                            public void onClose(KubernetesClientException 
cause) {
-                                if (cause != null) {
-                                    LOG.error(cause.getMessage(), cause);
                                 }
-                            }
-
-                        });
-            } else {
-                getEndpoint().getKubernetesClient().services().watch(new 
Watcher<Service>() {
 
-                    @Override
-                    public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service 
resource) {
-                        ServiceEvent se = new ServiceEvent(action, resource);
-                        map.put(System.currentTimeMillis(), se);
+                                @Override
+                                public void onClose(KubernetesClientException 
cause) {
+                                    if (cause != null) {
+                                        LOG.error(cause.getMessage(), cause);
+                                    }
+                                }
 
-                    }
+                            });
+                } else {
+                    getEndpoint().getKubernetesClient().services().watch(new 
Watcher<Service>() {
+
+                        @Override
+                        public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service 
resource) {
+                            ServiceEvent se = new ServiceEvent(action, 
resource);
+                            Exchange exchange = getEndpoint().createExchange();
+                            exchange.getIn().setBody(se.getService());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
se.getAction());
+                            
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
System.currentTimeMillis());
+                            try {
+                                                               
processor.process(exchange);
+                                                       } catch (Exception e) {
+                                                               
getExceptionHandler().handleException("Error during processing", exchange, e);
+                                                       }
+                        }
 
-                    @Override
-                    public void onClose(KubernetesClientException cause) {
-                        if (cause != null) {
-                            LOG.error(cause.getMessage(), cause);
+                        @Override
+                        public void onClose(KubernetesClientException cause) {
+                            if (cause != null) {
+                                LOG.error(cause.getMessage(), cause);
+                            }
                         }
-                    }
-                });
+                    });
+                }
             }
         }
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        super.doStop();
-        map.clear();
-    }
-
-    @Override
-    protected int poll() throws Exception {
-        int mapSize = map.size();
-        for (ConcurrentMap.Entry<Long, ServiceEvent> entry : map.entrySet()) {
-            ServiceEvent serviceEvent = entry.getValue();
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(serviceEvent.getService());
-            e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, 
serviceEvent.getAction());
-            
e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, 
entry.getKey());
-            getProcessor().process(e);
-            map.remove(entry.getKey());
-        }
-        return mapSize;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
index a583e05..9f18a9c 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java
@@ -21,8 +21,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import io.fabric8.kubernetes.api.model.Namespace;
-
 import org.apache.camel.EndpointInject;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -34,6 +32,8 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.util.ObjectHelper;
 import org.junit.Test;
 
+import io.fabric8.kubernetes.api.model.Namespace;
+
 public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport {
 
     @EndpointInject(uri = "mock:result")
@@ -45,9 +45,9 @@ public class KubernetesNamespacesConsumerTest extends 
KubernetesTestSupport {
             return;
         }
 
-        mockResultEndpoint.expectedMessageCount(3);
+        mockResultEndpoint.expectedMessageCount(5);
         
mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION,
 "ADDED",
-                "MODIFIED", "DELETED");
+                "MODIFIED", "MODIFIED", "MODIFIED", "DELETED");
         
         Exchange ex = template.request("direct:createNamespace",
                 new Processor() {
@@ -109,6 +109,8 @@ public class KubernetesNamespacesConsumerTest extends 
KubernetesTestSupport {
         boolean nsDeleted = ex.getOut().getBody(Boolean.class);
 
         assertTrue(nsDeleted);
+        
+        Thread.sleep(3000);
 
         mockResultEndpoint.assertIsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java
index 8c7af78..4c9fabc 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java
@@ -50,7 +50,7 @@ public class KubernetesPodsConsumerTest extends 
KubernetesTestSupport {
 
         mockResultEndpoint.expectedMessageCount(3);
         
mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION,
 "ADDED",
-                "MODIFIED", "DELETED");
+                "MODIFIED", "MODIFIED");
         Exchange ex = template.request("direct:createPod", new Processor() {
 
             @Override
@@ -100,7 +100,7 @@ public class KubernetesPodsConsumerTest extends 
KubernetesTestSupport {
 
         assertTrue(podDeleted);
 
-        Thread.sleep(1 * 1000);
+        Thread.sleep(3000);
 
         mockResultEndpoint.assertIsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
index 2c6cf6d..814a20a 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kubernetes.consumer;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import io.fabric8.kubernetes.api.model.EditablePodTemplateSpec;
@@ -89,7 +90,7 @@ public class KubernetesReplicationControllersConsumerTest 
extends KubernetesTest
 
         assertTrue(rcDeleted);
 
-        Thread.sleep(1 * 1000);
+        Thread.sleep(3000);
 
         mockResultEndpoint.assertIsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java
index 25e3add..ac576aa 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java
@@ -87,7 +87,7 @@ public class KubernetesSecretsConsumerTest extends 
KubernetesTestSupport {
 
         assertTrue(secDeleted);
 
-        Thread.sleep(1 * 1000);
+        Thread.sleep(3000);
 
         mockResultEndpoint.assertIsSatisfied();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
index ac2bb76..61f57fb 100644
--- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
+++ 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java
@@ -92,7 +92,7 @@ public class KubernetesServicesConsumerTest extends 
KubernetesTestSupport {
 
         assertTrue(servDeleted);
 
-        Thread.sleep(1 * 1000);
+        Thread.sleep(3000);
 
         mockResultEndpoint.assertIsSatisfied();
     }

Reply via email to