Updated Branches:
  refs/heads/master 68db90403 -> ca35a4d7b

CAMEL-6384: Added BacklogDebugger MBean for live debugging of Camel routes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ca35a4d7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ca35a4d7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ca35a4d7

Branch: refs/heads/master
Commit: ca35a4d7b9ea4f5865816d37ac7e3052e09957c9
Parents: 68db904
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri May 24 09:17:51 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri May 24 09:17:51 2013 +0200

----------------------------------------------------------------------
 .../mbean/ManagedBacklogDebuggerMBean.java         |   10 +-
 .../management/mbean/ManagedBacklogDebugger.java   |    9 +
 .../processor/interceptor/BacklogDebugger.java     |  118 +++++++++++----
 .../camel/management/BacklogDebuggerTest.java      |   72 +++++++++
 4 files changed, 174 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index bd1c813..1df2e2c 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -50,6 +50,12 @@ public interface ManagedBacklogDebuggerMBean {
     @ManagedOperation(description = "Resume running from the suspended 
breakpoint at the given node id")
     void resumeBreakpoint(String nodeId);
 
+    @ManagedOperation(description = "Updates the message body on the suspended 
breakpoint at the given node id")
+    void setMessageBodyOnBreakpoint(String nodeId, String body);
+
+    @ManagedOperation(description = "Updates/adds the message header on the 
suspended breakpoint at the given node id")
+    void setMessageHeaderOnBreakpoint(String nodeId, String headerName, String 
value);
+
     @ManagedOperation(description = "Resume running any suspended breakpoints, 
and exits step mode")
     void resumeAll();
 
@@ -69,10 +75,10 @@ public interface ManagedBacklogDebuggerMBean {
     Set<String> getSuspendedBreakpointNodeIds();
 
     @ManagedOperation(description = "Disables a breakpoint")
-    public void disableBreakpoint(String nodeId);
+    void disableBreakpoint(String nodeId);
 
     @ManagedOperation(description = "Enables a breakpoint which has been 
disabled")
-    public void enableBreakpoint(String nodeId);
+    void enableBreakpoint(String nodeId);
 
     @ManagedAttribute(description = "Number of maximum chars in the message 
body in the trace message. Use zero or negative value to have unlimited size.")
     int getBodyMaxChars();

http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 03a8081..444e35d 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -19,6 +19,7 @@ package org.apache.camel.management.mbean;
 import java.util.Set;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.ManagedBacklogDebuggerMBean;
 import org.apache.camel.processor.interceptor.BacklogDebugger;
@@ -87,6 +88,14 @@ public class ManagedBacklogDebugger implements 
ManagedBacklogDebuggerMBean {
         backlogDebugger.resumeBreakpoint(nodeId);
     }
 
+    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+        backlogDebugger.setMessageBodyOnBreakpoint(nodeId, body);
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, 
String value) {
+        backlogDebugger.setMessageHeaderOnBreakpoint(nodeId, headerName, 
value);
+    }
+
     public void resumeAll() {
         backlogDebugger.resumeAll();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
index dedc63c..1e331e7 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogDebugger.java
@@ -75,13 +75,38 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
     private final AtomicLong debugCounter = new AtomicLong(0);
     private final Debugger debugger;
     private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new 
ConcurrentHashMap<String, NodeBreakpoint>();
-    private final ConcurrentMap<String, CountDownLatch> suspendedBreakpoints = 
new ConcurrentHashMap<String, CountDownLatch>();
+    private final ConcurrentMap<String, SuspendedExchange> 
suspendedBreakpoints = new ConcurrentHashMap<String, SuspendedExchange>();
     private final ConcurrentMap<String, BacklogTracerEventMessage> 
suspendedBreakpointMessages = new ConcurrentHashMap<String, 
BacklogTracerEventMessage>();
     private volatile String singleStepExchangeId;
     private int bodyMaxChars = 128 * 1024;
     private boolean bodyIncludeStreams;
     private boolean bodyIncludeFiles = true;
 
+    /**
+     * A suspended {@link Exchange} at a breakpoint.
+     */
+    private static final class SuspendedExchange {
+        private final Exchange exchange;
+        private final CountDownLatch latch;
+
+        /**
+         * @param exchange the suspended exchange
+         * @param latch    the latch to use to continue routing the exchange
+         */
+        private SuspendedExchange(Exchange exchange, CountDownLatch latch) {
+            this.exchange = exchange;
+            this.latch = latch;
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+    }
+
     public BacklogDebugger(CamelContext camelContext) {
         this.camelContext = camelContext;
         DefaultDebugger debugger = new DefaultDebugger(camelContext);
@@ -144,8 +169,8 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
 
         // make sure to clear state and latches is counted down so we wont 
have hanging threads
         breakpoints.clear();
-        for (CountDownLatch latch : suspendedBreakpoints.values()) {
-            latch.countDown();
+        for (SuspendedExchange se : suspendedBreakpoints.values()) {
+            se.getLatch().countDown();
         }
         suspendedBreakpoints.clear();
         suspendedBreakpointMessages.clear();
@@ -198,13 +223,13 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
         logger.log("Removing breakpoint " + nodeId);
         // when removing a break point then ensure latches is cleared and 
counted down so we wont have hanging threads
         suspendedBreakpointMessages.remove(nodeId);
-        CountDownLatch latch = suspendedBreakpoints.remove(nodeId);
+        SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
         NodeBreakpoint breakpoint = breakpoints.remove(nodeId);
         if (breakpoint != null) {
             debugger.removeBreakpoint(breakpoint);
         }
-        if (latch != null) {
-            latch.countDown();
+        if (se != null) {
+            se.getLatch().countDown();
         }
     }
 
@@ -228,9 +253,33 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
 
         // remember to remove the dumped message as its no longer in need
         suspendedBreakpointMessages.remove(nodeId);
-        CountDownLatch latch = suspendedBreakpoints.remove(nodeId);
-        if (latch != null) {
-            latch.countDown();
+        SuspendedExchange se = suspendedBreakpoints.remove(nodeId);
+        if (se != null) {
+            se.getLatch().countDown();
+        }
+    }
+
+    public void setMessageBodyOnBreakpoint(String nodeId, String body) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is updating message 
body on exchangeId: " + se.getExchange().getExchangeId() + " with new body: " + 
body);
+            if (se.getExchange().hasOut()) {
+                se.getExchange().getOut().setBody(body);
+            } else {
+                se.getExchange().getIn().setBody(body);
+            }
+        }
+    }
+
+    public void setMessageHeaderOnBreakpoint(String nodeId, String headerName, 
String value) {
+        SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+        if (se != null) {
+            logger.log("Breakpoint at node " + nodeId + " is updating message 
header on exchangeId: " + se.getExchange().getExchangeId() + " with header: " + 
headerName + " and value: " + value);
+            if (se.getExchange().hasOut()) {
+                se.getExchange().getOut().setHeader(headerName, value);
+            } else {
+                se.getExchange().getIn().setHeader(headerName, value);
+            }
         }
     }
 
@@ -242,9 +291,9 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
         for (String node : getSuspendedBreakpointNodeIds()) {
             // remember to remove the dumped message as its no longer in need
             suspendedBreakpointMessages.remove(node);
-            CountDownLatch latch = suspendedBreakpoints.remove(node);
-            if (latch != null) {
-                latch.countDown();
+            SuspendedExchange se = suspendedBreakpoints.remove(node);
+            if (se != null) {
+                se.getLatch().countDown();
             }
         }
     }
@@ -267,9 +316,9 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
         for (String node : getSuspendedBreakpointNodeIds()) {
             // remember to remove the dumped message as its no longer in need
             suspendedBreakpointMessages.remove(node);
-            CountDownLatch latch = suspendedBreakpoints.remove(node);
-            if (latch != null) {
-                latch.countDown();
+            SuspendedExchange se = suspendedBreakpoints.remove(node);
+            if (se != null) {
+                se.getLatch().countDown();
             }
         }
     }
@@ -392,20 +441,21 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
             BacklogTracerEventMessage msg = new 
DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, 
messageAsXml);
             suspendedBreakpointMessages.put(nodeId, msg);
 
-            // mark as suspend
-            final CountDownLatch latch = suspendedBreakpoints.get(nodeId);
-
-            // now wait until we should continue
-            logger.log("NodeBreakpoint at node " + toNode + " is waiting to 
continue for exchangeId: " + exchange.getExchangeId());
-            try {
-                boolean hit = latch.await(fallbackTimeout, TimeUnit.SECONDS);
-                if (!hit) {
-                    logger.log("NodeBreakpoint at node " + toNode + " timed 
out and is continued exchangeId: " + exchange.getExchangeId(), 
LoggingLevel.WARN);
-                } else {
-                    logger.log("NodeBreakpoint at node " + toNode + " is 
continued exchangeId: " + exchange.getExchangeId());
+            // suspend at this breakpoint
+            final SuspendedExchange se = suspendedBreakpoints.get(nodeId);
+            if (se != null) {
+                // now wait until we should continue
+                logger.log("NodeBreakpoint at node " + toNode + " is waiting 
to continue for exchangeId: " + exchange.getExchangeId());
+                try {
+                    boolean hit = se.getLatch().await(fallbackTimeout, 
TimeUnit.SECONDS);
+                    if (!hit) {
+                        logger.log("NodeBreakpoint at node " + toNode + " 
timed out and is continued exchangeId: " + exchange.getExchangeId(), 
LoggingLevel.WARN);
+                    } else {
+                        logger.log("NodeBreakpoint at node " + toNode + " is 
continued exchangeId: " + exchange.getExchangeId());
+                    }
+                } catch (InterruptedException e) {
+                    // ignore
                 }
-            } catch (InterruptedException e) {
-                // ignore
             }
         }
 
@@ -422,7 +472,8 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
             }
 
             // we only want to break one exchange at a time, so if there is 
already a suspended breakpoint then do not match
-            boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, new 
CountDownLatch(1)) != null;
+            SuspendedExchange se = new SuspendedExchange(exchange, new 
CountDownLatch(1));
+            boolean existing = suspendedBreakpoints.putIfAbsent(nodeId, se) != 
null;
             return !existing;
         }
 
@@ -450,14 +501,14 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
             BacklogTracerEventMessage msg = new 
DefaultBacklogTracerEventMessage(uid, timestamp, routeId, toNode, exchangeId, 
messageAsXml);
             suspendedBreakpointMessages.put(toNode, msg);
 
-            // mark as suspend
-            final CountDownLatch latch = new CountDownLatch(1);
-            suspendedBreakpoints.put(toNode, latch);
+            // suspend at this breakpoint
+            SuspendedExchange se = new SuspendedExchange(exchange, new 
CountDownLatch(1));
+            suspendedBreakpoints.put(toNode, se);
 
             // now wait until we should continue
             logger.log("StepBreakpoint at node " + toNode + " is waiting to 
continue for exchangeId: " + exchange.getExchangeId());
             try {
-                boolean hit = latch.await(fallbackTimeout, TimeUnit.SECONDS);
+                boolean hit = se.getLatch().await(fallbackTimeout, 
TimeUnit.SECONDS);
                 if (!hit) {
                     logger.log("StepBreakpoint at node " + toNode + " timed 
out and is continued exchangeId: " + exchange.getExchangeId(), 
LoggingLevel.WARN);
                 } else {
@@ -491,4 +542,5 @@ public class BacklogDebugger extends ServiceSupport 
implements InterceptStrategy
             }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ca35a4d7/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java 
b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 23b9aee..5583ea9 100644
--- 
a/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -81,6 +81,78 @@ public class BacklogDebuggerTest extends 
ManagementTestSupport {
     }
 
     @SuppressWarnings("unchecked")
+    public void testBacklogDebuggerUpdateBodyAndHeader() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new 
ObjectName("org.apache.camel:context=localhost/camel-1,type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should not be enabled", Boolean.FALSE, enabled);
+
+        // enable debugger
+        mbeanServer.invoke(on, "enableDebugger", null, null);
+
+        enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled");
+        assertEquals("Should be enabled", Boolean.TRUE, enabled);
+
+        // add breakpoints at foo and bar
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"foo"}, new 
String[]{"java.lang.String"});
+        mbeanServer.invoke(on, "addBreakpoint", new Object[]{"bar"}, new 
String[]{"java.lang.String"});
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(1000);
+
+        template.sendBody("seda:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        // should be suspended at the foo breakpoint
+        Set<String> nodes = (Set<String>) mbeanServer.invoke(on, 
"getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("foo", nodes.iterator().next());
+
+        // update body and header
+        mbeanServer.invoke(on, "setMessageBodyOnBreakpoint", new 
Object[]{"foo", "Changed body"}, new String[]{"java.lang.String", 
"java.lang.String"});
+        mbeanServer.invoke(on, "setMessageHeaderOnBreakpoint", new 
Object[]{"foo", "beer", "Carlsberg"}, new String[]{"java.lang.String", 
"java.lang.String", "java.lang.String"});
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"foo"}, new 
String[]{"java.lang.String"});
+
+        Thread.sleep(1000);
+
+        // should now be suspended at the bar breakpoint
+        nodes = (Set<String>) mbeanServer.invoke(on, 
"getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(1, nodes.size());
+        assertEquals("bar", nodes.iterator().next());
+
+        // the message should be ours
+        String xml = (String) mbeanServer.invoke(on, 
"dumpTracedMessagesAsXml", new Object[]{"bar"}, new 
String[]{"java.lang.String"});
+        assertNotNull(xml);
+        log.info(xml);
+
+        assertTrue("Should contain our body", xml.contains("Changed body"));
+        assertTrue("Should contain bar node", 
xml.contains("<toNode>bar</toNode>"));
+        assertTrue("Should contain our added header", xml.contains("<header 
key=\"beer\" type=\"java.lang.String\">Carlsberg</header>"));
+
+        resetMocks();
+        mock.expectedMessageCount(1);
+
+        // resume breakpoint
+        mbeanServer.invoke(on, "resumeBreakpoint", new Object[]{"bar"}, new 
String[]{"java.lang.String"});
+
+        assertMockEndpointsSatisfied();
+
+        // and nothing is suspended anymore
+        nodes = (Set<String>) mbeanServer.invoke(on, 
"getSuspendedBreakpointNodeIds", null, null);
+        assertNotNull(nodes);
+        assertEquals(0, nodes.size());
+    }
+
+    @SuppressWarnings("unchecked")
     public void testBacklogDebuggerSuspendOnlyOneAtBreakpoint() throws 
Exception {
         MBeanServer mbeanServer = getMBeanServer();
         ObjectName on = new 
ObjectName("org.apache.camel:context=localhost/camel-1,type=tracer,name=BacklogDebugger");

Reply via email to