Repository: camel Updated Branches: refs/heads/master 552fe3d1b -> 8f6d4adb0
Fixes CAMEL-8385: Add a OldestInflightDuration and OldestInflightExchangeId attribute to route MBeans Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8f6d4adb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8f6d4adb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8f6d4adb Branch: refs/heads/master Commit: 8f6d4adb0161c524303e9bf7688c45976b89084a Parents: 552fe3d Author: Hiram Chirino <hi...@hiramchirino.com> Authored: Sat Feb 21 10:46:00 2015 -0500 Committer: Hiram Chirino <hi...@hiramchirino.com> Committed: Mon Mar 2 09:14:00 2015 -0500 ---------------------------------------------------------------------- .../api/management/mbean/ManagedRouteMBean.java | 7 ++ .../camel/management/mbean/ManagedRoute.java | 124 ++++++++++++++++++- .../ManagedInflightStatisticsTest.java | 113 +++++++++++++++++ 3 files changed, 239 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java index 148c688..7083a0f 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java @@ -114,4 +114,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean { @ManagedOperation(description = "Returns the JSON representation of all the static endpoints (and possible dynamic) defined in this route") String createRouteStaticEndpointJson(boolean includeDynamic); + @ManagedAttribute(description = "Oldest inflight exchange duration") + Long getOldestInflightDuration(); + + @ManagedAttribute(description = "Oldest inflight exchange id") + String getOldestInflightExchangeId(); + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index d186188..fba8b62 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import javax.management.AttributeValueExp; import javax.management.MBeanServer; @@ -33,6 +35,7 @@ import javax.management.QueryExp; import javax.management.StringValueExp; import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.ManagementStatisticsLevel; import org.apache.camel.Route; import org.apache.camel.ServiceStatus; @@ -43,6 +46,7 @@ import org.apache.camel.api.management.mbean.ManagedRouteMBean; import org.apache.camel.model.ModelCamelContext; import org.apache.camel.model.ModelHelper; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.RoutePolicy; import org.apache.camel.util.ObjectHelper; @@ -53,6 +57,8 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList protected final String description; protected final ModelCamelContext context; private final LoadTriplet load = new LoadTriplet(); + private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>(); + private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>(); public ManagedRoute(ModelCamelContext context, Route route) { this.route = route; @@ -178,7 +184,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList public void onTimer() { load.update(getInflightExchanges()); } - + public void start() throws Exception { if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); @@ -211,7 +217,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - String routeId = getRouteId(); + String routeId = getRouteId(); context.stopRoute(routeId); context.removeRoute(routeId); } @@ -220,7 +226,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList if (!context.getStatus().isStarted()) { throw new IllegalArgumentException("CamelContext is not started"); } - String routeId = getRouteId(); + String routeId = getRouteId(); context.stopRoute(routeId, timeout, TimeUnit.SECONDS); context.removeRoute(routeId); } @@ -329,6 +335,15 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList String stat = dumpStatsAsXml(fullStats); answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\""); answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\""); + answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\""); + InFlightKey oldestInflightEntry = getOldestInflightEntry(); + if (oldestInflightEntry != null) { + answer.append(" oldestInflightExchangeId=\"\""); + answer.append(" oldestInflightDuration=\"\""); + } else { + answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\""); + answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\""); + } answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); if (includeProcessors) { @@ -369,7 +384,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList @Override public boolean equals(Object o) { - return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute)o).route)); + return this == o || (o != null && getClass() == o.getClass() && route.equals(((ManagedRoute) o).route)); } @Override @@ -377,6 +392,106 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList return route.hashCode(); } + private InFlightKey getOldestInflightEntry() { + Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry(); + if (entry != null) { + return entry.getKey(); + } + return null; + } + + public Long getOldestInflightDuration() { + InFlightKey oldest = getOldestInflightEntry(); + if (oldest == null) { + return null; + } + return System.currentTimeMillis() - oldest.timeStamp; + } + + public String getOldestInflightExchangeId() { + InFlightKey oldest = getOldestInflightEntry(); + if (oldest == null) { + return null; + } + return oldest.exchangeId; + } + + @Override + public void init(ManagementStrategy strategy) { + super.init(strategy); + exchangesInFlightStartTimestamps.clear(); + } + + @Override + public synchronized void processExchange(Exchange exchange) { + InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId()); + exchangesInFlightKeys.put(exchange.getExchangeId(), key); + exchangesInFlightStartTimestamps.put(key, key.timeStamp); + super.processExchange(exchange); + } + + @Override + public synchronized void completedExchange(Exchange exchange, long time) { + InFlightKey key = exchangesInFlightKeys.remove(exchange.getExchangeId()); + if (key != null) { + exchangesInFlightStartTimestamps.remove(key); + } + super.completedExchange(exchange, time); + } + + private static class InFlightKey implements Comparable<InFlightKey> { + + private final Long timeStamp; + private final String exchangeId; + + InFlightKey(Long timeStamp, String exchangeId) { + this.exchangeId = exchangeId; + this.timeStamp = timeStamp; + } + + @Override + public int compareTo(InFlightKey o) { + int compare = Long.compare(timeStamp, o.timeStamp); + if (compare == 0) { + return exchangeId.compareTo(o.exchangeId); + } + return compare; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + InFlightKey that = (InFlightKey) o; + + if (!exchangeId.equals(that.exchangeId)) { + return false; + } + if (!timeStamp.equals(that.timeStamp)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + int result = timeStamp.hashCode(); + result = 31 * result + exchangeId.hashCode(); + return result; + } + + @Override + public String toString() { + return exchangeId; + } + } + /** * Used for sorting the processor mbeans accordingly to their index. */ @@ -387,5 +502,4 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList return o1.getIndex().compareTo(o2.getIndex()); } } - } http://git-wip-us.apache.org/repos/asf/camel/blob/8f6d4adb/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java new file mode 100644 index 0000000..9807a44 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.Set; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * @version + */ +public class ManagedInflightStatisticsTest extends ManagementTestSupport { + + public void testManageStatistics() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + // get the stats for the route + MBeanServer mbeanServer = getMBeanServer(); + + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); + assertEquals(1, set.size()); + ObjectName on = set.iterator().next(); + + Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(0, inflight.longValue()); + Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNull(ts); + String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNull(id); + + MockEndpoint result = getMockEndpoint("mock:result"); + result.expectedMessageCount(2); + + // start some exchanges. + template.asyncSendBody("direct:start", 1000L); + Thread.sleep(500); + template.asyncSendBody("direct:start", 1000L); + Thread.sleep(100); + + inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(2, inflight.longValue()); + + ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNotNull(ts); + id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNotNull(id); + + // Lets wait for the first exchange to complete. + Thread.sleep(500); + Long ts2 = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNotNull(ts2); + String id2 = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNotNull(id2); + + // Lets verify the oldest changed. + assertTrue(!id2.equals(id)); + assertTrue(ts2 > ts); + + // Lets wait for all the exchanges to complete. + Thread.sleep(500); + + assertMockEndpointsSatisfied(); + + inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(0, inflight.longValue()); + ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNull(ts); + id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNull(id); + + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + Long delay = (Long) exchange.getIn().getBody(); + Thread.sleep(delay.longValue()); + } + }) + .to("mock:result").id("mock"); + } + }; + } + +} \ No newline at end of file