Repository: camel
Updated Branches:
  refs/heads/master 2ee7b6f99 -> 917b2f27f


CAMEL-6399: hazelcast - route policy for having one route being master, and 
others as slaves


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/917b2f27
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/917b2f27
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/917b2f27

Branch: refs/heads/master
Commit: 917b2f27fe1f2f41305de4438616a49ff3bdba03
Parents: 2ee7b6f
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Fri Sep 2 15:37:38 2016 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Fri Sep 2 15:38:16 2016 +0200

----------------------------------------------------------------------
 components/camel-hazelcast/pom.xml              |   5 +
 .../component/hazelcast/HazelcastUtil.java      |  47 +++
 .../hazelcast/policy/HazelcastRoutePolicy.java  | 304 +++++++++++++++++++
 .../policy/HazelcastRoutePolicyMain.java        |  51 ++++
 .../src/test/resources/hazelcast-default.xml    |   8 +-
 .../src/test/resources/log4j2.properties        |   4 +
 6 files changed, 416 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/pom.xml 
b/components/camel-hazelcast/pom.xml
index 8dc742f..60dfa49 100644
--- a/components/camel-hazelcast/pom.xml
+++ b/components/camel-hazelcast/pom.xml
@@ -71,6 +71,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-jul</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-slf4j-impl</artifactId>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java
new file mode 100644
index 0000000..143a194
--- /dev/null
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+public final class HazelcastUtil {
+    private HazelcastUtil() {
+    }
+
+    public static HazelcastInstance newInstance() {
+        Config cfg = new XmlConfigBuilder().build();
+
+        // hazelcast.version.check.enabled is deprecated
+        cfg.setProperty(
+            "hazelcast.phone.home.enabled",
+            System.getProperty("hazelcast.phone.home.enabled", "false")
+        );
+        cfg.setProperty(
+            "hazelcast.logging.type",
+            System.getProperty("hazelcast.logging.type", "slf4j")
+        );
+
+        return newInstance(cfg);
+    }
+
+    public static HazelcastInstance newInstance(Config cfg) {
+        return Hazelcast.newHazelcastInstance(cfg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
new file mode 100644
index 0000000..55f91cc
--- /dev/null
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.policy;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.component.hazelcast.HazelcastUtil;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HazelcastRoutePolicy extends RoutePolicySupport implements 
NonManagedService {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HazelcastRoutePolicy.class);
+
+    private final Object lock;
+    private final boolean managedInstance;
+    private final AtomicBoolean leader;
+    private final Set<Route> suspendedRoutes;
+    private final ExecutorService executorService;
+
+    private long tryLockTimeout;
+    private TimeUnit tryLockTimeoutUnit;
+    private HazelcastInstance instance;
+    private String lockMapName;
+    private String lockKey;
+    private String lockValue;
+    private boolean shouldStopConsumer;
+    private IMap<String, String> locks;
+    private volatile Future<Void> future;
+
+    public HazelcastRoutePolicy() {
+        this(HazelcastUtil.newInstance(), true);
+    }
+
+    public HazelcastRoutePolicy(HazelcastInstance instance) {
+        this(instance, false);
+    }
+
+    public HazelcastRoutePolicy(HazelcastInstance instance, boolean 
managedInstance) {
+        this.instance = instance;
+        this.managedInstance = managedInstance;
+        this.suspendedRoutes =  new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.lock = new Object();
+        this.shouldStopConsumer = true;
+        this.lockMapName = null;
+        this.lockKey = null;
+        this.lockValue = null;
+        this.locks = null;
+        this.future = null;
+        this.tryLockTimeout = Long.MAX_VALUE;
+        this.tryLockTimeoutUnit = TimeUnit.MILLISECONDS;
+
+        this.executorService =  Executors.newSingleThreadExecutor(r -> {
+            Thread thread = new Thread(r, "Camel RoutePolicy");
+            thread.setDaemon(true);
+            return thread;
+        });
+    }
+
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange)  {
+        if (leader.get()) {
+            if (shouldStopConsumer) {
+                startConsumer(route);
+            }
+        } else {
+            if (shouldStopConsumer) {
+                stopConsumer(route);
+            }
+
+            exchange.setException(new IllegalStateException(
+                "Hazelcast based route policy prohibits processing exchanges, 
stopping route and failing the exchange")
+            );
+        }
+    }
+
+    @Override
+    public void onStop(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    @Override
+    public synchronized void onSuspend(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        locks = instance.getMap(lockMapName);
+        future = executorService.submit(this::acquireLeadership);
+
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(true);
+            future = null;
+        }
+
+        if (managedInstance) {
+            instance.shutdown();
+        }
+
+        super.doStop();
+    }
+    // 
*************************************************************************
+    //
+    // 
*************************************************************************
+
+    protected void setLeader(boolean isLeader) {
+        if (isLeader && leader.compareAndSet(false, isLeader)) {
+            LOGGER.info("Leadership taken (map={}, key={}, val={})",
+                lockMapName,
+                lockKey,
+                lockValue);
+
+            startAllStoppedConsumers();
+        } else {
+            if (!leader.getAndSet(isLeader) && isLeader) {
+                LOGGER.info("Leadership lost (map={}, key={} val={})",
+                    lockMapName,
+                    lockKey,
+                    lockValue);
+            }
+        }
+    }
+
+    private void startConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (suspendedRoutes.contains(route)) {
+                    startConsumer(route.getConsumer());
+                    suspendedRoutes.remove(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    private void stopConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (!suspendedRoutes.contains(route)) {
+                    LOGGER.debug("Stopping consumer for {} ({})", 
route.getId(), route.getConsumer());
+                    stopConsumer(route.getConsumer());
+                    suspendedRoutes.add(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    private void startAllStoppedConsumers() {
+        synchronized (lock) {
+            try {
+                for (Route route : suspendedRoutes) {
+                    LOGGER.debug("Starting consumer for {} ({})", 
route.getId(), route.getConsumer());
+                    startConsumer(route.getConsumer());
+                }
+
+                suspendedRoutes.clear();
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    // 
*************************************************************************
+    // Getter/Setters
+    // 
*************************************************************************
+
+    public String getLockMapName() {
+        return lockMapName;
+    }
+
+    public void setLockMapName(String lockMapName) {
+        this.lockMapName = lockMapName;
+    }
+
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    public String getLockKey() {
+        return lockKey;
+    }
+
+    public void setLockKey(String lockKey) {
+        this.lockKey = lockKey;
+    }
+
+    public String getLockValue() {
+        return lockValue;
+    }
+
+    public void setLockValue(String lockValue) {
+        this.lockValue = lockValue;
+    }
+
+    public long getTryLockTimeout() {
+        return tryLockTimeout;
+    }
+
+    public void setTryLockTimeout(long tryLockTimeout) {
+        this.tryLockTimeout = tryLockTimeout;
+    }
+
+    public void setTryLockTimeout(long tryLockTimeout, TimeUnit 
tryLockTimeoutUnit) {
+        this.tryLockTimeout = tryLockTimeout;
+        this.tryLockTimeoutUnit = tryLockTimeoutUnit;
+    }
+
+    public TimeUnit getTryLockTimeoutUnit() {
+        return tryLockTimeoutUnit;
+    }
+
+    public void setTryLockTimeoutUnit(TimeUnit tryLockTimeoutUnit) {
+        this.tryLockTimeoutUnit = tryLockTimeoutUnit;
+    }
+
+    // 
*************************************************************************
+    //
+    // 
*************************************************************************
+
+    private Void acquireLeadership() throws Exception {
+        boolean locked = false;
+        while (isRunAllowed()) {
+            try {
+                locked = locks.tryLock(lockKey, tryLockTimeout, 
tryLockTimeoutUnit);
+                if (locked) {
+                    locks.put(lockKey, lockValue);
+                    setLeader(true);
+
+                    // Wait almost forever
+                    Thread.sleep(Long.MAX_VALUE);
+                } else {
+                    LOGGER.debug("Failed to acquire lock (map={}, key={}, 
val={}) after {} {}",
+                        lockMapName,
+                        lockKey,
+                        lockValue,
+                        tryLockTimeout,
+                        tryLockTimeoutUnit.name()
+                    );
+                }
+            } catch (InterruptedException e) {
+                if (isRunAllowed()) {
+                    LOGGER.warn("Interrupted Exception caught", e);
+                } else {
+                    LOGGER.debug("Interrupted Exception caught", e);
+                }
+            } catch (Exception e) {
+                LOGGER.warn("Exception caught", e);
+            } finally {
+                if (locked) {
+                    locks.remove(lockKey);
+                    locks.unlock(lockKey);
+                    locked = false;
+                }
+
+                setLeader(false);
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java
 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java
new file mode 100644
index 0000000..8f50694
--- /dev/null
+++ 
b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.hazelcast.policy;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.main.Main;
+
+public final class HazelcastRoutePolicyMain {
+
+    private HazelcastRoutePolicyMain() {
+    }
+
+    public static void main(final String[] args) throws Exception {
+        Main main = new Main();
+        main.addRouteBuilder(new RouteBuilder() {
+            public void configure() {
+                HazelcastRoutePolicy policy = new HazelcastRoutePolicy();
+                policy.setLockMapName("camel:lock:map");
+                policy.setLockKey("route-policy");
+                policy.setLockValue(args[1]);
+                policy.setTryLockTimeout(5, TimeUnit.SECONDS);
+
+                from("file:///tmp/camel?delete=true")
+                    .routeId(args[1])
+                    .routePolicy(policy)
+                    .setHeader("HazelcastRouteID", constant(args[1]))
+                    .setHeader("HazelcastServiceName", constant(args[0]))
+                    
.to("log:org.apache.camel.component.hazelcast?level=INFO&showAll=true");
+            }
+        });
+
+        main.run();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
----------------------------------------------------------------------
diff --git 
a/components/camel-hazelcast/src/test/resources/hazelcast-default.xml 
b/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
index 12ef6ed..29aedbb 100644
--- a/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
+++ b/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
@@ -15,8 +15,9 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config 
hazelcast-basic.xsd"
-       xmlns="http://www.hazelcast.com/schema/config"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
+<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config 
hazelcast-config-3.6.xsd"
+                                        
xmlns="http://www.hazelcast.com/schema/config";
+                                        
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";>
 
        <group>
                <name>dev</name>
@@ -25,7 +26,8 @@
        
        <!-- Disable the version check -->
        <properties>
-            <property name="hazelcast.version.check.enabled">false</property>
+         <property name="hazelcast.phone.home.enabled">false</property>
+               <property name="hazelcast.logging.type">slf4j</property>
        </properties>
 
        <network>

http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/resources/log4j2.properties 
b/components/camel-hazelcast/src/test/resources/log4j2.properties
index 703ab93..1ff00cb 100644
--- a/components/camel-hazelcast/src/test/resources/log4j2.properties
+++ b/components/camel-hazelcast/src/test/resources/log4j2.properties
@@ -24,5 +24,9 @@ appender.out.type = Console
 appender.out.name = out
 appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
+
+logger.hz.name  = org.apache.camel.component.hazelcast
+logger.hz.level = debug
+
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file

Reply via email to