CAMEL-11354: Optimize oldest exchange inflight per route
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce4e8f7d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce4e8f7d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce4e8f7d Branch: refs/heads/master Commit: ce4e8f7df2dbc2b8cf9b0a55c8bba01010ad3e86 Parents: 4a3debe Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 28 20:03:59 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 28 20:03:59 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/camel/Exchange.java | 6 + .../org/apache/camel/impl/DefaultExchange.java | 10 ++ .../camel/impl/DefaultInflightRepository.java | 2 +- .../camel/management/mbean/ManagedRoute.java | 111 ++++--------------- .../org/apache/camel/util/MessageHelper.java | 2 +- .../EventNotifierExchangeCompletedTest.java | 2 +- 6 files changed, 43 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index eecc7e0..87fcf13 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -16,6 +16,7 @@ */ package org.apache.camel; +import java.util.Date; import java.util.List; import java.util.Map; @@ -584,4 +585,9 @@ public interface Exchange { */ List<Synchronization> handoverCompletions(); + /** + * Gets the timestamp when this exchange was created. + */ + Date getCreated(); + } http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java index c1a5c39..6b7a686 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java @@ -17,6 +17,7 @@ package org.apache.camel.impl; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -86,6 +87,15 @@ public final class DefaultExchange implements Exchange { return String.format("Exchange[%s]", exchangeId == null ? "" : exchangeId); } + @Override + public Date getCreated() { + if (hasProperties()) { + return getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + } else { + return null; + } + } + public Exchange copy() { // to be backwards compatible as today return copy(false); http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index 01a2826..2c6c78b 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -167,7 +167,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh private static long getExchangeDuration(Exchange exchange) { long duration = 0; - Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + Date created = exchange.getCreated(); if (created != null) { duration = System.currentTimeMillis() - created.getTime(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 166f3a2..1b031b1 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -21,12 +21,12 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.AttributeValueExp; @@ -68,8 +68,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList protected final String description; protected final ModelCamelContext context; private final LoadTriplet load = new LoadTriplet(); - private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>(); - private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>(); + private final ConcurrentHashMap<String, Exchange> exchangesInFlight = new ConcurrentHashMap<String, Exchange>(); private final String jmxDomain; public ManagedRoute(ModelCamelContext context, Route route) { @@ -85,8 +84,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off; setStatisticsEnabled(enabled); - exchangesInFlightKeys.clear(); - exchangesInFlightStartTimestamps.clear(); + exchangesInFlight.clear(); } public Route getRoute() { @@ -410,13 +408,14 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList String stat = dumpStatsAsXml(fullStats); answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\""); answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\""); - InFlightKey oldestInflightEntry = getOldestInflightEntry(); - if (oldestInflightEntry == null) { + Exchange oldest = getOldestInflightEntry(); + if (oldest == null) { answer.append(" oldestInflightExchangeId=\"\""); answer.append(" oldestInflightDuration=\"\""); } else { - answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\""); - answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\""); + long duration = System.currentTimeMillis() - oldest.getCreated().getTime(); + answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchangeId()).append("\""); + answer.append(" oldestInflightDuration=\"").append(duration).append("\""); } answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); @@ -466,111 +465,49 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList return route.hashCode(); } - private InFlightKey getOldestInflightEntry() { - Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry(); - if (entry != null) { - return entry.getKey(); - } - return null; + private Exchange getOldestInflightEntry() { + return exchangesInFlight.values().stream().max(Comparator.comparing(Exchange::getCreated)).orElse(null); } public Long getOldestInflightDuration() { - InFlightKey oldest = getOldestInflightEntry(); - if (oldest == null) { + Exchange exchange = getOldestInflightEntry(); + if (exchange == null) { + return null; + } + Date created = exchange.getCreated(); + if (created != null) { + return System.currentTimeMillis() - created.getTime(); + } else { return null; } - return System.currentTimeMillis() - oldest.timeStamp; } public String getOldestInflightExchangeId() { - InFlightKey oldest = getOldestInflightEntry(); - if (oldest == null) { + Exchange exchange = getOldestInflightEntry(); + if (exchange == null) { return null; } - return oldest.exchangeId; + return exchange.getExchangeId(); } @Override public void processExchange(Exchange exchange) { - exchangesInFlightKeys.computeIfAbsent(exchange.getExchangeId(), id -> { - InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId()); - exchangesInFlightStartTimestamps.put(key, key.timeStamp); - return key; - }); + exchangesInFlight.put(exchange.getExchangeId(), exchange); super.processExchange(exchange); } @Override public void completedExchange(Exchange exchange, long time) { - exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> { - exchangesInFlightStartTimestamps.remove(key); - return null; - }); + exchangesInFlight.remove(exchange.getExchangeId()); super.completedExchange(exchange, time); } @Override public void failedExchange(Exchange exchange) { - exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> { - exchangesInFlightStartTimestamps.remove(key); - return null; - }); + exchangesInFlight.remove(exchange.getExchangeId()); super.failedExchange(exchange); } - private static class InFlightKey implements Comparable<InFlightKey> { - - private final Long timeStamp; - private final String exchangeId; - - InFlightKey(Long timeStamp, String exchangeId) { - this.timeStamp = timeStamp; - this.exchangeId = exchangeId; - } - - @Override - public int compareTo(InFlightKey o) { - int compare = Long.compare(timeStamp, o.timeStamp); - if (compare == 0) { - return exchangeId.compareTo(o.exchangeId); - } - return compare; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - InFlightKey that = (InFlightKey) o; - - if (!exchangeId.equals(that.exchangeId)) { - return false; - } - if (!timeStamp.equals(that.timeStamp)) { - return false; - } - - return true; - } - - @Override - public int hashCode() { - int result = timeStamp.hashCode(); - result = 31 * result + exchangeId.hashCode(); - return result; - } - - @Override - public String toString() { - return exchangeId; - } - } - /** * Used for sorting the processor mbeans accordingly to their index. */ http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java index 311e987..050fecc 100644 --- a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java @@ -560,7 +560,7 @@ public final class MessageHelper { label = URISupport.sanitizeUri(exchange.getFromEndpoint().getEndpointUri()); } long elapsed = 0; - Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + Date created = exchange.getCreated(); if (created != null) { elapsed = new StopWatch(created).taken(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java index 45e10a0..f37e7e0 100644 --- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java @@ -85,7 +85,7 @@ public class EventNotifierExchangeCompletedTest extends ContextTestSupport { assertEquals("direct://start", event.getExchange().getFromEndpoint().getEndpointUri()); // grab the created timestamp - Date created = event.getExchange().getProperty(Exchange.CREATED_TIMESTAMP, Date.class); + Date created = event.getExchange().getCreated(); assertNotNull(created); // calculate elapsed time