Repository: camel
Updated Branches:
  refs/heads/master fd0db5611 -> 0e01cfb55


CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc3805a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc3805a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc3805a8

Branch: refs/heads/master
Commit: dc3805a8a257f41a8f04da22985209c12c917e37
Parents: ce1f04e
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Mar 2 13:59:09 2017 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Mar 2 14:03:17 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/main/MainSupport.java | 28 ++++++++++++++++++++
 .../org/apache/camel/impl/MainSupportTest.java  |  5 ++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java 
b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
index 03b65d8..50b76e6 100644
--- a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
@@ -32,6 +32,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultModelJAXBContextFactory;
+import org.apache.camel.impl.DurationRoutePolicyFactory;
 import org.apache.camel.impl.FileWatcherReloadStrategy;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.ModelJAXBContextFactory;
@@ -56,6 +57,7 @@ public abstract class MainSupport extends ServiceSupport {
     protected final AtomicBoolean completed = new AtomicBoolean(false);
     protected final AtomicInteger exitCode = new 
AtomicInteger(UNINITIALIZED_EXIT_CODE);
     protected long duration = -1;
+    protected int durationMaxMessages;
     protected TimeUnit timeUnit = TimeUnit.MILLISECONDS;
     protected boolean trace;
     protected List<RouteBuilder> routeBuilders = new ArrayList<RouteBuilder>();
@@ -116,6 +118,13 @@ public abstract class MainSupport extends ServiceSupport {
                 setDuration(Integer.parseInt(value));
             }
         });
+        addOption(new ParameterOption("dm", "durationMaxMessages",
+                "Sets the maximum messages duration that the application will 
process before terminating.",
+                "durationMaxMessages") {
+            protected void doProcess(String arg, String parameter, 
LinkedList<String> remainingArgs) {
+                setDurationMaxMessages(Integer.parseInt(parameter));
+            }
+        });
         addOption(new Option("t", "trace", "Enables tracing") {
             protected void doProcess(String arg, LinkedList<String> 
remainingArgs) {
                 enableTrace();
@@ -324,6 +333,18 @@ public abstract class MainSupport extends ServiceSupport {
         this.duration = duration;
     }
 
+    public int getDurationMaxMessages() {
+        return durationMaxMessages;
+    }
+
+    /**
+     * Sets the duration to run the application to process at most max 
messages until it
+     * should be terminated. Defaults to -1. Any value <= 0 will run forever.
+     */
+    public void setDurationMaxMessages(int durationMaxMessages) {
+        this.durationMaxMessages = durationMaxMessages;
+    }
+
     public TimeUnit getTimeUnit() {
         return timeUnit;
     }
@@ -520,6 +541,13 @@ public abstract class MainSupport extends ServiceSupport {
             }
         }
 
+        if (durationMaxMessages > 0) {
+            DurationRoutePolicyFactory factory = new 
DurationRoutePolicyFactory();
+            factory.setMaxMessages(durationMaxMessages);
+            LOG.debug("Adding DurationRoutePolicyFactory with maxMessages: 
{}", durationMaxMessages);
+            camelContext.addRoutePolicyFactory(factory);
+        }
+
         // try to load the route builders from the routeBuilderClasses
         loadRouteBuilders(camelContext);
         for (RouteBuilder routeBuilder : routeBuilders) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
index 756ca42..2607c81 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
@@ -44,6 +44,11 @@ public class MainSupportTest extends ContextTestSupport {
         my.run(new String[]{"-d", "1s"});
     }
 
+    public void testMainSupportMaxMessages() throws Exception {
+        MyMainSupport my = new MyMainSupport();
+        my.run(new String[]{"-d", "1s", "-dm", "2"});
+    }
+
     public void testMainSupportHelp() throws Exception {
         MyMainSupport my = new MyMainSupport();
         my.run(new String[]{"-h"});

Reply via email to