CAMEL-11354: Optimize oldest exchange inflight per route
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ffd97f65 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ffd97f65 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ffd97f65 Branch: refs/heads/master Commit: ffd97f652c00e2e83bb64fa8aaccd62b2f412a5f Parents: ce4e8f7 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun May 28 20:24:19 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun May 28 20:24:19 2017 +0200 ---------------------------------------------------------------------- .../camel/impl/DefaultInflightRepository.java | 3 +- .../camel/management/mbean/ManagedRoute.java | 53 +++++------- .../ManagedInflightStatisticsTest.java | 88 +++++--------------- 3 files changed, 43 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java index 2c6c78b..f176861 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java @@ -128,7 +128,8 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh public int compare(Exchange e1, Exchange e2) { long d1 = getExchangeDuration(e1); long d2 = getExchangeDuration(e2); - return Long.compare(d1, d2); + // need the biggest number first + return -1 * Long.compare(d1, d2); } }); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 1b031b1..81f4794 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -19,6 +19,7 @@ package org.apache.camel.management.mbean; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -36,6 +37,7 @@ import javax.management.Query; import javax.management.QueryExp; import javax.management.StringValueExp; +import org.apache.camel.spi.InflightRepository; import org.w3c.dom.Document; import org.apache.camel.CamelContext; @@ -408,14 +410,13 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList String stat = dumpStatsAsXml(fullStats); answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\""); answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\""); - Exchange oldest = getOldestInflightEntry(); + InflightRepository.InflightExchange oldest = getOldestInflightEntry(); if (oldest == null) { answer.append(" oldestInflightExchangeId=\"\""); answer.append(" oldestInflightDuration=\"\""); } else { - long duration = System.currentTimeMillis() - oldest.getCreated().getTime(); - answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchangeId()).append("\""); - answer.append(" oldestInflightDuration=\"").append(duration).append("\""); + answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchange().getExchangeId()).append("\""); + answer.append(" oldestInflightDuration=\"").append(oldest.getDuration()).append("\""); } answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n"); @@ -465,47 +466,31 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList return route.hashCode(); } - private Exchange getOldestInflightEntry() { - return exchangesInFlight.values().stream().max(Comparator.comparing(Exchange::getCreated)).orElse(null); + private InflightRepository.InflightExchange getOldestInflightEntry() { + Collection<InflightRepository.InflightExchange> list = getContext().getInflightRepository().browse(getRouteId(), 1, true); + if (list.size() == 1) { + return list.iterator().next(); + } else { + return null; + } } public Long getOldestInflightDuration() { - Exchange exchange = getOldestInflightEntry(); - if (exchange == null) { + InflightRepository.InflightExchange oldest = getOldestInflightEntry(); + if (oldest == null) { return null; - } - Date created = exchange.getCreated(); - if (created != null) { - return System.currentTimeMillis() - created.getTime(); } else { - return null; + return oldest.getDuration(); } } public String getOldestInflightExchangeId() { - Exchange exchange = getOldestInflightEntry(); - if (exchange == null) { + InflightRepository.InflightExchange oldest = getOldestInflightEntry(); + if (oldest == null) { return null; + } else { + return oldest.getExchange().getExchangeId(); } - return exchange.getExchangeId(); - } - - @Override - public void processExchange(Exchange exchange) { - exchangesInFlight.put(exchange.getExchangeId(), exchange); - super.processExchange(exchange); - } - - @Override - public void completedExchange(Exchange exchange, long time) { - exchangesInFlight.remove(exchange.getExchangeId()); - super.completedExchange(exchange, time); - } - - @Override - public void failedExchange(Exchange exchange) { - exchangesInFlight.remove(exchange.getExchangeId()); - super.failedExchange(exchange); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/ffd97f65/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java index aa6fbe7..c8fd5ca 100644 --- a/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java @@ -16,7 +16,10 @@ */ package org.apache.camel.management; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; @@ -30,7 +33,7 @@ import org.apache.camel.component.mock.MockEndpoint; */ public class ManagedInflightStatisticsTest extends ManagementTestSupport { - public void testManageStatistics() throws Exception { + public void testOldestInflight() throws Exception { // JMX tests dont work well on AIX CI servers (hangs them) if (isPlatform("aix")) { return; @@ -53,10 +56,13 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport { MockEndpoint result = getMockEndpoint("mock:result"); result.expectedMessageCount(2); + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + // start some exchanges. - template.asyncSendBody("direct:start", 1000L); + template.asyncSendBody("direct:start", latch1); Thread.sleep(500); - template.asyncSendBody("direct:start", 1000L); + template.asyncSendBody("direct:start", latch2); Thread.sleep(100); inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); @@ -67,77 +73,32 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport { id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); assertNotNull(id); + log.info("Oldest Exchange id: {}, duration: {}", id, ts); + + // complete first exchange + latch1.countDown(); + // Lets wait for the first exchange to complete. - Thread.sleep(500); + Thread.sleep(200); Long ts2 = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); assertNotNull(ts2); String id2 = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); assertNotNull(id2); + log.info("Oldest Exchange id: {}, duration: {}", id2, ts2); + // Lets verify the oldest changed. assertTrue(!id2.equals(id)); // The duration values could be different - //assertTrue(ts2 > ts); - - // Lets wait for all the exchanges to complete. - Thread.sleep(500); - - assertMockEndpointsSatisfied(); - - inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); - assertEquals(0, inflight.longValue()); - ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); - assertNull(ts); - id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); - assertNull(id); - } - - public void testManageStatisticsFailed() throws Exception { - // JMX tests dont work well on AIX CI servers (hangs them) - if (isPlatform("aix")) { - return; - } - - // get the stats for the route - MBeanServer mbeanServer = getMBeanServer(); + assertTrue(!Objects.equals(ts2, ts)); - Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); - assertEquals(1, set.size()); - ObjectName on = set.iterator().next(); + latch2.countDown(); - Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); - assertEquals(0, inflight.longValue()); - Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); - assertNull(ts); - String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); - assertNull(id); - - MockEndpoint result = getMockEndpoint("mock:result"); - result.expectedMessageCount(1); - - // start some exchanges. - template.asyncSendBody("direct:start", 1000L); + // Lets wait for all the exchanges to complete. Thread.sleep(500); - try { - template.sendBody("direct:start", "Kaboom"); - fail("Should have thrown exception"); - } catch (Exception e) { - // expected - } - - inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); - assertEquals(1, inflight.longValue()); - - ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); - assertNotNull(ts); - id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); - assertNotNull(id); assertMockEndpointsSatisfied(); - // Lets wait for all the exchanges to complete. - Thread.sleep(500); - inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); assertEquals(0, inflight.longValue()); ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); @@ -155,13 +116,8 @@ public class ManagedInflightStatisticsTest extends ManagementTestSupport { .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - String body = exchange.getIn().getBody(String.class); - if ("Kaboom".equals(body)) { - throw new IllegalArgumentException("Forced"); - } - - Long delay = (Long) exchange.getIn().getBody(); - Thread.sleep(delay.longValue()); + CountDownLatch latch = (CountDownLatch) exchange.getIn().getBody(); + latch.await(10, TimeUnit.SECONDS); } }) .to("mock:result").id("mock");