Repository: camel Updated Branches: refs/heads/master 917b2f27f -> ee2c7f4f5
Polish HazelcastRoutePolicy Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee2c7f4f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee2c7f4f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee2c7f4f Branch: refs/heads/master Commit: ee2c7f4f5c7e0add89fd7a0eb1a22f3e66454a8b Parents: 917b2f2 Author: lburgazzoli <[email protected]> Authored: Fri Sep 2 17:09:39 2016 +0200 Committer: lburgazzoli <[email protected]> Committed: Fri Sep 2 17:09:39 2016 +0200 ---------------------------------------------------------------------- .../hazelcast/policy/HazelcastRoutePolicy.java | 78 +++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ee2c7f4f/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java index 55f91cc..9dbb089 100644 --- a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java +++ b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicy.java @@ -37,21 +37,20 @@ import org.slf4j.LoggerFactory; public class HazelcastRoutePolicy extends RoutePolicySupport implements NonManagedService { private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicy.class); - private final Object lock; private final boolean managedInstance; private final AtomicBoolean leader; private final Set<Route> suspendedRoutes; private final ExecutorService executorService; - private long tryLockTimeout; - private TimeUnit tryLockTimeoutUnit; private HazelcastInstance instance; private String lockMapName; private String lockKey; private String lockValue; - private boolean shouldStopConsumer; + private long tryLockTimeout; + private TimeUnit tryLockTimeoutUnit; private IMap<String, String> locks; private volatile Future<Void> future; + private boolean shouldStopConsumer; public HazelcastRoutePolicy() { this(HazelcastUtil.newInstance(), true); @@ -66,15 +65,14 @@ public class HazelcastRoutePolicy extends RoutePolicySupport implements NonManag this.managedInstance = managedInstance; this.suspendedRoutes = new HashSet<>(); this.leader = new AtomicBoolean(false); - this.lock = new Object(); - this.shouldStopConsumer = true; this.lockMapName = null; this.lockKey = null; this.lockValue = null; - this.locks = null; - this.future = null; this.tryLockTimeout = Long.MAX_VALUE; this.tryLockTimeoutUnit = TimeUnit.MILLISECONDS; + this.locks = null; + this.future = null; + this.shouldStopConsumer = true; this.executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r, "Camel RoutePolicy"); @@ -101,17 +99,13 @@ public class HazelcastRoutePolicy extends RoutePolicySupport implements NonManag } @Override - public void onStop(Route route) { - synchronized (lock) { - suspendedRoutes.remove(route); - } + public synchronized void onStop(Route route) { + suspendedRoutes.remove(route); } @Override public synchronized void onSuspend(Route route) { - synchronized (lock) { - suspendedRoutes.remove(route); - } + suspendedRoutes.remove(route); } @Override @@ -157,45 +151,39 @@ public class HazelcastRoutePolicy extends RoutePolicySupport implements NonManag } } - private void startConsumer(Route route) { - synchronized (lock) { - try { - if (suspendedRoutes.contains(route)) { - startConsumer(route.getConsumer()); - suspendedRoutes.remove(route); - } - } catch (Exception e) { - handleException(e); + private synchronized void startConsumer(Route route) { + try { + if (suspendedRoutes.contains(route)) { + startConsumer(route.getConsumer()); + suspendedRoutes.remove(route); } + } catch (Exception e) { + handleException(e); } } - private void stopConsumer(Route route) { - synchronized (lock) { - try { - if (!suspendedRoutes.contains(route)) { - LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); - stopConsumer(route.getConsumer()); - suspendedRoutes.add(route); - } - } catch (Exception e) { - handleException(e); + private synchronized void stopConsumer(Route route) { + try { + if (!suspendedRoutes.contains(route)) { + LOGGER.debug("Stopping consumer for {} ({})", route.getId(), route.getConsumer()); + stopConsumer(route.getConsumer()); + suspendedRoutes.add(route); } + } catch (Exception e) { + handleException(e); } } - private void startAllStoppedConsumers() { - synchronized (lock) { - try { - for (Route route : suspendedRoutes) { - LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer()); - startConsumer(route.getConsumer()); - } - - suspendedRoutes.clear(); - } catch (Exception e) { - handleException(e); + private synchronized void startAllStoppedConsumers() { + try { + for (Route route : suspendedRoutes) { + LOGGER.debug("Starting consumer for {} ({})", route.getId(), route.getConsumer()); + startConsumer(route.getConsumer()); } + + suspendedRoutes.clear(); + } catch (Exception e) { + handleException(e); } }
