CAMEL-8223: Inflight repository to allow browsing of current inflight exchanges
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/416654d0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/416654d0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/416654d0 Branch: refs/heads/master Commit: 416654d0e8200f8f00663289db758e9cafcd7397 Parents: 996e32c Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Jan 9 11:05:29 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Jan 9 13:41:46 2015 +0100 ---------------------------------------------------------------------- .../mbean/ManagedInflightRepositoryMBean.java | 3 ++ .../camel/impl/DefaultInflightRepository.java | 40 ++++++++++++++++---- .../camel/impl/TimeoutInflightRepository.java | 13 +++---- .../mbean/ManagedInflightRepository.java | 9 ++++- .../apache/camel/spi/InflightRepository.java | 9 +++++ 5 files changed, 57 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java index 1b42758..0f74996 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedInflightRepositoryMBean.java @@ -32,4 +32,7 @@ public interface ManagedInflightRepositoryMBean extends ManagedServiceMBean { @ManagedOperation(description = "Lists all the exchanges which are currently inflight") TabularData browse(); + @ManagedOperation(description = "Lists all the exchanges which are currently inflight, limited and sorted") + TabularData browse(int limit, boolean sortByLongestDuration); + } http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index f992042..5abe8e8 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -19,6 +19,7 @@ package org.apache.camel.impl; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -88,9 +89,30 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public Collection<InflightExchange> browse() { + return browse(-1, false); + } + + @Override + public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) { List<InflightExchange> answer = new ArrayList<InflightExchange>(); - for (Exchange exchange : inflight.values()) { + + List<Exchange> values = new ArrayList<Exchange>(inflight.values()); + if (sortByLongestDuration) { + Collections.sort(values, new Comparator<Exchange>() { + @Override + public int compare(Exchange e1, Exchange e2) { + long d1 = getExchangeDuration(e1); + long d2 = getExchangeDuration(e2); + return Long.compare(d1, d2); + } + }); + } + + for (Exchange exchange : values) { answer.add(new InflightExchangeEntry(exchange)); + if (limit > 0 && answer.size() >= limit) { + break; + } } return Collections.unmodifiableCollection(answer); } @@ -110,6 +132,15 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh routeCount.clear(); } + private static long getExchangeDuration(Exchange exchange) { + long duration = 0; + Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + if (created != null) { + duration = System.currentTimeMillis() - created.getTime(); + } + return duration; + } + private static final class InflightExchangeEntry implements InflightExchange { private final Exchange exchange; @@ -125,12 +156,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh @Override public long getDuration() { - long duration = 0; - Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); - if (created != null) { - duration = System.currentTimeMillis() - created.getTime(); - } - return duration; + return DefaultInflightRepository.getExchangeDuration(exchange); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java index 85290f6..51f2861 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/TimeoutInflightRepository.java @@ -52,7 +52,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh @Override protected void doStart() throws Exception { - if (exchangeFormatter == null) { // setup exchange formatter to be used for message history dump DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); @@ -69,7 +68,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh exchangeWatchDog = new Thread(woker); } exchangeWatchDog.start(); - } @Override @@ -78,7 +76,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh woker.stop(); exchangeWatchDog = null; } - } @Override @@ -97,13 +94,11 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh @Override public void add(Exchange exchange, String routeId) { // do nothing here - } @Override public void remove(Exchange exchange, String routeId) { // do nothing here - } @Override @@ -120,7 +115,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh @Override public void removeRoute(String routeId) { // We don't support this interface yet - } @Override @@ -134,6 +128,11 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh return null; } + @Override + public Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration) { + return null; + } + public long getWaitTime() { return waitTime; } @@ -141,7 +140,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh public void setWaitTime(long waitTime) { this.waitTime = waitTime; } - public long getTimeout() { return timeout; @@ -159,7 +157,6 @@ public class TimeoutInflightRepository extends ServiceSupport implements Infligh this.exchangeFormatter = exchangeFormatter; } - protected void processTimeoutExchange(Exchange exchange, long processingTime) { // print out exchange history or send an alarm // dump a route stack trace of the exchange http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java index 1ab9421..4ee47ae 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedInflightRepository.java @@ -59,9 +59,15 @@ public class ManagedInflightRepository extends ManagedService implements Managed @Override public TabularData browse() { + return browse(-1, false); + } + + @Override + public TabularData browse(int limit, boolean sortByLongestDuration) { try { TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInflightExchangesTabularType()); - Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(); + Collection<InflightRepository.InflightExchange> exchanges = inflightRepository.browse(limit, sortByLongestDuration); + for (InflightRepository.InflightExchange entry : exchanges) { CompositeType ct = CamelOpenMBeanTypes.listInflightExchangesCompositeType(); String exchangeId = entry.getExchange().getExchangeId(); @@ -80,5 +86,4 @@ public class ManagedInflightRepository extends ManagedService implements Managed throw ObjectHelper.wrapRuntimeCamelException(e); } } - } http://git-wip-us.apache.org/repos/asf/camel/blob/416654d0/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java index cd5391d..0fc313f 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java @@ -135,4 +135,13 @@ public interface InflightRepository extends StaticService { */ Collection<InflightExchange> browse(); + /** + * A <i>read-only</i> browser of the {@link InflightExchange}s that are currently inflight. + * + * @param limit maximum number of entries to return + * @param sortByLongestDuration to sort by the longest duration. Set to <tt>true</tt> to include the exchanges that has been inflight the longest time, + * set to <tt>false</tt> to include the exchanges in unspecified order. + */ + Collection<InflightExchange> browse(int limit, boolean sortByLongestDuration); + }