This is an automated email from the ASF dual-hosted git repository.

nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ad751863775 CAMEL-18217: debugger - Allow to suspend messages (#7867)
ad751863775 is described below

commit ad75186377554ec6eb55ea206e12b24fd8371495
Author: Nicolas Filotto <[email protected]>
AuthorDate: Thu Jun 23 16:02:50 2022 +0200

    CAMEL-18217: debugger - Allow to suspend messages (#7867)
    
    ## Motivation
    
    Up to now when using the BacklogDebugger there is no easy way to suspend 
all messages before ensuring that all breakpoints have been defined. This can 
particularly be a problem when debugging an application with a very short 
lifetime like a test.
    
    ## Modifications
    
    * Define a system variable `CAMEL_DEBUGGER_SUSPEND` to suspend the message 
processing at startup
    * Add methods `attach` and `detach` to respectively resume and suspend 
message processing when `CAMEL_DEBUGGER_SUSPEND` is set to `true`
    * Add `junit-pioneer` as test dependency to be able to set the value of an 
environment variable within the context of a Unit test.
    * Add some doc to describe it
---
 camel-dependencies/pom.xml                         |   1 +
 .../camel/impl/debugger/BacklogDebugger.java       | 104 +++++++++++++++++++--
 .../camel/impl/engine/CamelInternalProcessor.java  |  10 +-
 .../mbean/ManagedBacklogDebuggerMBean.java         |   6 ++
 core/camel-management/pom.xml                      |   5 +
 .../management/mbean/ManagedBacklogDebugger.java   |  10 ++
 .../camel/management/BacklogDebuggerTest.java      |  47 ++++++++++
 docs/user-manual/modules/ROOT/pages/debugger.adoc  |   7 +-
 parent/pom.xml                                     |   6 ++
 9 files changed, 180 insertions(+), 16 deletions(-)

diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 822dcd37cef..a9b2b7b1744 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -355,6 +355,7 @@
     <jt400-version>11.0</jt400-version>
     <jta-api-1.2-version>1.2</jta-api-1.2-version>
     <junit-jupiter-version>5.8.2</junit-jupiter-version>
+    <junit-pioneer-version>1.7.1</junit-pioneer-version>
     <junit-toolbox-version>2.4</junit-toolbox-version>
     <junit-version>4.13.2</junit-version>
     <jxmpp-version>0.6.4</jxmpp-version>
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
index a7e92b902c3..246bb05bd27 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
@@ -44,6 +45,7 @@ import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +63,11 @@ import org.slf4j.LoggerFactory;
  */
 public final class BacklogDebugger extends ServiceSupport {
 
+    /**
+     * The name of the environment variable that contains the value of the 
flag indicating whether the
+     * {@code BacklogDebugger} should suspend processing the messages and wait 
for a debugger to attach or not.
+     */
+    public static final String SUSPEND_MODE_ENV_VAR_NAME = 
"CAMEL_DEBUGGER_SUSPEND";
     private static final Logger LOG = 
LoggerFactory.getLogger(BacklogDebugger.class);
 
     private long fallbackTimeout = 300;
@@ -73,6 +80,15 @@ public final class BacklogDebugger extends ServiceSupport {
     private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new 
ConcurrentHashMap<>();
     private final ConcurrentMap<String, SuspendedExchange> 
suspendedBreakpoints = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, BacklogTracerEventMessage> 
suspendedBreakpointMessages = new ConcurrentHashMap<>();
+    /**
+     * Indicates whether the <i>suspend mode</i> is enabled or not.
+     */
+    private final boolean suspendMode;
+    /**
+     * The reference to the {@code CountDownLatch} used to suspend Camel from 
processing the incoming messages when the
+     * <i>suspend mode</i> is enabled.
+     */
+    private final AtomicReference<CountDownLatch> suspend = new 
AtomicReference<>();
     private volatile String singleStepExchangeId;
     private int bodyMaxChars = 128 * 1024;
     private boolean bodyIncludeStreams;
@@ -103,19 +119,31 @@ public final class BacklogDebugger extends ServiceSupport 
{
         }
     }
 
-    private BacklogDebugger(CamelContext camelContext) {
+    /**
+     * Constructs a {@code BacklogDebugger} with the given parameters.
+     *
+     * @param camelContext the camel context
+     * @param suspendMode  Indicates whether the <i>suspend mode</i> is 
enabled or not. If {@code true} the message
+     *                     processing is immediately suspended until the 
{@link #attach()} is called.
+     */
+    private BacklogDebugger(CamelContext camelContext, boolean suspendMode) {
         this.camelContext = camelContext;
         this.debugger = new DefaultDebugger(camelContext);
+        this.suspendMode = suspendMode;
+        detach();
     }
 
     /**
      * Creates a new backlog debugger.
+     * <p>
+     * In case the environment variable {@link #SUSPEND_MODE_ENV_VAR_NAME} has 
been set to {@code true}, the message
+     * processing is directly suspended.
      *
      * @param  context Camel context
      * @return         a new backlog debugger
      */
     public static BacklogDebugger createDebugger(CamelContext context) {
-        return new BacklogDebugger(context);
+        return new BacklogDebugger(context, 
Boolean.parseBoolean(System.getenv(SUSPEND_MODE_ENV_VAR_NAME)));
     }
 
     /**
@@ -169,6 +197,65 @@ public final class BacklogDebugger extends ServiceSupport {
         return singleStepExchangeId != null;
     }
 
+    /**
+     * Attach the debugger which will resume the message processing in case 
the <i>suspend mode</i> is enabled. Do
+     * nothing otherwise.
+     */
+    public void attach() {
+        if (suspendMode) {
+            logger.log("A debugger has been attached");
+            resumeMessageProcessing();
+        }
+    }
+
+    /**
+     * Detach the debugger which will suspend the message processing in case 
the <i>suspend mode</i> is enabled. Do
+     * nothing otherwise.
+     */
+    public void detach() {
+        if (suspendMode) {
+            logger.log("Waiting for a debugger to attach");
+            suspendMessageProcessing();
+        }
+    }
+
+    /**
+     * Suspend the current thread if the <i>suspend mode</i> is enabled as 
long as the method {@link #attach()} is not
+     * called. Do nothing otherwise.
+     */
+    private void suspendIfNeeded() {
+        final CountDownLatch countDownLatch = suspend.get();
+        if (countDownLatch != null) {
+            logger.log("Incoming message suspended");
+            try {
+                countDownLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Make Camel suspend processing incoming messages.
+     */
+    private void suspendMessageProcessing() {
+        suspend.compareAndSet(null, new CountDownLatch(1));
+    }
+
+    /**
+     * Resume the processing of the incoming messages.
+     */
+    private void resumeMessageProcessing() {
+        for (;;) {
+            final CountDownLatch countDownLatch = suspend.get();
+            if (countDownLatch == null) {
+                break;
+            } else if (suspend.compareAndSet(countDownLatch, null)) {
+                countDownLatch.countDown();
+            }
+        }
+    }
+
     public void addBreakpoint(String nodeId) {
         NodeBreakpoint breakpoint = breakpoints.get(nodeId);
         if (breakpoint == null) {
@@ -454,13 +541,18 @@ public final class BacklogDebugger extends ServiceSupport 
{
         debugCounter.set(0);
     }
 
-    public boolean beforeProcess(Exchange exchange, Processor processor, 
NamedNode definition) {
-        return debugger.beforeProcess(exchange, processor, definition);
+    public StopWatch beforeProcess(Exchange exchange, Processor processor, 
NamedNode definition) {
+        suspendIfNeeded();
+        if (isEnabled() && (hasBreakpoint(definition.getId()) || 
isSingleStepMode())) {
+            StopWatch watch = new StopWatch();
+            debugger.beforeProcess(exchange, processor, definition);
+            return watch;
+        }
+        return null;
     }
 
-    public boolean afterProcess(Exchange exchange, Processor processor, 
NamedNode definition, long timeTaken) {
+    public void afterProcess(Exchange exchange, Processor processor, NamedNode 
definition, long timeTaken) {
         // noop
-        return false;
     }
 
     @Override
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 6a031a1f402..7babd956ea8 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -639,24 +639,16 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
         private final BacklogDebugger backlogDebugger;
         private final Processor target;
         private final NamedNode definition;
-        private final String nodeId;
 
         public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, 
Processor target, NamedNode definition) {
             this.backlogDebugger = backlogDebugger;
             this.target = target;
             this.definition = definition;
-            this.nodeId = definition.getId();
         }
 
         @Override
         public StopWatch before(Exchange exchange) throws Exception {
-            if (backlogDebugger.isEnabled() && 
(backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
-                StopWatch watch = new StopWatch();
-                backlogDebugger.beforeProcess(exchange, target, definition);
-                return watch;
-            } else {
-                return null;
-            }
+            return backlogDebugger.beforeProcess(exchange, target, definition);
         }
 
         @Override
diff --git 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index b824a84c356..57c2f7573e8 100644
--- 
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++ 
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -175,4 +175,10 @@ public interface ManagedBacklogDebuggerMBean {
 
     @ManagedOperation(description = "Returns the message history at the given 
node id as XML")
     String messageHistoryOnBreakpointAsXml(String nodeId);
+
+    @ManagedOperation(description = "Attach the debugger")
+    void attach();
+
+    @ManagedOperation(description = "Detach the debugger")
+    void detach();
 }
diff --git a/core/camel-management/pom.xml b/core/camel-management/pom.xml
index 577572d5cef..1e225b27d4c 100644
--- a/core/camel-management/pom.xml
+++ b/core/camel-management/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit-pioneer</groupId>
+            <artifactId>junit-pioneer</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
             <artifactId>camel-core</artifactId>
diff --git 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 0f78390a6fc..0fb57d6fc0b 100644
--- 
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++ 
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -441,6 +441,16 @@ public class ManagedBacklogDebugger implements 
ManagedBacklogDebuggerMBean {
         return messageHistoryBuffer.toString();
     }
 
+    @Override
+    public void attach() {
+        backlogDebugger.attach();
+    }
+
+    @Override
+    public void detach() {
+        backlogDebugger.detach();
+    }
+
     private String dumpExchangePropertiesAsXml(String id) {
         StringBuilder sb = new StringBuilder();
         sb.append("  <exchangeProperties>\n");
diff --git 
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
 
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 01b8702f5d8..24a1e03f758 100644
--- 
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++ 
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -24,9 +24,11 @@ import javax.management.ObjectName;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.debugger.BacklogDebugger;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -890,6 +892,51 @@ public class BacklogDebuggerTest extends 
ManagementTestSupport {
         assertEquals(0, nodes.size());
     }
 
+    /**
+     * Ensure that the suspend mode works as expected.
+     */
+    @Test
+    @SetEnvironmentVariable(key = BacklogDebugger.SUSPEND_MODE_ENV_VAR_NAME, 
value = "true")
+    public void testSuspendMode() throws Exception {
+        MBeanServer mbeanServer = getMBeanServer();
+        ObjectName on = new ObjectName(
+                "org.apache.camel:context=" + context.getManagementName() + 
",type=tracer,name=BacklogDebugger");
+        assertNotNull(on);
+        mbeanServer.isRegistered(on);
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+
+        resetMocks();
+
+        // Detach debugger
+        mbeanServer.invoke(on, "detach", null, null);
+
+        mock.expectedMessageCount(0);
+        mock.setSleepForEmptyTest(100);
+
+        template.sendBody("seda:start", "Hello World 2");
+        assertMockEndpointsSatisfied();
+
+        resetMocks();
+
+        // Attach debugger
+        mbeanServer.invoke(on, "attach", null, null);
+
+        mock.expectedMessageCount(1);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/docs/user-manual/modules/ROOT/pages/debugger.adoc 
b/docs/user-manual/modules/ROOT/pages/debugger.adoc
index 1ac971f9b30..845ed592305 100644
--- a/docs/user-manual/modules/ROOT/pages/debugger.adoc
+++ b/docs/user-manual/modules/ROOT/pages/debugger.adoc
@@ -90,7 +90,12 @@ which can be used to extend for custom implementations.
 
 === JMX debugger
 
-There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows 
debugging from JMX.
+There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows 
debugging from JMX that is included into `camel-debug`.
+
+To be able to have enough time to add your breakpoints, you could need to 
suspend the message processing of Camel to make sure
+that you won't miss any messages. For this kind of need, you have to set the 
environment variable `CAMEL_DEBUGGER_SUSPEND` to `true`
+within the context of your application, then the `Backlog Debugger` suspends 
the message processing until the JMX operation `attach` is called. Calling the 
JMX operation `detach` suspends again the message processing.
+
 Several 3rd party tooling are using it:
 - https://hawt.io/[hawtio] uses this for its web based debugging functionality
 - 
https://marketplace.visualstudio.com/items?itemName=redhat.vscode-debug-adapter-apache-camel[VS
 Code Debug Adapter for Camel]
diff --git a/parent/pom.xml b/parent/pom.xml
index 67d70ff8e80..772cc4ed68b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -341,6 +341,7 @@
         <junit-toolbox-version>2.4</junit-toolbox-version>
         <junit-version>4.13.2</junit-version>
         <junit-jupiter-version>5.8.2</junit-jupiter-version>
+        <junit-pioneer-version>1.7.1</junit-pioneer-version>
         <jxmpp-version>0.6.4</jxmpp-version>
         <jython-version>2.7.2</jython-version>
         <jython-standalone-version>2.7.2</jython-standalone-version>
@@ -3614,6 +3615,11 @@
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
+            <dependency>
+                <groupId>org.junit-pioneer</groupId>
+                <artifactId>junit-pioneer</artifactId>
+                <version>${junit-pioneer-version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.awaitility</groupId>
                 <artifactId>awaitility</artifactId>

Reply via email to