Repository: camel Updated Branches: refs/heads/master 2ee7b6f99 -> 917b2f27f
CAMEL-6399: hazelcast - route policy for having one route being master, and others as slaves Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/917b2f27 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/917b2f27 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/917b2f27 Branch: refs/heads/master Commit: 917b2f27fe1f2f41305de4438616a49ff3bdba03 Parents: 2ee7b6f Author: lburgazzoli <lburgazz...@gmail.com> Authored: Fri Sep 2 15:37:38 2016 +0200 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Fri Sep 2 15:38:16 2016 +0200 ---------------------------------------------------------------------- components/camel-hazelcast/pom.xml | 5 + .../component/hazelcast/HazelcastUtil.java | 47 +++ .../hazelcast/policy/HazelcastRoutePolicy.java | 304 +++++++++++++++++++ .../policy/HazelcastRoutePolicyMain.java | 51 ++++ .../src/test/resources/hazelcast-default.xml | 8 +- .../src/test/resources/log4j2.properties | 4 + 6 files changed, 416 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/pom.xml b/components/camel-hazelcast/pom.xml index 8dc742f..60dfa49 100644 --- a/components/camel-hazelcast/pom.xml +++ b/components/camel-hazelcast/pom.xml @@ -71,6 +71,11 @@ </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-jul</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java 
---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java new file mode 100644 index 0000000..143a194 --- /dev/null +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/HazelcastUtil.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.component.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.XmlConfigBuilder;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+
+/**
+ * Static factory helpers for creating {@link HazelcastInstance}s.
+ */
+public final class HazelcastUtil {
+    // Utility class: not meant to be instantiated.
+    private HazelcastUtil() {
+    }
+
+    /**
+     * Creates a Hazelcast instance from the configuration produced by a default
+     * {@link XmlConfigBuilder}, after setting two properties on it. Each property
+     * takes the value of the JVM system property of the same name when present,
+     * otherwise the default shown below.
+     *
+     * @return the newly created Hazelcast instance
+     */
+    public static HazelcastInstance newInstance() {
+        final Config config = new XmlConfigBuilder().build();
+
+        // hazelcast.version.check.enabled is deprecated, so
+        // hazelcast.phone.home.enabled is set instead.
+        applySystemPropertyOrDefault(config, "hazelcast.phone.home.enabled", "false");
+        applySystemPropertyOrDefault(config, "hazelcast.logging.type", "slf4j");
+
+        return newInstance(config);
+    }
+
+    /**
+     * Creates a Hazelcast instance from the given configuration.
+     *
+     * @param cfg the configuration handed to {@link Hazelcast#newHazelcastInstance(Config)}
+     * @return the newly created Hazelcast instance
+     */
+    public static HazelcastInstance newInstance(Config cfg) {
+        return Hazelcast.newHazelcastInstance(cfg);
+    }
+
+    /**
+     * Sets {@code name} on {@code config} to the system property of the same
+     * name, or to {@code fallback} when that system property is not defined.
+     */
+    private static void applySystemPropertyOrDefault(Config config, String name, String fallback) {
+        config.setProperty(name, System.getProperty(name, fallback));
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
new file mode 100644
index 0000000..55f91cc
--- /dev/null
+++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast.policy;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import org.apache.camel.Exchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.Route;
+import org.apache.camel.component.hazelcast.HazelcastUtil;
+import org.apache.camel.support.RoutePolicySupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Route policy that uses a lock on a Hazelcast map entry to decide whether
+ * the routes it is attached to may process exchanges.
+ * <p>
+ * When the policy starts, a background task tries to lock {@code lockKey} in
+ * the map named {@code lockMapName}. While that task holds the lock this
+ * policy is the leader. Exchanges that begin while the policy is not the
+ * leader are failed with an {@link IllegalStateException} and, when
+ * {@code shouldStopConsumer} is enabled, the consumer of the route is stopped
+ * until leadership is taken.
+ * <p>
+ * {@code lockMapName}, {@code lockKey} and {@code lockValue} must be set
+ * before the policy is started.
+ */
+public class HazelcastRoutePolicy extends RoutePolicySupport implements NonManagedService {
+    private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicy.class);
+
+    // Guards suspendedRoutes.
+    private final Object lock;
+    // True when this policy created the Hazelcast instance and must shut it down on stop.
+    private final boolean managedInstance;
+    private final AtomicBoolean leader;
+    // Routes whose consumer was stopped by this policy and not yet restarted.
+    private final Set<Route> suspendedRoutes;
+    // Single daemon thread that runs the leadership election task.
+    private final ExecutorService executorService;
+
+    private long tryLockTimeout;
+    private TimeUnit tryLockTimeoutUnit;
+    private HazelcastInstance instance;
+    private String lockMapName;
+    private String lockKey;
+    private String lockValue;
+    private boolean shouldStopConsumer;
+    private IMap<String, String> locks;
+    private volatile Future<Void> future;
+
+    /**
+     * Creates a policy backed by a new Hazelcast instance obtained from
+     * {@link HazelcastUtil#newInstance()}; that instance is shut down when the
+     * policy is stopped.
+     */
+    public HazelcastRoutePolicy() {
+        this(HazelcastUtil.newInstance(), true);
+    }
+
+    /**
+     * Creates a policy backed by the given Hazelcast instance, which is left
+     * running when the policy is stopped.
+     *
+     * @param instance the Hazelcast instance providing the lock map
+     */
+    public HazelcastRoutePolicy(HazelcastInstance instance) {
+        this(instance, false);
+    }
+
+    /**
+     * Creates a policy backed by the given Hazelcast instance.
+     *
+     * @param instance the Hazelcast instance providing the lock map
+     * @param managedInstance whether the instance is shut down when the policy stops
+     */
+    public HazelcastRoutePolicy(HazelcastInstance instance, boolean managedInstance) {
+        this.instance = instance;
+        this.managedInstance = managedInstance;
+        this.suspendedRoutes = new HashSet<>();
+        this.leader = new AtomicBoolean(false);
+        this.lock = new Object();
+        this.shouldStopConsumer = true;
+        this.lockMapName = null;
+        this.lockKey = null;
+        this.lockValue = null;
+        this.locks = null;
+        this.future = null;
+        // By default wait (practically) forever for the lock.
+        this.tryLockTimeout = Long.MAX_VALUE;
+        this.tryLockTimeoutUnit = TimeUnit.MILLISECONDS;
+
+        this.executorService = Executors.newSingleThreadExecutor(r -> {
+            Thread thread = new Thread(r, "Camel RoutePolicy");
+            thread.setDaemon(true);
+            return thread;
+        });
+    }
+
+    /**
+     * Lets the exchange through when this policy is the leader; otherwise fails
+     * the exchange. When {@code shouldStopConsumer} is enabled the route's
+     * consumer is started (leader) or stopped (not leader) accordingly.
+     */
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange) {
+        if (leader.get()) {
+            if (shouldStopConsumer) {
+                startConsumer(route);
+            }
+        } else {
+            if (shouldStopConsumer) {
+                stopConsumer(route);
+            }
+
+            exchange.setException(new IllegalStateException(
+                "Hazelcast based route policy prohibits processing exchanges, stopping route and failing the exchange")
+            );
+        }
+    }
+
+    /**
+     * Forgets the route so that this policy does not restart its consumer later.
+     */
+    @Override
+    public void onStop(Route route) {
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Forgets the route so that this policy does not restart its consumer later.
+     */
+    @Override
+    public void onSuspend(Route route) {
+        // suspendedRoutes is guarded by 'lock' everywhere else; an additional
+        // method-level monitor would only add a second, unrelated lock.
+        synchronized (lock) {
+            suspendedRoutes.remove(route);
+        }
+    }
+
+    /**
+     * Looks up the lock map and submits the leadership election task.
+     *
+     * @throws IllegalStateException if the lock map name, key or value is not set
+     */
+    @Override
+    protected void doStart() throws Exception {
+        // Fail fast: with a missing setting every election attempt would throw,
+        // be caught by the election loop and be retried immediately, forever.
+        requireConfigured("lockMapName", lockMapName);
+        requireConfigured("lockKey", lockKey);
+        requireConfigured("lockValue", lockValue);
+
+        locks = instance.getMap(lockMapName);
+        future = executorService.submit(this::acquireLeadership);
+
+        super.doStart();
+    }
+
+    /**
+     * Cancels the election task (interrupting it) and shuts down the Hazelcast
+     * instance when it is managed by this policy.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        if (future != null) {
+            future.cancel(true);
+            future = null;
+        }
+
+        if (managedInstance) {
+            instance.shutdown();
+        }
+
+        super.doStop();
+    }
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    /**
+     * Records a leadership change. Gaining leadership restarts every consumer
+     * this policy had stopped; both transitions are logged once.
+     *
+     * @param isLeader the new leadership state
+     */
+    protected void setLeader(boolean isLeader) {
+        if (isLeader) {
+            if (leader.compareAndSet(false, true)) {
+                LOGGER.info("Leadership taken (map={}, key={}, val={})",
+                    lockMapName,
+                    lockKey,
+                    lockValue);
+
+                startAllStoppedConsumers();
+            }
+        } else if (leader.getAndSet(false)) {
+            // Only a true -> false transition is a loss of leadership.
+            LOGGER.info("Leadership lost (map={}, key={} val={})",
+                lockMapName,
+                lockKey,
+                lockValue);
+        }
+    }
+
+    /**
+     * Restarts the consumer of the route if this policy had stopped it.
+     */
+    private void startConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (suspendedRoutes.contains(route)) {
+                    startConsumer(route.getConsumer());
+                    suspendedRoutes.remove(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Stops the consumer of the route unless this policy has already stopped it.
+     */
+    private void stopConsumer(Route route) {
+        synchronized (lock) {
+            try {
+                if (!suspendedRoutes.contains(route)) {
+                    LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer());
+                    stopConsumer(route.getConsumer());
+                    suspendedRoutes.add(route);
+                }
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Restarts the consumers of all routes this policy had stopped.
+     */
+    private void startAllStoppedConsumers() {
+        synchronized (lock) {
+            try {
+                for (Route route : suspendedRoutes) {
+                    LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer());
+                    startConsumer(route.getConsumer());
+                }
+
+                suspendedRoutes.clear();
+            } catch (Exception e) {
+                handleException(e);
+            }
+        }
+    }
+
+    /**
+     * Throws when a mandatory setting has not been provided.
+     */
+    private static void requireConfigured(String name, String value) {
+        if (value == null) {
+            throw new IllegalStateException(
+                "HazelcastRoutePolicy requires '" + name + "' to be set before it is started");
+        }
+    }
+
+    // *************************************************************************
+    // Getter/Setters
+    // *************************************************************************
+
+    /** @return the name of the Hazelcast map holding the lock entry */
+    public String getLockMapName() {
+        return lockMapName;
+    }
+
+    /** @param lockMapName the name of the Hazelcast map holding the lock entry */
+    public void setLockMapName(String lockMapName) {
+        this.lockMapName = lockMapName;
+    }
+
+    /** @return whether route consumers are stopped while this policy is not the leader */
+    public boolean isShouldStopConsumer() {
+        return shouldStopConsumer;
+    }
+
+    /** @param shouldStopConsumer whether route consumers are stopped while this policy is not the leader */
+    public void setShouldStopConsumer(boolean shouldStopConsumer) {
+        this.shouldStopConsumer = shouldStopConsumer;
+    }
+
+    /** @return the map key that is locked to claim leadership */
+    public String getLockKey() {
+        return lockKey;
+    }
+
+    /** @param lockKey the map key that is locked to claim leadership */
+    public void setLockKey(String lockKey) {
+        this.lockKey = lockKey;
+    }
+
+    /** @return the value stored under the lock key while this policy is the leader */
+    public String getLockValue() {
+        return lockValue;
+    }
+
+    /** @param lockValue the value stored under the lock key while this policy is the leader */
+    public void setLockValue(String lockValue) {
+        this.lockValue = lockValue;
+    }
+
+    /** @return how long a single lock attempt waits, in {@link #getTryLockTimeoutUnit()} units */
+    public long getTryLockTimeout() {
+        return tryLockTimeout;
+    }
+
+    /** @param tryLockTimeout how long a single lock attempt waits, in the current time unit */
+    public void setTryLockTimeout(long tryLockTimeout) {
+        this.tryLockTimeout = tryLockTimeout;
+    }
+
+    /**
+     * Sets the lock attempt timeout together with its unit.
+     *
+     * @param tryLockTimeout how long a single lock attempt waits
+     * @param tryLockTimeoutUnit the unit of {@code tryLockTimeout}
+     */
+    public void setTryLockTimeout(long tryLockTimeout, TimeUnit tryLockTimeoutUnit) {
+        this.tryLockTimeout = tryLockTimeout;
+        this.tryLockTimeoutUnit = tryLockTimeoutUnit;
+    }
+
+    /** @return the unit of the lock attempt timeout */
+    public TimeUnit getTryLockTimeoutUnit() {
+        return tryLockTimeoutUnit;
+    }
+
+    /** @param tryLockTimeoutUnit the unit of the lock attempt timeout */
+    public void setTryLockTimeoutUnit(TimeUnit tryLockTimeoutUnit) {
+        this.tryLockTimeoutUnit = tryLockTimeoutUnit;
+    }
+
+    // *************************************************************************
+    //
+    // *************************************************************************
+
+    /**
+     * Election loop run on the policy's executor thread. While the policy is
+     * allowed to run it tries to take the map lock; on success it publishes
+     * {@code lockValue} under {@code lockKey}, becomes leader and sleeps until
+     * interrupted. After every attempt the lock is released (if held) and
+     * leadership is dropped.
+     *
+     * @return always {@code null}
+     */
+    private Void acquireLeadership() throws Exception {
+        boolean locked = false;
+        while (isRunAllowed()) {
+            try {
+                locked = locks.tryLock(lockKey, tryLockTimeout, tryLockTimeoutUnit);
+                if (locked) {
+                    locks.put(lockKey, lockValue);
+                    setLeader(true);
+
+                    // Wait almost forever
+                    Thread.sleep(Long.MAX_VALUE);
+                } else {
+                    LOGGER.debug("Failed to acquire lock (map={}, key={}, val={}) after {} {}",
+                        lockMapName,
+                        lockKey,
+                        lockValue,
+                        tryLockTimeout,
+                        tryLockTimeoutUnit.name()
+                    );
+                }
+            } catch (InterruptedException e) {
+                // doStop() cancels the task with interruption. The interrupt flag
+                // is deliberately left cleared: the loop condition decides whether
+                // to retry, and re-asserting the flag would make a retry fail at once.
+                if (isRunAllowed()) {
+                    LOGGER.warn("Interrupted Exception caught", e);
+                } else {
+                    LOGGER.debug("Interrupted Exception caught", e);
+                }
+            } catch (Exception e) {
+                LOGGER.warn("Exception caught", e);
+            } finally {
+                try {
+                    if (locked) {
+                        locked = false;
+                        releaseLock();
+                    }
+                } finally {
+                    // Leadership is dropped even when releasing the lock fails.
+                    setLeader(false);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Removes the published lock value and unlocks the key; the unlock is
+     * attempted even when removing the value fails.
+     */
+    private void releaseLock() {
+        try {
+            locks.remove(lockKey);
+        } finally {
+            locks.unlock(lockKey);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java
----------------------------------------------------------------------
diff --git
a/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java new file mode 100644 index 0000000..8f50694 --- /dev/null +++ b/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyMain.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.camel.component.hazelcast.policy;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.main.Main;
+
+/**
+ * Manual test driver that runs a single file-consuming route guarded by a
+ * {@link HazelcastRoutePolicy}.
+ * <p>
+ * Usage: {@code HazelcastRoutePolicyMain <serviceName> <routeId>}. The route
+ * id is also used as the lock value of the policy.
+ */
+public final class HazelcastRoutePolicyMain {
+
+    // Entry-point holder: not meant to be instantiated.
+    private HazelcastRoutePolicyMain() {
+    }
+
+    /**
+     * Builds the route and runs Camel until the process is stopped.
+     *
+     * @param args {@code args[0]} is the service name, {@code args[1]} the route id
+     * @throws IllegalArgumentException if fewer than two arguments are given
+     */
+    public static void main(final String[] args) throws Exception {
+        // Validate up front; otherwise a missing argument only surfaces as an
+        // ArrayIndexOutOfBoundsException while the route is being configured.
+        if (args == null || args.length < 2) {
+            throw new IllegalArgumentException(
+                "Usage: HazelcastRoutePolicyMain <serviceName> <routeId>");
+        }
+
+        final String serviceName = args[0];
+        final String routeId = args[1];
+
+        Main main = new Main();
+        main.addRouteBuilder(new RouteBuilder() {
+            @Override
+            public void configure() {
+                HazelcastRoutePolicy policy = new HazelcastRoutePolicy();
+                policy.setLockMapName("camel:lock:map");
+                policy.setLockKey("route-policy");
+                policy.setLockValue(routeId);
+                policy.setTryLockTimeout(5, TimeUnit.SECONDS);
+
+                from("file:///tmp/camel?delete=true")
+                    .routeId(routeId)
+                    .routePolicy(policy)
+                    .setHeader("HazelcastRouteID", constant(routeId))
+                    .setHeader("HazelcastServiceName", constant(serviceName))
+                    .to("log:org.apache.camel.component.hazelcast?level=INFO&showAll=true");
+            }
+        });
+
+        main.run();
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
----------------------------------------------------------------------
diff --git a/components/camel-hazelcast/src/test/resources/hazelcast-default.xml b/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
index 12ef6ed..29aedbb 100644
--- a/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
+++ b/components/camel-hazelcast/src/test/resources/hazelcast-default.xml
@@ -15,8 +15,9 @@
 See the License for the specific language governing permissions and
 limitations under the License.
--> -<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-basic.xsd" - xmlns="http://www.hazelcast.com/schema/config" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> +<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.6.xsd" + xmlns="http://www.hazelcast.com/schema/config" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> <group> <name>dev</name> @@ -25,7 +26,8 @@ <!-- Disable the version check --> <properties> - <property name="hazelcast.version.check.enabled">false</property> + <property name="hazelcast.phone.home.enabled">false</property> + <property name="hazelcast.logging.type">slf4j</property> </properties> <network> http://git-wip-us.apache.org/repos/asf/camel/blob/917b2f27/components/camel-hazelcast/src/test/resources/log4j2.properties ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/test/resources/log4j2.properties b/components/camel-hazelcast/src/test/resources/log4j2.properties index 703ab93..1ff00cb 100644 --- a/components/camel-hazelcast/src/test/resources/log4j2.properties +++ b/components/camel-hazelcast/src/test/resources/log4j2.properties @@ -24,5 +24,9 @@ appender.out.type = Console appender.out.name = out appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n + +logger.hz.name = org.apache.camel.component.hazelcast +logger.hz.level = debug + rootLogger.level = INFO rootLogger.appenderRef.file.ref = file