Updated Branches: refs/heads/master 68db90403 -> ca35a4d7b
CAMEL-6384: Added BacklogDebugger MBean for live debugging of Camel routes. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ca35a4d7 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ca35a4d7 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ca35a4d7 Branch: refs/heads/master Commit: ca35a4d7b9ea4f5865816d37ac7e3052e09957c9 Parents: 68db904 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri May 24 09:17:51 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri May 24 09:17:51 2013 +0200 ---------------------------------------------------------------------- .../mbean/ManagedBacklogDebuggerMBean.java | 10 +- .../management/mbean/ManagedBacklogDebugger.java | 9 + .../processor/interceptor/BacklogDebugger.java | 118 +++++++++++---- .../camel/management/BacklogDebuggerTest.java | 72 +++++++++ 4 files changed, 174 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java index bd1c813..1df2e2c 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java @@ -50,6 +50,12 @@ public interface ManagedBacklogDebuggerMBean { @ManagedOperation(description = "Resume running from the suspended breakpoint at the given node id") void resumeBreakpoint(String nodeId); + @ManagedOperation(description = "Updates the message body on the suspended breakpoint at the given node id") + void setMessageBodyOnBreakpoint(String nodeId, String body); + + @ManagedOperation(description = "Updates/adds the message header on the suspended breakpoint at the given node id") + void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value); + @ManagedOperation(description = "Resume running any suspended breakpoints, and exits step mode") void resumeAll(); @@ -69,10 +75,10 @@ public interface ManagedBacklogDebuggerMBean { Set<String> getSuspendedBreakpointNodeIds(); @ManagedOperation(description = "Disables a breakpoint") - public void disableBreakpoint(String nodeId); + void disableBreakpoint(String nodeId); @ManagedOperation(description = "Enables a breakpoint which has been disabled") - public void enableBreakpoint(String nodeId); + void enableBreakpoint(String nodeId); @ManagedAttribute(description = "Number of maximum chars in the message body in the trace message. Use zero or negative value to have unlimited size.") int getBodyMaxChars(); http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java index 03a8081..444e35d 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java @@ -19,6 +19,7 @@ package org.apache.camel.management.mbean; import java.util.Set; import org.apache.camel.CamelContext; +import org.apache.camel.api.management.ManagedOperation; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean; import org.apache.camel.processor.interceptor.BacklogDebugger; @@ -87,6 +88,14 @@ public class ManagedBacklogDebugger implements ManagedBacklogDebuggerMBean { backlogDebugger.resumeBreakpoint(nodeId); } + public void setMessageBodyOnBreakpoint(String nodeId, String body) { + backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body); + } + + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) { + backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, value); + } + public void resumeAll() { backlogDebugger.resumeAll(); } http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java index dedc63c..1e331e7 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java @@ -75,13 +75,38 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy private final AtomicLong debugCounter = new AtomicLong(0); private final Debugger debugger; private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new ConcurrentHashMap<String, NodeBreakpoint>(); - private final ConcurrentMap<String, CountDownLatch> suspendedBreakpoints = new ConcurrentHashMap<String, CountDownLatch>(); + private final ConcurrentMap<String, SuspendedExchange> suspendedBreakpoints = new ConcurrentHashMap<String, SuspendedExchange>(); private final ConcurrentMap<String, BacklogTracerEventMessage> suspendedBreakpointMessages = new ConcurrentHashMap<String, BacklogTracerEventMessage>(); private volatile String singleStepExchangeId; private int bodyMaxChars = 128 * 1024; private boolean bodyIncludeStreams; private boolean bodyIncludeFiles = true; + /** + * A suspend {@link Exchange} at a breakpoint. + */ + private static final class SuspendedExchange { + private final Exchange exchange; + private final CountDownLatch latch; + + /** + * @param exchange the suspend exchange + * @param latch the latch to use to continue routing the exchange + */ + private SuspendedExchange(Exchange exchange, CountDownLatch latch) { + this.exchange = exchange; + this.latch = latch; + } + + public Exchange getExchange() { + return exchange; + } + + public CountDownLatch getLatch() { + return latch; + } + } + public BacklogDebugger(CamelContext camelContext) { this.camelContext = camelContext; DefaultDebugger debugger = new DefaultDebugger(camelContext); @@ -144,8 +169,8 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy // make sure to clear state and latches is counted down so we wont have hanging threads breakpoints.clear(); - for (CountDownLatch latch : suspendedBreakpoints.values()) { - latch.countDown(); + for (SuspendedExchange se : suspendedBreakpoints.values()) { + se.getLatch().countDown(); } suspendedBreakpoints.clear(); suspendedBreakpointMessages.clear(); @@ -198,13 +223,13 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy logger.log("Removing breakpoint " + nodeId); // when removing a break point then ensure latches is cleared and counted down so we wont have hanging threads suspendedBreakpointMessages.remove(nodeId); - CountDownLatch latch = suspendedBreakpoints.remove(nodeId); + SuspendedExchange se = suspendedBreakpoints.remove(nodeId); NodeBreakpoint breakpoint = breakpoints.remove(nodeId); if (breakpoint != null) { debugger.removeBreakpoint(breakpoint); } - if (latch != null) { - latch.countDown(); + if (se != null) { + se.getLatch().countDown(); } } @@ -228,9 +253,33 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy // remember to remove the dumped message as its no longer in need suspendedBreakpointMessages.remove(nodeId); - CountDownLatch latch = suspendedBreakpoints.remove(nodeId); - if (latch != null) { - latch.countDown(); + SuspendedExchange se = suspendedBreakpoints.remove(nodeId); + if (se != null) { + se.getLatch().countDown(); + } + } + + public void setMessageBodyOnBreakpoint(String nodeId, String body) { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + logger.log("Breakpoint at node " + nodeId + " is updating message body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + body); + if (se.getExchange().hasOut()) { + se.getExchange().getOut().setBody(body); + } else { + se.getExchange().getIn().setBody(body); + } + } + } + + public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String value) { + SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + logger.log("Breakpoint at node " + nodeId + " is updating message header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + headerName + " and value: " + value); + if (se.getExchange().hasOut()) { + se.getExchange().getOut().setHeader(headerName, value); + } else { + se.getExchange().getIn().setHeader(headerName, value); + } } } @@ -242,9 +291,9 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy for (String node : getSuspendedBreakpointNodeIds()) { // remember to remove the dumped message as its no longer in need suspendedBreakpointMessages.remove(node); - CountDownLatch latch = suspendedBreakpoints.remove(node); - if (latch != null) { - latch.countDown(); + SuspendedExchange se = suspendedBreakpoints.remove(node); + if (se != null) { + se.getLatch().countDown(); } } } @@ -267,9 +316,9 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy for (String node : getSuspendedBreakpointNodeIds()) { // remember to remove the dumped message as its no longer in need suspendedBreakpointMessages.remove(node); - CountDownLatch latch = suspendedBreakpoints.remove(node); - if (latch != null) { - latch.countDown(); + SuspendedExchange se = suspendedBreakpoints.remove(node); + if (se != null) { + se.getLatch().countDown(); } } } @@ -392,20 +441,21 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml); suspendedBreakpointMessages.put(nodeId, msg); - // mark as suspend - final CountDownLatch latch = suspendedBreakpoints.get(nodeId); - - // now wait until we should continue - logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); - try { - boolean hit = latch.await(fallbackTimeout, TimeUnit.SECONDS); - if (!hit) { - logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN); - } else { - logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); + // suspend at this breakpoint + final SuspendedExchange se = suspendedBreakpoints.get(nodeId); + if (se != null) { + // now wait until we should continue + logger.log("NodeBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); + try { + boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); + if (!hit) { + logger.log("NodeBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN); + } else { + logger.log("NodeBreakpoint at node " + toNode + " is continued exchangeId: " + exchange.getExchangeId()); + } + } catch (InterruptedException e) { + // ignore } - } catch (InterruptedException e) { - // ignore } } @@ -422,7 +472,8 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy } // we only want to break one exchange at a time, so if there is already a suspended breakpoint then do not match - boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, new CountDownLatch(1)) != null; + SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1)); + boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, se) != null; return !existing; } @@ -450,14 +501,14 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy BacklogTracerEventMessage msg = new DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, messageAsXml); suspendedBreakpointMessages.put(toNode, msg); - // mark as suspend - final CountDownLatch latch = new CountDownLatch(1); - suspendedBreakpoints.put(toNode, latch); + // suspend at this breakpoint + SuspendedExchange se = new SuspendedExchange(exchange, new CountDownLatch(1)); + suspendedBreakpoints.put(toNode, se); // now wait until we should continue logger.log("StepBreakpoint at node " + toNode + " is waiting to continue for exchangeId: " + exchange.getExchangeId()); try { - boolean hit = latch.await(fallbackTimeout, TimeUnit.SECONDS); + boolean hit = se.getLatch().await(fallbackTimeout, TimeUnit.SECONDS); if (!hit) { logger.log("StepBreakpoint at node " + toNode + " timed out and is continued exchangeId: " + exchange.getExchangeId(), LoggingLevel.WARN); } else { @@ -491,4 +542,5 @@ public class BacklogDebugger extends ServiceSupport implements InterceptStrategy } } } + } http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java index 23b9aee..5583ea9 100644 --- a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java +++ b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java @@ -81,6 +81,78 @@ public class BacklogDebuggerTest extends ManagementTestSupport { } @SuppressWarnings("unchecked") + public void testBacklogDebuggerUpdateBodyAndHeader() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=localhost/camel-1,type=tracer,name=BacklogDebugger"); + assertNotNull(on); + mbeanServer.isRegistered(on); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should not be enabled", Boolean.FALSE, enabled); + + // enable debugger + mbeanServer.invoke(on, "enableDebugger", null, null); + + enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should be enabled", Boolean.TRUE, enabled); + + // add breakpoint at bar + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(0); + mock.setSleepForEmptyTest(1000); + + template.sendBody("seda:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + // add breakpoint at bar + Set<String> nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("foo", nodes.iterator().next()); + + // update body and header + mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", "java.lang.String"}); + mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", "java.lang.String", "java.lang.String"}); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new String[]{"java.lang.String"}); + + Thread.sleep(1000); + + // add breakpoint at bar + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(1, nodes.size()); + assertEquals("bar", nodes.iterator().next()); + + // the message should be ours + String xml = (String) mbeanServer.invoke(on, "dumpTracedMessagesAsXml", new Object[]{"bar"}, new String[]{"java.lang.String"}); + assertNotNull(xml); + log.info(xml); + + assertTrue("Should contain our body", xml.contains("Changed body")); + assertTrue("Should contain bar node", xml.contains("<toNode>bar</toNode>")); + assertTrue("Should contain our added header", xml.contains("<header key=\"beer\" type=\"java.lang.String\">Carlsberg</header>")); + + resetMocks(); + mock.expectedMessageCount(1); + + // resume breakpoint + mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new String[]{"java.lang.String"}); + + assertMockEndpointsSatisfied(); + + // and no suspended anymore + nodes = (Set<String>) mbeanServer.invoke(on, "getSuspendedBreakpointNodeIds", null, null); + assertNotNull(nodes); + assertEquals(0, nodes.size()); + } + + @SuppressWarnings("unchecked") public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws Exception { MBeanServer mbeanServer = getMBeanServer(); ObjectName on = new ObjectName("org.apache.camel:context=localhost/camel-1,type=tracer,name=BacklogDebugger");