This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
The following commit(s) were added to refs/heads/master by this push: new 63eb3ef CAMEL-15056: camel-main: provide a way to bring a custom engine to control shutdown logic 63eb3ef is described below commit 63eb3ef22570287f1d883d26da5b3d3f57825738 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Thu May 14 14:33:50 2020 +0200 CAMEL-15056: camel-main: provide a way to bring a custom engine to control shutdown logic --- .../camel/spring/boot/CamelMainRunController.java | 9 +-- .../boot/CamelSpringBootApplicationController.java | 89 +++++++++------------- .../boot/CamelSpringBootApplicationListener.java | 45 +++++------ 3 files changed, 64 insertions(+), 79 deletions(-) diff --git a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelMainRunController.java b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelMainRunController.java index 1eaeba6..bbae692 100644 --- a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelMainRunController.java +++ b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelMainRunController.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; +import org.apache.camel.main.MainShutdownStrategy; import org.springframework.context.ApplicationContext; /** @@ -39,18 +40,14 @@ public class CamelMainRunController { daemon.start(); } - public CountDownLatch getLatch() { - return controller.getLatch(); + public MainShutdownStrategy getMainShutdownStrategy() { + return controller.getMainShutdownStrategy(); } public Runnable getMainCompleteTask() { return controller.getMainCompletedTask(); } - public AtomicBoolean getCompleted() { - return controller.getCompleted(); - } - private final class DaemonTask implements Runnable { @Override public void run() { diff --git a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationController.java b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationController.java index 0dc546a..060006d 100644 --- a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationController.java +++ b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationController.java @@ -16,79 +16,34 @@ */ package org.apache.camel.spring.boot; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; - import javax.annotation.PreDestroy; import org.apache.camel.CamelContext; import org.apache.camel.ProducerTemplate; import org.apache.camel.main.Main; +import org.apache.camel.main.MainShutdownStrategy; +import org.apache.camel.main.SimpleMainShutdownStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; public class CamelSpringBootApplicationController { - private static final Logger LOG = LoggerFactory.getLogger(CamelSpringBootApplicationController.class); private final Main main; - private final CountDownLatch latch = new CountDownLatch(1); - private final AtomicBoolean completed = new AtomicBoolean(); public CamelSpringBootApplicationController(final ApplicationContext applicationContext, final CamelContext context) { - this.main = new Main() { - - { - this.camelContext = context; - // disable shutdown hook as spring-boot has its own hook we use - this.disableHangupSupport(); - } - - @Override - protected ProducerTemplate findOrCreateCamelTemplate() { - return applicationContext.getBean(ProducerTemplate.class); - } - - @Override - protected CamelContext createCamelContext() { - return context; - } - - @Override - protected void postProcessCamelContext(CamelContext camelContext) throws Exception { - // spring boot has configured camel context and no post processing is needed - } - - @Override - protected void doStop() throws Exception { - LOG.debug("Controller is shutting down CamelContext"); - try { - super.doStop(); - } finally { - completed.set(true); - // should use the latch on this instance - CamelSpringBootApplicationController.this.latch.countDown(); - } - } - }; - // turn off route collector on main as camel-spring-boot has already discovered the routes - // and here we just use the main as implementation detail (to keep the jvm running) - this.main.configure().setRoutesCollectorEnabled(false); + this.main = new CamelSpringMain(applicationContext, context); } - public CountDownLatch getLatch() { - return this.latch; + public MainShutdownStrategy getMainShutdownStrategy() { + return this.main.getShutdownStrategy(); } public Runnable getMainCompletedTask() { return main.getCompleteTask(); } - public AtomicBoolean getCompleted() { - return completed; - } - /** * Runs the application and blocks the main thread and shutdown Camel graceful when the JVM is stopping. */ @@ -98,7 +53,7 @@ public class CamelSpringBootApplicationController { main.run(); // keep the daemon thread running LOG.debug("Waiting for CamelContext to complete shutdown"); - latch.await(); + this.main.getShutdownStrategy().await(); } catch (Exception e) { throw new RuntimeException(e); } @@ -118,4 +73,36 @@ public class CamelSpringBootApplicationController { main.completed(); } + private static class CamelSpringMain extends Main { + final ApplicationContext applicationContext; + + public CamelSpringMain(ApplicationContext applicationContext, CamelContext camelContext) { + this.applicationContext = applicationContext; + this.camelContext = camelContext; + + // use a simple shutdown strategy that does not install any shutdown hook as spring-boot + // as spring-boot has its own hook we use + this.shutdownStrategy = new SimpleMainShutdownStrategy(); + + // turn off route collector on main as camel-spring-boot has already discovered the routes + // and here we just use the main as implementation detail (to keep the jvm running) + this.mainConfigurationProperties.setRoutesCollectorEnabled(false); + } + + @Override + protected ProducerTemplate findOrCreateCamelTemplate() { + return applicationContext.getBean(ProducerTemplate.class); + } + + @Override + protected CamelContext createCamelContext() { + return camelContext; + } + + @Override + protected void postProcessCamelContext(CamelContext camelContext) throws Exception { + // spring boot has configured camel context and no post processing is needed + } + } + } diff --git a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationListener.java b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationListener.java index 74168fb..1bc09af 100644 --- a/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationListener.java +++ b/core/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelSpringBootApplicationListener.java @@ -18,7 +18,6 @@ package org.apache.camel.spring.boot; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -29,8 +28,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.camel.CamelContext; import org.apache.camel.StartupListener; import org.apache.camel.main.MainDurationEventNotifier; +import org.apache.camel.main.MainShutdownStrategy; import org.apache.camel.main.RoutesCollector; import org.apache.camel.main.RoutesConfigurer; +import org.apache.camel.main.SimpleMainShutdownStrategy; import org.apache.camel.spi.CamelEvent; import org.apache.camel.spi.CamelEvent.Type; import org.apache.camel.spi.EventNotifier; @@ -85,7 +86,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C // only add and start Camel if its stopped (initial state) if (event.getApplicationContext() == this.applicationContext - && camelContext.getStatus().isStopped()) { + && camelContext.getStatus().isStopped()) { LOG.debug("Post-processing CamelContext bean: {}", camelContext.getName()); // we can use the default routes configurer @@ -117,8 +118,8 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C } // register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed EventNotifier notifier = new MainDurationEventNotifier(camelContext, - configurationProperties.getDurationMaxMessages(), configurationProperties.getDurationMaxIdleSeconds(), - controller.getCompleted(), controller.getLatch(), true); + configurationProperties.getDurationMaxMessages(), configurationProperties.getDurationMaxIdleSeconds(), + controller.getMainShutdownStrategy(), true); // register our event notifier ServiceHelper.startService(notifier); camelContext.getManagementStrategy().addEventNotifier(notifier); @@ -127,7 +128,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C if (configurationProperties.getDurationMaxSeconds() > 0) { LOG.info("CamelSpringBoot will terminate after {} seconds", configurationProperties.getDurationMaxSeconds()); terminateMainControllerAfter(camelContext, configurationProperties.getDurationMaxSeconds(), - controller.getCompleted(), controller.getLatch(), controller.getMainCompleteTask()); + controller.getMainShutdownStrategy(), controller.getMainCompleteTask()); } camelContext.addStartupListener(new StartupListener() { @@ -161,18 +162,19 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C LOG.info("CamelSpringBoot will terminate after being idle for more {} seconds", configurationProperties.getDurationMaxIdleSeconds()); } // needed by MainDurationEventNotifier to signal when we have processed the max messages - final AtomicBoolean completed = new AtomicBoolean(); - final CountDownLatch latch = new CountDownLatch(1); + final MainShutdownStrategy strategy = new SimpleMainShutdownStrategy(); // register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed EventNotifier notifier = new MainDurationEventNotifier(camelContext, - configurationProperties.getDurationMaxMessages(), configurationProperties.getDurationMaxIdleSeconds(), - completed, latch, false); + configurationProperties.getDurationMaxMessages(), + configurationProperties.getDurationMaxIdleSeconds(), + strategy, false); + // register our event notifier ServiceHelper.startService(notifier); camelContext.getManagementStrategy().addEventNotifier(notifier); - terminateApplicationContext(cac, camelContext, latch); + terminateApplicationContext(cac, camelContext, strategy); } } } @@ -214,7 +216,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C // SpringCamelContext should be the last one, // CamelContextFactoryBean should be second to last and then // RoutesCollector. This is important for startup as we want - // all resources to be ready and all routes added to the + // all resources to be ready and all routes added to the // context before we start CamelContext. // So the order should be: // 1. RoutesCollector (LOWEST_PRECEDENCE - 2) @@ -225,8 +227,8 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C // Helpers - private void terminateMainControllerAfter(final CamelContext camelContext, int seconds, final AtomicBoolean completed, - final CountDownLatch latch, final Runnable mainCompletedTask) { + private void terminateMainControllerAfter(final CamelContext camelContext, int seconds, + final MainShutdownStrategy shutdownStrategy, final Runnable mainCompletedTask) { ScheduledExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelSpringBootTerminateTask"); final AtomicBoolean running = new AtomicBoolean(); @@ -240,8 +242,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C } catch (Throwable e) { LOG.warn("Error during stopping CamelContext", e); } finally { - completed.set(true); - latch.countDown(); + shutdownStrategy.shutdown(); mainCompletedTask.run(); } running.set(false); @@ -249,7 +250,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C new Thread(stop, "CamelSpringBootTerminateTaskWorker").start(); }; - final ScheduledFuture future = executorService.schedule(task, seconds, TimeUnit.SECONDS); + final ScheduledFuture<?> future = executorService.schedule(task, seconds, TimeUnit.SECONDS); camelContext.addLifecycleStrategy(new LifecycleStrategySupport() { @Override public void onContextStop(CamelContext context) { @@ -257,7 +258,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C if (!running.get()) { future.cancel(true); // trigger shutdown - latch.countDown(); + shutdownStrategy.shutdown(); mainCompletedTask.run(); } } @@ -280,7 +281,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C new Thread(stop, "CamelSpringBootTerminateTaskWorker").start(); }; - final ScheduledFuture future = executorService.schedule(task, seconds, TimeUnit.SECONDS); + final ScheduledFuture<?> future = executorService.schedule(task, seconds, TimeUnit.SECONDS); camelContext.addLifecycleStrategy(new LifecycleStrategySupport() { @Override public void onContextStop(CamelContext context) { @@ -293,13 +294,13 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C } private void terminateApplicationContext(final ConfigurableApplicationContext applicationContext, final CamelContext camelContext, - final CountDownLatch latch) { + final MainShutdownStrategy shutdownStrategy) { ExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "CamelSpringBootTerminateTask"); final AtomicBoolean running = new AtomicBoolean(); Runnable task = () -> { try { - latch.await(); + shutdownStrategy.await(); // only mark as running after the latch running.set(true); LOG.info("CamelSpringBoot triggering shutdown of the JVM."); @@ -311,7 +312,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C running.set(false); }; - final Future future = executorService.submit(task); + final Future<?> future = executorService.submit(task); camelContext.addLifecycleStrategy(new LifecycleStrategySupport() { @Override public void onContextStop(CamelContext context) { @@ -320,7 +321,7 @@ public class CamelSpringBootApplicationListener implements ApplicationListener<C future.cancel(true); } else { // trigger shutdown - latch.countDown(); + shutdownStrategy.shutdown(); } } });