CAMEL-11354: Optimize oldest exchange inflight per route
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6bc3975 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6bc3975 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6bc3975 Branch: refs/heads/master Commit: e6bc3975fd31e83a01c3ad94200586f5173e6b9e Parents: ffd97f6 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 28 20:43:13 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 28 20:43:13 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/DefaultInflightRepository.java | 30 ++++++++++++++++++++ .../camel/management/mbean/ManagedRoute.java | 19 ++----------- .../apache/camel/spi/InflightRepository.java | 8 ++++++ 3 files changed, 41 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index f176861..cae7ecb 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; @@ -152,6 +154,34 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh } @Override + public InflightExchange oldest(String fromRouteId) { + Stream<Exchange> values; + + if (fromRouteId == null) { + // all values + values = inflight.values().stream(); + } else { + // only if route match + values = inflight.values().stream() + .filter(e -> fromRouteId.equals(e.getFromRouteId())); + } + + // sort by duration and grab the first + Exchange first = values.sorted((e1, e2) -> { + long d1 = getExchangeDuration(e1); + long d2 = getExchangeDuration(e2); + // need the biggest number first + return -1 * Long.compare(d1, d2); + }).findFirst().orElse(null); + + if (first != null) { + return new InflightExchangeEntry(first); + } else { + return null; + } + } + + @Override protected void doStart() throws Exception { } http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 81f4794..93248b5 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -19,15 +19,12 @@ package org.apache.camel.management.mbean; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.AttributeValueExp; @@ -37,11 +34,7 @@ import javax.management.Query; import javax.management.QueryExp; import javax.management.StringValueExp; -import org.apache.camel.spi.InflightRepository; -import org.w3c.dom.Document; - import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; import org.apache.camel.ManagementStatisticsLevel; import org.apache.camel.Route; import org.apache.camel.ServiceStatus; @@ -52,12 +45,14 @@ import org.apache.camel.api.management.mbean.ManagedRouteMBean; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ModelHelper; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.XmlLineNumberParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.w3c.dom.Document; @ManagedResource(description = "Managed Route") public class ManagedRoute extends ManagedPerformanceCounter implements TimerListener, ManagedRouteMBean { @@ -70,7 +65,6 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList protected final String description; protected final ModelCamelContext context; private final LoadTriplet load = new LoadTriplet(); - private final ConcurrentHashMap<String, Exchange> exchangesInFlight = new ConcurrentHashMap<String, Exchange>(); private final String jmxDomain; public ManagedRoute(ModelCamelContext context, Route route) { @@ -85,8 +79,6 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList super.init(strategy); boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off; setStatisticsEnabled(enabled); - - exchangesInFlight.clear(); } public Route getRoute() { @@ -467,12 +459,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList } private InflightRepository.InflightExchange getOldestInflightEntry() { - Collection<InflightRepository.InflightExchange> list = getContext().getInflightRepository().browse(getRouteId(), 1, true); - if (list.size() == 1) { - return list.iterator().next(); - } else { - return null; - } + return getContext().getInflightRepository().oldest(getRouteId()); } public Long getOldestInflightDuration() { http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java index 3fbd710..53c02e2 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java @@ -175,4 +175,12 @@ public interface InflightRepository extends StaticService { */ Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration); + /** + * Gets the oldest {@link InflightExchange} that are currently inflight that started from the given route. + * + * @param fromRouteId the route id, or <tt>null</tt> for all routes. + * @return the oldest, or <tt>null</tt> if none inflight + */ + InflightExchange oldest(String fromRouteId); + }