Repository: camel
Updated Branches:
  refs/heads/master 1502688d2 -> bba76eff1


CAMEL-9886 : Create an etcd based RoutePolicy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bba76eff
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bba76eff
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bba76eff

Branch: refs/heads/master
Commit: bba76eff18dc8653de9f60d1fcfca90c7028bb31
Parents: 1502688
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Wed Apr 27 17:06:24 2016 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Wed Apr 27 17:07:03 2016 +0200

----------------------------------------------------------------------
 .../apache/camel/component/etcd/EtcdHelper.java |  35 ++
 .../camel/component/etcd/EtcdWatchConsumer.java |  12 +-
 .../component/etcd/policy/EtcdRoutePolicy.java  | 330 +++++++++++++++++++
 .../etcd/policy/EtcdRoutePolicyMain.java        |  43 +++
 .../src/test/resources/log4j.properties         |   4 +-
 5 files changed, 412 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bba76eff/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
new file mode 100644
index 0000000..c86e1a0
--- /dev/null
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.etcd;
+
+import mousio.etcd4j.responses.EtcdErrorCode;
+import mousio.etcd4j.responses.EtcdException;
+
+public final class EtcdHelper  {
+    private static final String OUTDATED_EVENT_MSG = "requested index is 
outdated and cleared";
+
+    private EtcdHelper() {
+    }
+
+    public static boolean isOutdatedIndexException(EtcdException exception) {
+        if (exception.isErrorCode(EtcdErrorCode.EventIndexCleared) && 
exception.etcdMessage != null) {
+            return 
exception.etcdMessage.toLowerCase().contains(OUTDATED_EVENT_MSG);
+        }
+
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bba76eff/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
index 9f85a0a..2a197b3 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import mousio.client.promises.ResponsePromise;
 import mousio.etcd4j.requests.EtcdKeyGetRequest;
-import mousio.etcd4j.responses.EtcdErrorCode;
 import mousio.etcd4j.responses.EtcdException;
 import mousio.etcd4j.responses.EtcdKeysResponse;
 import org.apache.camel.Exchange;
@@ -32,7 +31,6 @@ import org.slf4j.LoggerFactory;
 
 public class EtcdWatchConsumer extends AbstractEtcdConsumer implements 
ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdWatchConsumer.class);
-    private static final String OUTDATED_EVENT_MSG = "requested index is 
outdated and cleared";
 
     private final EtcdWatchEndpoint endpoint;
     private final EtcdConfiguration configuration;
@@ -81,7 +79,7 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer 
implements ResponseP
             // }
             //
             // So we set the index to the one returned by the exception + 1
-            if (isOutdatedIndexException(exception)) {
+            if (EtcdHelper.isOutdatedIndexException(exception)) {
                 LOGGER.debug("Outdated index, key: {}, cause={}", getPath(), 
exception.etcdCause);
 
                 // We set the index to the one returned by the exception + 1.
@@ -154,12 +152,4 @@ public class EtcdWatchConsumer extends 
AbstractEtcdConsumer implements ResponseP
 
         request.send().addListener(this);
     }
-
-    private boolean isOutdatedIndexException(EtcdException exception) {
-        if (exception.isErrorCode(EtcdErrorCode.EventIndexCleared) && 
exception.etcdMessage != null) {
-            return 
exception.etcdMessage.toLowerCase().contains(OUTDATED_EVENT_MSG);
-        }
-
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/bba76eff/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
new file mode 100644
index 0000000..8c0cc12
--- /dev/null
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.etcd.policy;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import mousio.client.promises.ResponsePromise;
+import mousio.etcd4j.EtcdClient;
+import mousio.etcd4j.responses.EtcdErrorCode;
+import mousio.etcd4j.responses.EtcdException;
+import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.etcd.EtcdHelper;
+import org.apache.camel.support.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EtcdRoutePolicy extends RoutePolicySupport implements 
ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse>, 
NonManagedService {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdRoutePolicy.class);
+
+    // Monitor guarding every access to suspendedRoutes.
+    private final Object lock;
+    // etcd client used for the leadership key (put / refresh / watch).
+    private final EtcdClient client;
+    // When true the client is closed by doStop().
+    private final boolean managedClient;
+    // Current leadership state as seen by this policy.
+    private final AtomicBoolean leader;
+    // Routes whose consumer has been stopped by this policy and not restarted yet.
+    private final Set<Route> suspendedRoutes;
+    // Index passed to waitForChange() on the next watch request.
+    private final AtomicLong index;
+
+    // Value written to the leadership key and compared against the key's current value.
+    private String serviceName;
+    // etcd key used for the election.
+    private String servicePath;
+    // TTL given to the leadership key; ttl / 3 is used as the watch timeout in seconds.
+    // NOTE(review): presumably the TTL itself is in seconds too - confirm against etcd4j.
+    private int ttl;
+    // Stored and exposed through getter/setter; not read by the watch logic in this class.
+    private int watchTimeout;
+    // When true, onExchangeBegin() starts/stops the route consumer according to leadership.
+    private boolean shouldStopConsumer;
+
+    /**
+     * Creates a policy with a newly created {@link EtcdClient} that this policy
+     * owns and closes on stop.
+     */
+    public EtcdRoutePolicy() {
+        this(new EtcdClient(), true);
+    }
+
+    /**
+     * Creates a policy on top of an externally owned client; the client is not
+     * closed when the policy stops.
+     *
+     * @param client the etcd client to use
+     */
+    public EtcdRoutePolicy(EtcdClient client) {
+        this(client, false);
+    }
+
+    /**
+     * Creates a policy with explicit client ownership.
+     *
+     * @param client        the etcd client to use
+     * @param managedClient {@code true} to have the client closed when the policy stops
+     */
+    public EtcdRoutePolicy(EtcdClient client, boolean managedClient) {
+        this.client = client;
+        this.managedClient = managedClient;
+        this.suspendedRoutes =  new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.lock = new Object();
+        this.index = new AtomicLong(0);
+        this.serviceName = null;
+        this.servicePath = null;
+        this.ttl = 60;
+        // Derived from the default ttl only; setTtl() does not recompute it.
+        this.watchTimeout = ttl / 3;
+        this.shouldStopConsumer = true;
+    }
+
+    /**
+     * Gates every exchange on the current leadership state.
+     * <p>
+     * A leader lets the exchange through (restarting the route consumer first if
+     * this policy had stopped it). A non-leader fails the exchange with an
+     * {@link IllegalStateException} and, when {@code shouldStopConsumer} is set,
+     * stops the route consumer.
+     *
+     * @param route    the route the exchange belongs to
+     * @param exchange the exchange about to be processed
+     */
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange)  {
+        final boolean isLeader = leader.get();
+
+        if (shouldStopConsumer) {
+            if (isLeader) {
+                startConsumer(route);
+            } else {
+                stopConsumer(route);
+            }
+        }
+
+        if (!isLeader) {
+            final String reason =
+                "Etcd based route policy prohibits processing exchanges, stopping route and failing the exchange";
+            exchange.setException(new IllegalStateException(reason));
+        }
+    }
+
+    /**
+     * Drops the route from the set of routes whose consumer was stopped by this
+     * policy, so a stopped route is not restarted on a later leadership change.
+     *
+     * @param route the route being stopped
+     */
+    @Override
+    public void onStop(Route route) {
+        // suspendedRoutes is only ever touched while holding the lock
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Drops the route from the set of routes whose consumer was stopped by this
+     * policy when the route gets suspended.
+     * <p>
+     * Only {@code lock} is taken, exactly like {@link #onStop(Route)}: it is the
+     * single monitor guarding {@code suspendedRoutes}, so additionally locking on
+     * the policy instance would add nothing but a second monitor to acquire.
+     *
+     * @param route the route being suspended
+     */
+    @Override
+    public void onSuspend(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Tries to take leadership, records the outcome and issues the first watch
+     * request on the service path before delegating to the parent implementation.
+     *
+     * @throws Exception if the leadership attempt, the watch request or the parent start fails
+     */
+    @Override
+    protected void doStart() throws Exception {
+        // Initial election attempt; the result drives the leader flag
+        setLeader(tryTakeLeadership());
+        // Start listening for changes of the leadership key
+        watch();
+
+        super.doStart();
+    }
+
+    /**
+     * Closes the etcd client when it is owned by this policy, then delegates to
+     * the parent implementation.
+     *
+     * @throws Exception if closing the client or the parent stop fails
+     */
+    @Override
+    protected void doStop() throws Exception {
+        // Externally supplied clients are left open for their owner to close
+        if (managedClient) {
+            client.close();
+        }
+
+        super.doStop();
+    }
+
+    // 
*************************************************************************
+    //
+    // 
*************************************************************************
+
+    /**
+     * Records the new leadership state and reacts to transitions.
+     * <ul>
+     *   <li>not leader -> leader: logs "Leadership taken" and restarts every
+     *       consumer this policy had stopped;</li>
+     *   <li>leader -> not leader: logs "Leadership lost";</li>
+     *   <li>no change: does nothing.</li>
+     * </ul>
+     *
+     * @param isLeader {@code true} if this instance currently holds the leadership
+     */
+    protected void setLeader(boolean isLeader) {
+        if (isLeader) {
+            // compareAndSet makes sure the transition is handled once even with concurrent callers
+            if (leader.compareAndSet(false, true)) {
+                LOGGER.info("Leadership taken (path={}, name={})", servicePath, serviceName);
+                startAllStoppedConsumers();
+            }
+        } else if (leader.getAndSet(false)) {
+            // The previous value was true, so leadership has really been lost
+            LOGGER.info("Leadership lost (path={}, name={})", servicePath, serviceName);
+        }
+    }
+
+    private void startConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (suspendedRoutes.contains(route)) {
+                    startConsumer(route.getConsumer());
+                    suspendedRoutes.remove(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    private void stopConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (!suspendedRoutes.contains(route)) {
+                    LOGGER.debug("Stopping consumer for {} ({})", 
route.getId(), route.getConsumer());
+                    stopConsumer(route.getConsumer());
+                    suspendedRoutes.add(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Restarts the consumer of every route this policy had stopped and then
+     * empties the bookkeeping set. If starting a consumer fails, the failure is
+     * passed to {@code handleException} and the set is left untouched.
+     */
+    private void startAllStoppedConsumers() {
+        synchronized (lock) {
+            try {
+                for (Route stopped : suspendedRoutes) {
+                    LOGGER.debug("Starting consumer for {} ({})", stopped.getId(), stopped.getConsumer());
+                    startConsumer(stopped.getConsumer());
+                }
+
+                // Reached only when every consumer above started without error
+                suspendedRoutes.clear();
+            } catch (Exception failure) {
+                handleException(failure);
+            }
+        }
+    }
+
+    // 
*************************************************************************
+    // Getter/Setters
+    // 
*************************************************************************
+
+    /**
+     * @return the etcd client used by this policy
+     */
+    public EtcdClient getClient() {
+        return client;
+    }
+
+    /**
+     * @return the value this instance writes to the leadership key
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @param serviceName the value this instance writes to the leadership key
+     */
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    /**
+     * @return the etcd key used for the election
+     */
+    public String getServicePath() {
+        return servicePath;
+    }
+
+    /**
+     * @param servicePath the etcd key used for the election
+     */
+    public void setServicePath(String servicePath) {
+        this.servicePath = servicePath;
+    }
+
+    /**
+     * @return the TTL applied to the leadership key
+     */
+    public int getTtl() {
+        return ttl;
+    }
+
+    /**
+     * Sets the TTL applied to the leadership key. The watch request uses
+     * {@code ttl / 3} seconds as its timeout; {@code watchTimeout} is not
+     * recomputed here.
+     *
+     * @param ttl the TTL applied to the leadership key
+     */
+    public void setTtl(int ttl) {
+        this.ttl = ttl;
+    }
+
+    /**
+     * @return the configured watch timeout
+     */
+    public int getWatchTimeout() {
+        return watchTimeout;
+    }
+
+    /**
+     * NOTE(review): the watch request in this class uses {@code ttl / 3} and does
+     * not read this value - confirm whether that is intended.
+     *
+     * @param watchTimeout the watch timeout
+     */
+    public void setWatchTimeout(int watchTimeout) {
+        this.watchTimeout = watchTimeout;
+    }
+
+    /**
+     * @return whether the route consumer is started/stopped according to leadership
+     */
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    /**
+     * @param shouldStopConsumer {@code true} to start/stop the route consumer
+     *                           according to leadership when an exchange begins
+     */
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    // 
*************************************************************************
+    // Watch
+    // 
*************************************************************************
+
+    /**
+     * Callback invoked when a watch request on the service path completes.
+     * <p>
+     * An "outdated index" error only moves the watch index forward. A regular
+     * response updates the index and then either tries to take leadership (the
+     * key has no value) or gives leadership up (the key holds another name while
+     * this instance believes it is the leader). A timeout is not an error. In all
+     * those cases a new watch is issued; any other failure is rethrown wrapped in
+     * a {@link RuntimeCamelException}.
+     *
+     * @param promise the completed watch promise
+     */
+    @Override
+    public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {
+        if (!isRunAllowed()) {
+            return;
+        }
+
+        Throwable failure = promise.getException();
+        // instanceof is false for null, so no separate null check is needed
+        if (failure instanceof EtcdException) {
+            final EtcdException etcdFailure = (EtcdException) failure;
+            if (EtcdHelper.isOutdatedIndexException(etcdFailure)) {
+                LOGGER.debug("Outdated index, key={}, cause={}", servicePath, etcdFailure.etcdCause);
+                // Resume watching right after the index reported by etcd
+                index.set(etcdFailure.index + 1);
+                failure = null;
+            }
+        } else {
+            try {
+                final EtcdKeysResponse response = promise.get();
+                setIndex(response);
+
+                final Object currentHolder = response.node.value;
+                if (currentHolder == null) {
+                    // Nobody holds the key any more: try to grab it
+                    setLeader(tryTakeLeadership());
+                } else if (!ObjectHelper.equal(serviceName, currentHolder) && leader.get()) {
+                    // Looks like I've lost leadership
+                    setLeader(false);
+                }
+            } catch (TimeoutException timeout) {
+                LOGGER.debug("Timeout watching for {}", servicePath);
+                failure = null;
+            } catch (Exception other) {
+                failure = other;
+            }
+        }
+
+        if (failure != null) {
+            throw new RuntimeCamelException(failure);
+        }
+
+        try {
+            watch();
+        } catch (Exception watchFailure) {
+            throw new RuntimeCamelException(watchFailure);
+        }
+    }
+
+    private void watch() throws Exception {
+        if (isRunAllowed()) {
+            if (leader.get()) {
+                setIndex(client.refresh(servicePath, ttl)
+                    .send()
+                    .get()
+                );
+            }
+
+            LOGGER.debug("Watch (path={}, isLeader={}, index={})", 
servicePath, leader.get(), index.get());
+
+            client.get(servicePath)
+                .waitForChange(index.get())
+                .timeout(ttl / 3, TimeUnit.SECONDS)
+                .send()
+                .addListener(this);
+        }
+    }
+
+    private boolean tryTakeLeadership() throws Exception {
+        boolean result = false;
+
+        try {
+            EtcdKeysResponse response = getClient()
+                .put(servicePath, serviceName)
+                .prevExist(false)
+                .ttl(ttl)
+                .send()
+                .get();
+
+            result = ObjectHelper.equal(serviceName, response.node.value);
+            setIndex(response);
+        } catch (EtcdException e) {
+            if (!e.isErrorCode(EtcdErrorCode.NodeExist)) {
+                throw e;
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Moves the watch index just past the index carried by the given response.
+     * <p>
+     * The node's {@code modifiedIndex} is preferred; when the response has no
+     * node, the response-level {@code etcdIndex} is used instead. A {@code null}
+     * response leaves the current index untouched.
+     *
+     * @param response the etcd response to take the index from, may be {@code null}
+     */
+    private void setIndex(EtcdKeysResponse response) {
+        if (response == null) {
+            // Nothing to derive an index from: keep watching from the current one
+            return;
+        }
+
+        if (response.node != null) {
+            index.set(response.node.modifiedIndex + 1);
+            LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get());
+        } else {
+            index.set(response.etcdIndex + 1);
+            // Log the value actually used; there is no node to read modifiedIndex from
+            LOGGER.debug("Index received={}, next={}", response.etcdIndex, index.get());
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/bba76eff/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
new file mode 100644
index 0000000..149f485
--- /dev/null
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicyMain.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.etcd.policy;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.main.Main;
+
+/**
+ * Manual runner that starts a single file-consuming route guarded by an
+ * {@link EtcdRoutePolicy}, so several instances can be launched side by side to
+ * observe the leader election.
+ * <p>
+ * Two command line arguments are required. {@code args[1]} is used as the policy
+ * service name, the route id and the {@code EtcdRouteID} header; {@code args[0]}
+ * is used as the {@code EtcdServiceName} header.
+ * NOTE(review): the service name and the {@code EtcdServiceName} header come from
+ * different arguments - confirm that this is intended.
+ */
+public class EtcdRoutePolicyMain {
+    /**
+     * @param args two values, see the class description
+     * @throws IllegalArgumentException if fewer than two arguments are given
+     * @throws Exception if Camel fails to start or run
+     */
+    public static void main(final String[] args) throws Exception {
+        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
+        // thrown later from inside the route configuration.
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException(
+                "Expected 2 arguments but got " + (args == null ? 0 : args.length));
+        }
+
+        Main main = new Main();
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() {
+                EtcdRoutePolicy policy = new EtcdRoutePolicy();
+                policy.setServicePath("/camel/services/leader");
+                policy.setServiceName(args[1]);
+                policy.setTtl(15);
+
+                fromF("file:///tmp/camel?delete=true")
+                    .routeId(args[1])
+                    .routePolicy(policy)
+                    .setHeader("EtcdRouteID", constant(args[1]))
+                    .setHeader("EtcdServiceName", constant(args[0]))
+                    .to("log:org.apache.camel.component.etcd?level=INFO&showAll=true");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/bba76eff/components/camel-etcd/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-etcd/src/test/resources/log4j.properties 
b/components/camel-etcd/src/test/resources/log4j.properties
index a915605..67bab96 100644
--- a/components/camel-etcd/src/test/resources/log4j.properties
+++ b/components/camel-etcd/src/test/resources/log4j.properties
@@ -4,7 +4,9 @@
 log4j.rootLogger=INFO, file
 
 # uncomment the following line to turn on Camel debugging
-#log4j.logger.org.apache.camel=DEBUG
+log4j.logger.org.apache.camel=INFO
+log4j.logger.org.apache.camel.component.etcd=DEBUG
+log4j.logger.mousio.etcd4j=INFO
 
 # CONSOLE appender not used by default
 log4j.appender.out=org.apache.log4j.ConsoleAppender

Reply via email to