CAMEL-8223: Inflight repository to allow browsing of current inflight exchanges


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/416654d0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/416654d0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/416654d0

Branch: refs/heads/master
Commit: 416654d0e8200f8f00663289db758e9cafcd7397
Parents: 996e32c
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Jan 9 11:05:29 2015 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Jan 9 13:41:46 2015 +0100

----------------------------------------------------------------------
 .../mbean/ManagedInflightRepositoryMBean.java   |  3 ++
 .../camel/impl/DefaultInflightRepository.java   | 40 ++++++++++++++++----
 .../camel/impl/TimeoutInflightRepository.java   | 13 +++----
 .../mbean/ManagedInflightRepository.java        |  9 ++++-
 .../apache/camel/spi/InflightRepository.java    |  9 +++++
 5 files changed, 57 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
index 1b42758..0f74996 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java
@@ -32,4 +32,7 @@ public interface ManagedInflightRepositoryMBean extends 
ManagedServiceMBean {
     @ManagedOperation(description = "Lists all the exchanges which are 
currently inflight")
     TabularData browse();
 
+    @ManagedOperation(description = "Lists all the exchanges which are 
currently inflight, limited and sorted")
+    TabularData browse(int limit, boolean sortByLongestDuration);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index f992042..5abe8e8 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -19,6 +19,7 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -88,9 +89,30 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
 
     @Override
     public Collection<InflightExchange> browse() {
+        return browse(-1, false);
+    }
+
+    @Override
+    public Collection<InflightExchange> browse(int limit, boolean 
sortByLongestDuration) {
         List<InflightExchange> answer = new ArrayList<InflightExchange>();
-        for (Exchange exchange : inflight.values()) {
+
+        List<Exchange> values = new ArrayList<Exchange>(inflight.values());
+        if (sortByLongestDuration) {
+            Collections.sort(values, new Comparator<Exchange>() {
+                @Override
+                public int compare(Exchange e1, Exchange e2) {
+                    long d1 = getExchangeDuration(e1);
+                    long d2 = getExchangeDuration(e2);
+                    return Long.compare(d1, d2);
+                }
+            });
+        }
+
+        for (Exchange exchange : values) {
             answer.add(new InflightExchangeEntry(exchange));
+            if (limit > 0 && answer.size() >= limit) {
+                break;
+            }
         }
         return Collections.unmodifiableCollection(answer);
     }
@@ -110,6 +132,15 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
         routeCount.clear();
     }
 
+    private static long getExchangeDuration(Exchange exchange) {
+        long duration = 0;
+        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, 
Date.class);
+        if (created != null) {
+            duration = System.currentTimeMillis() - created.getTime();
+        }
+        return duration;
+    }
+
     private static final class InflightExchangeEntry implements 
InflightExchange {
 
         private final Exchange exchange;
@@ -125,12 +156,7 @@ public class DefaultInflightRepository extends 
ServiceSupport implements Infligh
 
         @Override
         public long getDuration() {
-            long duration = 0;
-            Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, 
Date.class);
-            if (created != null) {
-                duration = System.currentTimeMillis() - created.getTime();
-            }
-            return duration;
+            return DefaultInflightRepository.getExchangeDuration(exchange);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
index 85290f6..51f2861 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java
@@ -52,7 +52,6 @@ public class TimeoutInflightRepository extends ServiceSupport 
implements Infligh
     
     @Override
     protected void doStart() throws Exception {
-    
         if (exchangeFormatter == null) {
             // setup exchange formatter to be used for message history dump
             DefaultExchangeFormatter formatter = new 
DefaultExchangeFormatter();
@@ -69,7 +68,6 @@ public class TimeoutInflightRepository extends ServiceSupport 
implements Infligh
             exchangeWatchDog = new Thread(woker);
         }
         exchangeWatchDog.start();
-
     }
 
     @Override
@@ -78,7 +76,6 @@ public class TimeoutInflightRepository extends ServiceSupport 
implements Infligh
             woker.stop();
             exchangeWatchDog = null;
         }
-
     }
 
     @Override
@@ -97,13 +94,11 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
     @Override
     public void add(Exchange exchange, String routeId) {
         // do nothing here
-        
     }
 
     @Override
     public void remove(Exchange exchange, String routeId) {
         // do nothing here
-        
     }
 
     @Override
@@ -120,7 +115,6 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
     @Override
     public void removeRoute(String routeId) {
         // We don't support this interface yet
-        
     }
 
     @Override
@@ -134,6 +128,11 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
         return null;
     }
 
+    @Override
+    public Collection<InflightExchange> browse(int limit, boolean 
sortByLongestDuration) {
+        return null;
+    }
+
     public long getWaitTime() {
         return waitTime;
     }
@@ -141,7 +140,6 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
     public void setWaitTime(long waitTime) {
         this.waitTime = waitTime;
     }
-    
 
     public long getTimeout() {
         return timeout;
@@ -159,7 +157,6 @@ public class TimeoutInflightRepository extends 
ServiceSupport implements Infligh
         this.exchangeFormatter = exchangeFormatter;
     }
 
-    
     protected void processTimeoutExchange(Exchange exchange, long 
processingTime) {
         // print out exchange history or send an alarm
         // dump a route stack trace of the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
index 1ab9421..4ee47ae 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java
@@ -59,9 +59,15 @@ public class ManagedInflightRepository extends 
ManagedService implements Managed
 
     @Override
     public TabularData browse() {
+        return browse(-1, false);
+    }
+
+    @Override
+    public TabularData browse(int limit, boolean sortByLongestDuration) {
         try {
             TabularData answer = new 
TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType());
-            Collection<InflightRepository.InflightExchange> exchanges = 
inflightRepository.browse();
+            Collection<InflightRepository.InflightExchange> exchanges = 
inflightRepository.browse(limit, sortByLongestDuration);
+
             for (InflightRepository.InflightExchange entry : exchanges) {
                 CompositeType ct = 
CamelOpenMBeanTypes.listInflightExchangesCompositeType();
                 String exchangeId = entry.getExchange().getExchangeId();
@@ -80,5 +86,4 @@ public class ManagedInflightRepository extends ManagedService 
implements Managed
             throw ObjectHelper.wrapRuntimeCamelException(e);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java 
b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
index cd5391d..0fc313f 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -135,4 +135,13 @@ public interface InflightRepository extends StaticService {
      */
     Collection<InflightExchange> browse();
 
+    /**
+     * A <i>read-only</i> browser of the {@link InflightExchange}s that are 
currently inflight.
+     *
+     * @param limit maximum number of entries to return
+     * @param sortByLongestDuration to sort by the longest duration. Set to 
<tt>true</tt> to include the exchanges that have been inflight the longest time,
+     *                              set to <tt>false</tt> to include the 
exchanges in unspecified order.
+     */
+    Collection<InflightExchange> browse(int limit, boolean 
sortByLongestDuration);
+
 }

Reply via email to