CAMEL-9595: Camel-Kubernetes consumer should use DefaultConsumer and not ScheduledPollConsumer
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f6b909c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f6b909c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f6b909c

Branch: refs/heads/master
Commit: 3f6b909c3d4eddd1dc282ac065e90233809873f5
Parents: 5e83a39
Author: Andrea Cosentino <anco...@gmail.com>
Authored: Fri Feb 12 13:52:32 2016 +0100
Committer: Andrea Cosentino <anco...@gmail.com>
Committed: Fri Feb 12 14:15:15 2016 +0100

----------------------------------------------------------------------
 .../kubernetes/KubernetesConfiguration.java | 16 +-
 .../kubernetes/KubernetesEndpoint.java | 6 +
 .../consumer/KubernetesNamespacesConsumer.java | 152 +++++++++++--------
 .../consumer/KubernetesPodsConsumer.java | 144 ++++++++++--------
 ...ubernetesReplicationControllersConsumer.java | 150 ++++++++++--------
 .../consumer/KubernetesSecretsConsumer.java | 142 +++++++++--------
 .../consumer/KubernetesServicesConsumer.java | 142 +++++++++--------
 .../KubernetesNamespacesConsumerTest.java | 10 +-
 .../consumer/KubernetesPodsConsumerTest.java | 4 +-
 ...netesReplicationControllersConsumerTest.java | 3 +-
 .../consumer/KubernetesSecretsConsumerTest.java | 2 +-
 .../KubernetesServicesConsumerTest.java | 2 +-
 12 files changed, 445 insertions(+), 328 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java
index 0d3cda4..f1a0c33 100644
--- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesConfiguration.java @@ -86,6 +86,9 @@ public class KubernetesConfiguration { @UriParam(label = "consumer") private String namespaceName; + + @UriParam(label = "consumer", defaultValue = "1") + private int poolSize = 1; /** * Kubernetes Master url @@ -284,8 +287,19 @@ public class KubernetesConfiguration { public void setNamespaceName(String namespaceName) { this.namespaceName = namespaceName; } + + /** + * The Consumer pool size + */ + public int getPoolSize() { + return poolSize; + } + + public void setPoolSize(int poolSize) { + this.poolSize = poolSize; + } - @Override + @Override public String toString() { return "KubernetesConfiguration [masterUrl=" + masterUrl + ", category=" + category + ", kubernetesClient=" + kubernetesClient + ", username=" + username + ", password=" + password + ", operation=" + operation http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java index 0f95f71..8e521a2 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/KubernetesEndpoint.java @@ -20,6 +20,8 @@ import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.DefaultKubernetesClient; +import java.util.concurrent.ExecutorService; + import org.apache.camel.Consumer; 
import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -164,6 +166,10 @@ public class KubernetesEndpoint extends DefaultEndpoint { super.doStop(); client.close(); } + + public ExecutorService createExecutor() { + return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KubernetesConsumer", configuration.getPoolSize()); + } public DefaultKubernetesClient getKubernetesClient() { return client; http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java index f08c4dd..097b3a6 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumer.java @@ -16,31 +16,32 @@ */ package org.apache.camel.component.kubernetes.consumer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.fabric8.kubernetes.api.model.Namespace; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; +import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesEndpoint; import org.apache.camel.component.kubernetes.consumer.common.NamespaceEvent; -import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.camel.impl.DefaultConsumer; import 
org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KubernetesNamespacesConsumer extends ScheduledPollConsumer { +import io.fabric8.kubernetes.api.model.Namespace; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class KubernetesNamespacesConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KubernetesNamespacesConsumer.class); - - private ConcurrentMap<Long, NamespaceEvent> map; + + private final Processor processor; + private ExecutorService executor; public KubernetesNamespacesConsumer(KubernetesEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.processor = processor; } @Override @@ -51,70 +52,87 @@ public class KubernetesNamespacesConsumer extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - map = new ConcurrentHashMap<Long, NamespaceEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().namespaces() - .withName(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Namespace>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - Namespace resource) { - NamespaceEvent ne = new NamespaceEvent(action, resource); - map.put(System.currentTimeMillis(), ne); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } else { - getEndpoint().getKubernetesClient().namespaces().watch(new Watcher<Namespace>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - Namespace resource) { - NamespaceEvent ne = new NamespaceEvent(action, 
resource); - map.put(System.currentTimeMillis(), ne); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } + executor = getEndpoint().createExecutor(); + + executor.submit(new NamespacesConsumerTask()); } @Override protected void doStop() throws Exception { super.doStop(); - map.clear(); + + LOG.debug("Stopping Kubernetes Namespace Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + executor = null; } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, NamespaceEvent> entry : map.entrySet()) { - NamespaceEvent namespaceEvent = entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(namespaceEvent.getNamespace()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, namespaceEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); + + class NamespacesConsumerTask implements Runnable { + + @Override + public void run() { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().namespaces() + .withName(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Namespace>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Namespace resource) { + NamespaceEvent ne = new NamespaceEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(ne.getNamespace()); + 
exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } else { + getEndpoint().getKubernetesClient().namespaces().watch(new Watcher<Namespace>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Namespace resource) { + NamespaceEvent ne = new NamespaceEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(ne.getNamespace()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, ne.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } } - return mapSize; } } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java index bfa025e..186f824 100644 --- 
a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumer.java @@ -16,31 +16,32 @@ */ package org.apache.camel.component.kubernetes.consumer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; +import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesEndpoint; import org.apache.camel.component.kubernetes.consumer.common.PodEvent; -import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KubernetesPodsConsumer extends ScheduledPollConsumer { +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class KubernetesPodsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KubernetesPodsConsumer.class); - private ConcurrentMap<Long, PodEvent> map; + private final Processor processor; + private ExecutorService executor; public KubernetesPodsConsumer(KubernetesEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.processor = processor; } @Override @@ -51,68 +52,85 @@ public class KubernetesPodsConsumer extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - map = new ConcurrentHashMap<Long, PodEvent>(); - - if 
(ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().pods() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Pod>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - Pod resource) { - PodEvent pe = new PodEvent(action, resource); - map.put(System.currentTimeMillis(), pe); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } + executor = getEndpoint().createExecutor(); - } - }); - } else { - getEndpoint().getKubernetesClient().pods().watch(new Watcher<Pod>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) { - PodEvent pe = new PodEvent(action, resource); - map.put(System.currentTimeMillis(), pe); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } + executor.submit(new PodsConsumerTask()); } @Override protected void doStop() throws Exception { super.doStop(); - map.clear(); - } - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, PodEvent> entry : map.entrySet()) { - PodEvent podEvent = entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(podEvent.getPod()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, podEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); + LOG.debug("Stopping Kubernetes Pods Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + 
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } } - return mapSize; + executor = null; } + class PodsConsumerTask implements Runnable { + + @Override + public void run() { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().pods() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Pod>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Pod resource) { + PodEvent pe = new PodEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(pe.getPod()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + + } + }); + } else { + getEndpoint().getKubernetesClient().pods().watch(new Watcher<Pod>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Pod resource) { + PodEvent pe = new PodEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(pe.getPod()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, pe.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during 
processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java index e7fd348..2fd6847 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumer.java @@ -16,31 +16,32 @@ */ package org.apache.camel.component.kubernetes.consumer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.fabric8.kubernetes.api.model.ReplicationController; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; +import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesEndpoint; import org.apache.camel.component.kubernetes.consumer.common.ReplicationControllerEvent; -import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KubernetesReplicationControllersConsumer 
extends ScheduledPollConsumer { +import io.fabric8.kubernetes.api.model.ReplicationController; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class KubernetesReplicationControllersConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KubernetesReplicationControllersConsumer.class); - private ConcurrentMap<Long, ReplicationControllerEvent> map; + private final Processor processor; + private ExecutorService executor; public KubernetesReplicationControllersConsumer(KubernetesEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.processor = processor; } @Override @@ -51,72 +52,91 @@ public class KubernetesReplicationControllersConsumer extends ScheduledPollConsu @Override protected void doStart() throws Exception { super.doStart(); - map = new ConcurrentHashMap<Long, ReplicationControllerEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().replicationControllers() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<ReplicationController>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - ReplicationController resource) { - ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource); - map.put(System.currentTimeMillis(), rce); - - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - - }); - } else { - getEndpoint().getKubernetesClient().replicationControllers() - .watch(new Watcher<ReplicationController>() { - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - ReplicationController resource) { - 
ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource); - map.put(System.currentTimeMillis(), se); + executor = getEndpoint().createExecutor(); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } + executor.submit(new ReplicationControllersConsumerTask()); } @Override protected void doStop() throws Exception { super.doStop(); - map.clear(); + + LOG.debug("Stopping Kubernetes Replication Controllers Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + executor = null; } + + class ReplicationControllersConsumerTask implements Runnable { + + @Override + public void run() { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().replicationControllers() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<ReplicationController>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + ReplicationController resource) { + ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(rce.getReplicationController()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } - @Override - protected int poll() throws 
Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, ReplicationControllerEvent> entry : map.entrySet()) { - ReplicationControllerEvent serviceEvent = entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(serviceEvent.getReplicationController()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + + }); + } else { + getEndpoint().getKubernetesClient().replicationControllers() + .watch(new Watcher<ReplicationController>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + ReplicationController resource) { + ReplicationControllerEvent se = new ReplicationControllerEvent(action, resource); + ReplicationControllerEvent rce = new ReplicationControllerEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(rce.getReplicationController()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, rce.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } } - return mapSize; } } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java 
---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java index 1a47321..c9172e7 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumer.java @@ -16,31 +16,32 @@ */ package org.apache.camel.component.kubernetes.consumer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.fabric8.kubernetes.api.model.Secret; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; +import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesEndpoint; import org.apache.camel.component.kubernetes.consumer.common.SecretEvent; -import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KubernetesSecretsConsumer extends ScheduledPollConsumer { +import io.fabric8.kubernetes.api.model.Secret; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class KubernetesSecretsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KubernetesSecretsConsumer.class); - private ConcurrentMap<Long, SecretEvent> map; + private final Processor processor; + private ExecutorService executor; public 
KubernetesSecretsConsumer(KubernetesEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.processor = processor; } @Override @@ -51,68 +52,87 @@ public class KubernetesSecretsConsumer extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - map = new ConcurrentHashMap<Long, SecretEvent>(); - - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().secrets() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Secret>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - Secret resource) { - SecretEvent se = new SecretEvent(action, resource); - map.put(System.currentTimeMillis(), se); - } + executor = getEndpoint().createExecutor(); - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } + executor.submit(new SecretsConsumerTask()); - } - }); - } else { - getEndpoint().getKubernetesClient().secrets().watch(new Watcher<Secret>() { - - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Secret resource) { - SecretEvent se = new SecretEvent(action, resource); - map.put(System.currentTimeMillis(), se); - } - - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); - } - } - }); - } - } } @Override protected void doStop() throws Exception { super.doStop(); - map.clear(); + + LOG.debug("Stopping Kubernetes Secrets Consumer"); + if (executor != null) { + if (getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + 
executor = null; } + + class SecretsConsumerTask implements Runnable { + + @Override + public void run() { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().secrets() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Secret>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Secret resource) { + SecretEvent se = new SecretEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(se.getSecret()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, SecretEvent> entry : map.entrySet()) { - SecretEvent podEvent = entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(podEvent.getSecret()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, podEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + + } + }); + } else { + getEndpoint().getKubernetesClient().secrets().watch(new Watcher<Secret>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Secret resource) { + SecretEvent se = new SecretEvent(action, resource); 
+ Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(se.getSecret()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } + + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } + }); + } + } } - return mapSize; } } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java index b32a06e..8df7014 100644 --- a/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java +++ b/components/camel-kubernetes/src/main/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumer.java @@ -16,31 +16,32 @@ */ package org.apache.camel.component.kubernetes.consumer; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import io.fabric8.kubernetes.api.model.Service; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.Watcher; +import java.util.concurrent.ExecutorService; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.kubernetes.KubernetesConstants; import org.apache.camel.component.kubernetes.KubernetesEndpoint; 
import org.apache.camel.component.kubernetes.consumer.common.ServiceEvent; -import org.apache.camel.impl.ScheduledPollConsumer; +import org.apache.camel.impl.DefaultConsumer; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KubernetesServicesConsumer extends ScheduledPollConsumer { +import io.fabric8.kubernetes.api.model.Service; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.Watcher; + +public class KubernetesServicesConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(KubernetesServicesConsumer.class); - private ConcurrentMap<Long, ServiceEvent> map; + private final Processor processor; + private ExecutorService executor; public KubernetesServicesConsumer(KubernetesEndpoint endpoint, Processor processor) { super(endpoint, processor); + this.processor = processor; } @Override @@ -51,70 +52,87 @@ public class KubernetesServicesConsumer extends ScheduledPollConsumer { @Override protected void doStart() throws Exception { super.doStart(); - map = new ConcurrentHashMap<Long, ServiceEvent>(); + executor = getEndpoint().createExecutor(); - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { - if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { - getEndpoint().getKubernetesClient().services() - .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) - .watch(new Watcher<Service>() { + executor.submit(new ServicesConsumerTask()); - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, - Service resource) { - ServiceEvent se = new ServiceEvent(action, resource); - map.put(System.currentTimeMillis(), se); + } - } + @Override + protected void doStop() throws Exception { + super.doStop(); + LOG.debug("Stopping Kubernetes Services Consumer"); + if (executor != null) { + if 
(getEndpoint() != null && getEndpoint().getCamelContext() != null) { + getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor); + } else { + executor.shutdownNow(); + } + } + executor = null; + } + + class ServicesConsumerTask implements Runnable { + + @Override + public void run() { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getOauthToken())) { + if (ObjectHelper.isNotEmpty(getEndpoint().getKubernetesConfiguration().getNamespaceName())) { + getEndpoint().getKubernetesClient().services() + .inNamespace(getEndpoint().getKubernetesConfiguration().getNamespaceName()) + .watch(new Watcher<Service>() { + + @Override + public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, + Service resource) { + ServiceEvent se = new ServiceEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(se.getService()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); } - } - - }); - } else { - getEndpoint().getKubernetesClient().services().watch(new Watcher<Service>() { - @Override - public void eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) { - ServiceEvent se = new ServiceEvent(action, resource); - map.put(System.currentTimeMillis(), se); + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } + } - } + }); + } else { + getEndpoint().getKubernetesClient().services().watch(new Watcher<Service>() { + + @Override + public void 
eventReceived(io.fabric8.kubernetes.client.Watcher.Action action, Service resource) { + ServiceEvent se = new ServiceEvent(action, resource); + Exchange exchange = getEndpoint().createExchange(); + exchange.getIn().setBody(se.getService()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, se.getAction()); + exchange.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, System.currentTimeMillis()); + try { + processor.process(exchange); + } catch (Exception e) { + getExceptionHandler().handleException("Error during processing", exchange, e); + } + } - @Override - public void onClose(KubernetesClientException cause) { - if (cause != null) { - LOG.error(cause.getMessage(), cause); + @Override + public void onClose(KubernetesClientException cause) { + if (cause != null) { + LOG.error(cause.getMessage(), cause); + } } - } - }); + }); + } } } } - @Override - protected void doStop() throws Exception { - super.doStop(); - map.clear(); - } - - @Override - protected int poll() throws Exception { - int mapSize = map.size(); - for (ConcurrentMap.Entry<Long, ServiceEvent> entry : map.entrySet()) { - ServiceEvent serviceEvent = entry.getValue(); - Exchange e = getEndpoint().createExchange(); - e.getIn().setBody(serviceEvent.getService()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_ACTION, serviceEvent.getAction()); - e.getIn().setHeader(KubernetesConstants.KUBERNETES_EVENT_TIMESTAMP, entry.getKey()); - getProcessor().process(e); - map.remove(entry.getKey()); - } - return mapSize; - } - } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java 
b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java index a583e05..9f18a9c 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesNamespacesConsumerTest.java @@ -21,8 +21,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import io.fabric8.kubernetes.api.model.Namespace; - import org.apache.camel.EndpointInject; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -34,6 +32,8 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.util.ObjectHelper; import org.junit.Test; +import io.fabric8.kubernetes.api.model.Namespace; + public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport { @EndpointInject(uri = "mock:result") @@ -45,9 +45,9 @@ public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport { return; } - mockResultEndpoint.expectedMessageCount(3); + mockResultEndpoint.expectedMessageCount(5); mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED", - "MODIFIED", "DELETED"); + "MODIFIED", "MODIFIED", "MODIFIED", "DELETED"); Exchange ex = template.request("direct:createNamespace", new Processor() { @@ -109,6 +109,8 @@ public class KubernetesNamespacesConsumerTest extends KubernetesTestSupport { boolean nsDeleted = ex.getOut().getBody(Boolean.class); assertTrue(nsDeleted); + + Thread.sleep(3000); mockResultEndpoint.assertIsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java ---------------------------------------------------------------------- diff --git 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java index 8c7af78..4c9fabc 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesPodsConsumerTest.java @@ -50,7 +50,7 @@ public class KubernetesPodsConsumerTest extends KubernetesTestSupport { mockResultEndpoint.expectedMessageCount(3); mockResultEndpoint.expectedHeaderValuesReceivedInAnyOrder(KubernetesConstants.KUBERNETES_EVENT_ACTION, "ADDED", - "MODIFIED", "DELETED"); + "MODIFIED", "MODIFIED"); Exchange ex = template.request("direct:createPod", new Processor() { @Override @@ -100,7 +100,7 @@ public class KubernetesPodsConsumerTest extends KubernetesTestSupport { assertTrue(podDeleted); - Thread.sleep(1 * 1000); + Thread.sleep(3000); mockResultEndpoint.assertIsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java index 2c6cf6d..814a20a 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesReplicationControllersConsumerTest.java @@ 
-17,6 +17,7 @@ package org.apache.camel.component.kubernetes.consumer; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import io.fabric8.kubernetes.api.model.EditablePodTemplateSpec; @@ -89,7 +90,7 @@ public class KubernetesReplicationControllersConsumerTest extends KubernetesTest assertTrue(rcDeleted); - Thread.sleep(1 * 1000); + Thread.sleep(3000); mockResultEndpoint.assertIsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java index 25e3add..ac576aa 100644 --- a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesSecretsConsumerTest.java @@ -87,7 +87,7 @@ public class KubernetesSecretsConsumerTest extends KubernetesTestSupport { assertTrue(secDeleted); - Thread.sleep(1 * 1000); + Thread.sleep(3000); mockResultEndpoint.assertIsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/3f6b909c/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java index ac2bb76..61f57fb 100644 --- 
a/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java +++ b/components/camel-kubernetes/src/test/java/org/apache/camel/component/kubernetes/consumer/KubernetesServicesConsumerTest.java @@ -92,7 +92,7 @@ public class KubernetesServicesConsumerTest extends KubernetesTestSupport { assertTrue(servDeleted); - Thread.sleep(1 * 1000); + Thread.sleep(3000); mockResultEndpoint.assertIsSatisfied(); }