This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch sandbox/camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 672c81746520376ab350474af2b36aec958b6a0f Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Mon Oct 1 17:20:26 2018 +0200 Use a single field + lock to manage the services state --- .../apache/camel/component/seda/SedaEndpoint.java | 2 +- .../org/apache/camel/impl/DefaultCamelContext.java | 2 +- .../apache/camel/support/ChildServiceSupport.java | 164 ++++++------ .../org/apache/camel/support/ServiceSupport.java | 278 ++++++++++----------- .../apache/camel/support/ServiceSupportTest.java | 2 +- 5 files changed, 215 insertions(+), 233 deletions(-) diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java index bfa1c98..ebc84c8 100644 --- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java +++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java @@ -510,7 +510,7 @@ public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, Brow @Override public void shutdown() throws Exception { - if (shutdown.get()) { + if (isShutdown()) { log.trace("Service already shut down"); return; } diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index daf04a1..6c4106e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -3968,7 +3968,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } else { // and start the route service (no need to start children as they are already warmed up) try { - routeService.start(false); + routeService.start(); route.getProperties().remove("route.start.exception"); } catch (Exception e) { route.getProperties().put("route.start.exception", e); diff --git a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java 
b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java index 42cdbea..93b5778 100644 --- a/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/ChildServiceSupport.java @@ -16,120 +16,108 @@ */ package org.apache.camel.support; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.camel.Service; import org.apache.camel.util.ServiceHelper; /** * Base class to control lifecycle for a set of child {@link org.apache.camel.Service}s. */ public abstract class ChildServiceSupport extends ServiceSupport { - private Set<Object> childServices; - - public void start() throws Exception { - start(true); - } - public void start(boolean startChildren) throws Exception { - if (!started.get()) { - if (starting.compareAndSet(false, true)) { - boolean childrenStarted = false; - Exception ex = null; - try { - if (childServices != null && startChildren) { - ServiceHelper.startService(childServices); - } - childrenStarted = true; - doStart(); - } catch (Exception e) { - ex = e; - } finally { - if (ex != null) { - try { - stop(childrenStarted); - } catch (Exception e) { - // Ignore exceptions as we want to show the original exception - } - throw ex; - } else { - started.set(true); - starting.set(false); - stopping.set(false); - stopped.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); - } - } + protected volatile List<Service> childServices; + + public void start() throws Exception { + synchronized (lock) { + if (status == STARTED) { + log.trace("Service already started"); + return; } - } - } - - private void stop(boolean childrenStarted) throws Exception { - if (stopping.compareAndSet(false, true)) { + if (status == STARTING) { + log.trace("Service already starting"); + return; + } + status = STARTING; + log.trace("Starting service"); 
try { - try { - starting.set(false); - suspending.set(false); - if (childrenStarted) { - doStop(); - } - } finally { - started.set(false); - suspended.set(false); - if (childServices != null) { - ServiceHelper.stopService(childServices); - } - } - } finally { - stopped.set(true); - stopping.set(false); - starting.set(false); - started.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); + ServiceHelper.startService(childServices); + doStart(); + status = STARTED; + log.trace("Service started"); + } catch (Exception e) { + status = FAILED; + log.trace("Error while starting service", e); + ServiceHelper.stopService(childServices); + throw e; } } } public void stop() throws Exception { - if (!stopped.get()) { - stop(true); + synchronized (lock) { + if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) { + log.trace("Service already stopped"); + return; + } + if (status == STOPPING) { + log.trace("Service already stopping"); + return; + } + status = STOPPING; + log.trace("Stopping service"); + try { + doStop(); + ServiceHelper.stopService(childServices); + status = STOPPED; + log.trace("Service stopped service"); + } catch (Exception e) { + status = FAILED; + log.trace("Error while stopping service", e); + throw e; + } } } - - public void shutdown() throws Exception { - // ensure we are stopped first - stop(); - if (shuttingdown.compareAndSet(false, true)) { + @Override + public void shutdown() throws Exception { + synchronized (lock) { + if (status == SHUTDOWN) { + log.trace("Service already shut down"); + return; + } + if (status == SHUTTINGDOWN) { + log.trace("Service already shutting down"); + return; + } + stop(); + status = SHUTDOWN; + log.trace("Shutting down service"); try { - try { - doShutdown(); - } finally { - if (childServices != null) { - ServiceHelper.stopAndShutdownServices(childServices); - } - } - } finally { - // shutdown is also stopped so only set shutdown flags - 
shutdown.set(true); - shuttingdown.set(false); + doShutdown(); + ServiceHelper.stopAndShutdownServices(childServices); + log.trace("Service shut down"); + status = SHUTDOWN; + } catch (Exception e) { + status = FAILED; + log.trace("Error shutting down service", e); + throw e; } } } - + protected void addChildService(Object childService) { - synchronized (this) { + if (childService instanceof Service) { if (childServices == null) { - childServices = new LinkedHashSet<>(); + synchronized (lock) { + if (childServices == null) { + childServices = new CopyOnWriteArrayList<>(); + } + } } + childServices.add((Service) childService); } - childServices.add(childService); } protected boolean removeChildService(Object childService) { diff --git a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java index c1ee206..cffa616 100644 --- a/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/ServiceSupport.java @@ -16,13 +16,13 @@ */ package org.apache.camel.support; -import java.io.InputStream; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.camel.Service; import org.apache.camel.ServiceStatus; import org.apache.camel.StatefulService; -import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,16 +42,20 @@ import org.slf4j.LoggerFactory; */ public abstract class ServiceSupport implements StatefulService { - protected final AtomicBoolean started = new AtomicBoolean(false); - protected final AtomicBoolean starting = new AtomicBoolean(false); - protected final AtomicBoolean stopping = new AtomicBoolean(false); - protected final AtomicBoolean stopped = new AtomicBoolean(false); - protected final AtomicBoolean 
suspending = new AtomicBoolean(false); - protected final AtomicBoolean suspended = new AtomicBoolean(false); - protected final AtomicBoolean shuttingdown = new AtomicBoolean(false); - protected final AtomicBoolean shutdown = new AtomicBoolean(false); + protected static final int NEW = 0; + protected static final int STARTING = 1; + protected static final int STARTED = 2; + protected static final int SUSPENDING = 3; + protected static final int SUSPENDED = 4; + protected static final int STOPPING = 5; + protected static final int STOPPED = 6; + protected static final int SHUTTINGDOWN = 7; + protected static final int SHUTDOWN = 8; + protected static final int FAILED = 9; protected final Logger log = LoggerFactory.getLogger(getClass()); + protected final Object lock = new Object(); + protected volatile int status = NEW; /** * <b>Important: </b> You should override the lifecycle methods that start with <tt>do</tt>, eg {@link #doStart()}, @@ -60,41 +64,26 @@ public abstract class ServiceSupport implements StatefulService { * invoke the operation in a safe manner. 
*/ public void start() throws Exception { - if (isStarting() || isStarted()) { - // only start service if not already started - log.trace("Service already started"); - return; - } - if (starting.compareAndSet(false, true)) { + synchronized (lock) { + if (status == STARTED) { + log.trace("Service already started"); + return; + } + if (status == STARTING) { + log.trace("Service already starting"); + return; + } + status = STARTING; log.trace("Starting service"); try { doStart(); - started.set(true); - starting.set(false); - stopping.set(false); - stopped.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); + status = STARTED; + log.trace("Service started"); } catch (Exception e) { - try { - stop(); - } catch (Exception e2) { - // Ignore exceptions as we want to show the original exception - } finally { - // ensure flags get reset to stopped as we failed during starting - stopping.set(false); - stopped.set(true); - starting.set(false); - started.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); - } + status = FAILED; + log.trace("Error while starting service", e); throw e; - } + } } } @@ -105,26 +94,26 @@ public abstract class ServiceSupport implements StatefulService { * invoke the operation in a safe manner. 
*/ public void stop() throws Exception { - if (isStopped()) { - log.trace("Service already stopped"); - return; - } - if (isStopping()) { - log.trace("Service already stopping"); - return; - } - stopping.set(true); - try { - doStop(); - } finally { - stopping.set(false); - stopped.set(true); - starting.set(false); - started.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); + synchronized (lock) { + if (status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN) { + log.trace("Service already stopped"); + return; + } + if (status == STOPPING) { + log.trace("Service already stopping"); + return; + } + status = STOPPING; + log.trace("Stopping service"); + try { + doStop(); + status = STOPPED; + log.trace("Service stopped service"); + } catch (Exception e) { + status = FAILED; + log.trace("Error while stopping service", e); + throw e; + } } } @@ -136,22 +125,25 @@ public abstract class ServiceSupport implements StatefulService { */ @Override public void suspend() throws Exception { - if (!suspended.get()) { - if (suspending.compareAndSet(false, true)) { - try { - starting.set(false); - stopping.set(false); - doSuspend(); - } finally { - stopped.set(false); - stopping.set(false); - starting.set(false); - started.set(false); - suspending.set(false); - suspended.set(true); - shutdown.set(false); - shuttingdown.set(false); - } + synchronized (lock) { + if (status == SUSPENDED) { + log.trace("Service already suspended"); + return; + } + if (status == SUSPENDING) { + log.trace("Service already suspending"); + return; + } + status = SUSPENDING; + log.trace("Suspending service"); + try { + doSuspend(); + status = SUSPENDED; + log.trace("Service suspended"); + } catch (Exception e) { + status = FAILED; + log.trace("Error while suspending service", e); + throw e; } } } @@ -164,20 +156,21 @@ public abstract class ServiceSupport implements StatefulService { */ @Override public void resume() throws Exception { - if 
(suspended.get()) { - if (starting.compareAndSet(false, true)) { - try { - doResume(); - } finally { - started.set(true); - starting.set(false); - stopping.set(false); - stopped.set(false); - suspending.set(false); - suspended.set(false); - shutdown.set(false); - shuttingdown.set(false); - } + synchronized (lock) { + if (status != SUSPENDED) { + log.trace("Service is not suspended"); + return; + } + status = STARTING; + log.trace("Resuming service"); + try { + doResume(); + status = STARTED; + log.trace("Service resumed"); + } catch (Exception e) { + status = FAILED; + log.trace("Error while resuming service", e); + throw e; } } } @@ -190,105 +183,106 @@ public abstract class ServiceSupport implements StatefulService { */ @Override public void shutdown() throws Exception { - if (shutdown.get()) { - log.trace("Service already shut down"); - return; - } - // ensure we are stopped first - stop(); - - if (shuttingdown.compareAndSet(false, true)) { + synchronized (lock) { + if (status == SHUTDOWN) { + log.trace("Service already shut down"); + return; + } + if (status == SHUTTINGDOWN) { + log.trace("Service already shutting down"); + return; + } + stop(); + status = SHUTDOWN; + log.trace("Shutting down service"); try { doShutdown(); - } finally { - // shutdown is also stopped so only set shutdown flags - shutdown.set(true); - shuttingdown.set(false); + log.trace("Service shut down"); + status = SHUTDOWN; + } catch (Exception e) { + status = FAILED; + log.trace("Error shutting down service", e); + throw e; } } } @Override public ServiceStatus getStatus() { - // we should check the ---ing states first, as this indicate the state is in the middle of doing that - if (isStarting()) { - return ServiceStatus.Starting; - } - if (isStopping()) { - return ServiceStatus.Stopping; - } - if (isSuspending()) { - return ServiceStatus.Suspending; - } - - // then check for the regular states - if (isStarted()) { - return ServiceStatus.Started; - } - if (isStopped()) { - return 
ServiceStatus.Stopped; + switch (status) { + case STARTING: + return ServiceStatus.Starting; + case STARTED: + return ServiceStatus.Started; + case SUSPENDING: + return ServiceStatus.Suspending; + case SUSPENDED: + return ServiceStatus.Suspended; + case STOPPING: + return ServiceStatus.Stopping; + default: + return ServiceStatus.Stopped; } - if (isSuspended()) { - return ServiceStatus.Suspended; - } - - // use stopped as fallback - return ServiceStatus.Stopped; } @Override public boolean isStarted() { - return started.get(); + return status == STARTED; } @Override public boolean isStarting() { - return starting.get(); + return status == STARTING; } @Override public boolean isStopping() { - return stopping.get(); + return status == STOPPING; } @Override public boolean isStopped() { - return stopped.get(); + return status == STOPPED || status == SHUTTINGDOWN || status == SHUTDOWN || status == FAILED; } @Override public boolean isSuspending() { - return suspending.get(); + return status == SUSPENDING; } @Override public boolean isSuspended() { - return suspended.get(); + return status == SUSPENDED; } @Override public boolean isRunAllowed() { - // if we have not yet initialized, then all options is false - boolean unused1 = !started.get() && !starting.get() && !stopping.get() && !stopped.get(); - boolean unused2 = !suspending.get() && !suspended.get() && !shutdown.get() && !shuttingdown.get(); - if (unused1 && unused2) { - return false; - } - return !isStoppingOrStopped(); + return isStartingOrStarted() || isSuspendingOrSuspended(); + } + + public boolean isShutdown() { + return status == SHUTDOWN; } /** * Is the service in progress of being stopped or already stopped */ public boolean isStoppingOrStopped() { - return stopping.get() || stopped.get(); + return isStopping() || isStopped(); } /** * Is the service in progress of being suspended or already suspended */ public boolean isSuspendingOrSuspended() { - return suspending.get() || suspended.get(); + return 
isSuspending() || isSuspended(); + } + + /** + * Is the service in progress of being started or already started + */ + public boolean isStartingOrStarted() { + return isStarting() || isStarted(); } /** diff --git a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java index e2c848a..5b2b232 100644 --- a/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java +++ b/camel-core/src/test/java/org/apache/camel/support/ServiceSupportTest.java @@ -135,7 +135,7 @@ public class ServiceSupportTest extends TestSupport { public ServiceSupportTestExOnStart() { // just for testing force it to not be stopped - stopped.set(false); + status = -1; } @Override