CAMEL-8626: Fixed leaking exchangesInFlightKeys in ManagedRoute
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0493c54d Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0493c54d Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0493c54d Branch: refs/heads/camel-2.15.x Commit: 0493c54d21288f89bbded8901b01fc584f4261a1 Parents: 72d297b Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 17 10:07:18 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 17 10:07:35 2015 +0200 ---------------------------------------------------------------------- .../camel/management/mbean/ManagedRoute.java | 9 ++- .../ManagedRouteDirectWhileIssueLeakTest.java | 82 ++++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0493c54d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java index 51ee576..d2a87d4 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java @@ -417,6 +417,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList @Override public void init(ManagementStrategy strategy) { + exchangesInFlightKeys.clear(); exchangesInFlightStartTimestamps.clear(); super.init(strategy); } @@ -424,8 +425,12 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList @Override public synchronized void processExchange(Exchange exchange) { InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId()); - exchangesInFlightKeys.put(exchange.getExchangeId(), key); - exchangesInFlightStartTimestamps.put(key, key.timeStamp); + InFlightKey oldKey = exchangesInFlightKeys.putIfAbsent(exchange.getExchangeId(), key); + // we may already have the exchange being processed so only add to timestamp if its a new exchange + // for example when people call the same routes recursive + if (oldKey == null) { + exchangesInFlightStartTimestamps.put(key, key.timeStamp); + } super.processExchange(exchange); } http://git-wip-us.apache.org/repos/asf/camel/blob/0493c54d/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDirectWhileIssueLeakTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDirectWhileIssueLeakTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDirectWhileIssueLeakTest.java new file mode 100644 index 0000000..1869959 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteDirectWhileIssueLeakTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.util.Set; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +public class ManagedRouteDirectWhileIssueLeakTest extends ManagementTestSupport { + + public void testInflightLeak() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + + getMockEndpoint("mock:result").expectedBodiesReceived("AAAA"); + + template.sendBodyAndHeader("direct:start", "", "counter", 4); + + assertMockEndpointsSatisfied(); + + // should not be any inflights + Set<ObjectName> set = mbeanServer.queryNames(new ObjectName("*:type=routes,*"), null); + assertEquals(1, set.size()); + ObjectName on = set.iterator().next(); + + Long inflight = (Long) mbeanServer.getAttribute(on, "ExchangesInflight"); + assertEquals(0, inflight.longValue()); + Long ts = (Long) mbeanServer.getAttribute(on, "OldestInflightDuration"); + assertNull(ts); + String id = (String) mbeanServer.getAttribute(on, "OldestInflightExchangeId"); + assertNull(id); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .choice().when(simple("${header.counter} > 0")) + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + String body = exchange.getIn().getBody(String.class); + body = "A" + body; + exchange.getIn().setBody(body); + + int counter = exchange.getIn().getHeader("counter", int.class); + counter = counter - 1; + exchange.getIn().setHeader("counter", counter); + } + }).to("direct:start") + .otherwise() + .to("mock:result") + .end(); + } + }; + } +} +