This is an automated email from the ASF dual-hosted git repository.
nfilotto pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ad751863775 CAMEL-18217: debugger - Allow to suspend messages (#7867)
ad751863775 is described below
commit ad75186377554ec6eb55ea206e12b24fd8371495
Author: Nicolas Filotto <[email protected]>
AuthorDate: Thu Jun 23 16:02:50 2022 +0200
CAMEL-18217: debugger - Allow to suspend messages (#7867)
## Motivation
Until now, the BacklogDebugger has offered no easy way to suspend all
messages until every breakpoint has been defined. This is particularly
a problem when debugging an application with a very short lifetime,
such as a test.
## Modifications
* Define an environment variable `CAMEL_DEBUGGER_SUSPEND` to suspend message
processing at startup
* Add methods `attach` and `detach` to resume and suspend message processing,
respectively, when `CAMEL_DEBUGGER_SUSPEND` is set to `true`
* Add `junit-pioneer` as a test dependency to be able to set the value of an
environment variable within the context of a unit test.
* Document the suspend mode in the debugger page of the user manual
---
camel-dependencies/pom.xml | 1 +
.../camel/impl/debugger/BacklogDebugger.java | 104 +++++++++++++++++++--
.../camel/impl/engine/CamelInternalProcessor.java | 10 +-
.../mbean/ManagedBacklogDebuggerMBean.java | 6 ++
core/camel-management/pom.xml | 5 +
.../management/mbean/ManagedBacklogDebugger.java | 10 ++
.../camel/management/BacklogDebuggerTest.java | 47 ++++++++++
docs/user-manual/modules/ROOT/pages/debugger.adoc | 7 +-
parent/pom.xml | 6 ++
9 files changed, 180 insertions(+), 16 deletions(-)
diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 822dcd37cef..a9b2b7b1744 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -355,6 +355,7 @@
<jt400-version>11.0</jt400-version>
<jta-api-1.2-version>1.2</jta-api-1.2-version>
<junit-jupiter-version>5.8.2</junit-jupiter-version>
+ <junit-pioneer-version>1.7.1</junit-pioneer-version>
<junit-toolbox-version>2.4</junit-toolbox-version>
<junit-version>4.13.2</junit-version>
<jxmpp-version>0.6.4</jxmpp-version>
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
index a7e92b902c3..246bb05bd27 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/debugger/BacklogDebugger.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
@@ -44,6 +45,7 @@ import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,11 @@ import org.slf4j.LoggerFactory;
*/
public final class BacklogDebugger extends ServiceSupport {
+ /**
+ * The name of the environment variable that contains the value of the
flag indicating whether the
+ * {@code BacklogDebugger} should suspend processing the messages and wait
for a debugger to attach or not.
+ */
+ public static final String SUSPEND_MODE_ENV_VAR_NAME =
"CAMEL_DEBUGGER_SUSPEND";
private static final Logger LOG =
LoggerFactory.getLogger(BacklogDebugger.class);
private long fallbackTimeout = 300;
@@ -73,6 +80,15 @@ public final class BacklogDebugger extends ServiceSupport {
private final ConcurrentMap<String, NodeBreakpoint> breakpoints = new
ConcurrentHashMap<>();
private final ConcurrentMap<String, SuspendedExchange>
suspendedBreakpoints = new ConcurrentHashMap<>();
private final ConcurrentMap<String, BacklogTracerEventMessage>
suspendedBreakpointMessages = new ConcurrentHashMap<>();
+ /**
+ * Indicates whether the <i>suspend mode</i> is enabled or not.
+ */
+ private final boolean suspendMode;
+ /**
+ * The reference to the {@code CountDownLatch} used to suspend Camel from
processing the incoming messages when the
+ * <i>suspend mode</i> is enabled.
+ */
+ private final AtomicReference<CountDownLatch> suspend = new
AtomicReference<>();
private volatile String singleStepExchangeId;
private int bodyMaxChars = 128 * 1024;
private boolean bodyIncludeStreams;
@@ -103,19 +119,31 @@ public final class BacklogDebugger extends ServiceSupport
{
}
}
- private BacklogDebugger(CamelContext camelContext) {
+ /**
+ * Constructs a {@code BacklogDebugger} with the given parameters.
+ *
+ * @param camelContext the camel context
+ * @param suspendMode Indicates whether the <i>suspend mode</i> is
enabled or not. If {@code true} the message
+ * processing is immediately suspended until the
{@link #attach()} is called.
+ */
+ private BacklogDebugger(CamelContext camelContext, boolean suspendMode) {
this.camelContext = camelContext;
this.debugger = new DefaultDebugger(camelContext);
+ this.suspendMode = suspendMode;
+ detach();
}
/**
* Creates a new backlog debugger.
+ * <p>
+ * In case the environment variable {@link #SUSPEND_MODE_ENV_VAR_NAME} has
been set to {@code true}, the message
+ * processing is directly suspended.
*
* @param context Camel context
* @return a new backlog debugger
*/
public static BacklogDebugger createDebugger(CamelContext context) {
- return new BacklogDebugger(context);
+ return new BacklogDebugger(context,
Boolean.parseBoolean(System.getenv(SUSPEND_MODE_ENV_VAR_NAME)));
}
/**
@@ -169,6 +197,65 @@ public final class BacklogDebugger extends ServiceSupport {
return singleStepExchangeId != null;
}
+ /**
+ * Attach the debugger which will resume the message processing in case
the <i>suspend mode</i> is enabled. Do
+ * nothing otherwise.
+ */
+ public void attach() {
+ if (suspendMode) {
+ logger.log("A debugger has been attached");
+ resumeMessageProcessing();
+ }
+ }
+
+ /**
+ * Detach the debugger which will suspend the message processing in case
the <i>suspend mode</i> is enabled. Do
+ * nothing otherwise.
+ */
+ public void detach() {
+ if (suspendMode) {
+ logger.log("Waiting for a debugger to attach");
+ suspendMessageProcessing();
+ }
+ }
+
+ /**
+ * Suspend the current thread if the <i>suspend mode</i> is enabled as
long as the method {@link #attach()} is not
+ * called. Do nothing otherwise.
+ */
+ private void suspendIfNeeded() {
+ final CountDownLatch countDownLatch = suspend.get();
+ if (countDownLatch != null) {
+ logger.log("Incoming message suspended");
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Make Camel suspend processing incoming messages.
+ */
+ private void suspendMessageProcessing() {
+ suspend.compareAndSet(null, new CountDownLatch(1));
+ }
+
+ /**
+ * Resume the processing of the incoming messages.
+ */
+ private void resumeMessageProcessing() {
+ for (;;) {
+ final CountDownLatch countDownLatch = suspend.get();
+ if (countDownLatch == null) {
+ break;
+ } else if (suspend.compareAndSet(countDownLatch, null)) {
+ countDownLatch.countDown();
+ }
+ }
+ }
+
public void addBreakpoint(String nodeId) {
NodeBreakpoint breakpoint = breakpoints.get(nodeId);
if (breakpoint == null) {
@@ -454,13 +541,18 @@ public final class BacklogDebugger extends ServiceSupport
{
debugCounter.set(0);
}
- public boolean beforeProcess(Exchange exchange, Processor processor,
NamedNode definition) {
- return debugger.beforeProcess(exchange, processor, definition);
+ public StopWatch beforeProcess(Exchange exchange, Processor processor,
NamedNode definition) {
+ suspendIfNeeded();
+ if (isEnabled() && (hasBreakpoint(definition.getId()) ||
isSingleStepMode())) {
+ StopWatch watch = new StopWatch();
+ debugger.beforeProcess(exchange, processor, definition);
+ return watch;
+ }
+ return null;
}
- public boolean afterProcess(Exchange exchange, Processor processor,
NamedNode definition, long timeTaken) {
+ public void afterProcess(Exchange exchange, Processor processor, NamedNode
definition, long timeTaken) {
// noop
- return false;
}
@Override
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 6a031a1f402..7babd956ea8 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -639,24 +639,16 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
private final BacklogDebugger backlogDebugger;
private final Processor target;
private final NamedNode definition;
- private final String nodeId;
public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger,
Processor target, NamedNode definition) {
this.backlogDebugger = backlogDebugger;
this.target = target;
this.definition = definition;
- this.nodeId = definition.getId();
}
@Override
public StopWatch before(Exchange exchange) throws Exception {
- if (backlogDebugger.isEnabled() &&
(backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
- StopWatch watch = new StopWatch();
- backlogDebugger.beforeProcess(exchange, target, definition);
- return watch;
- } else {
- return null;
- }
+ return backlogDebugger.beforeProcess(exchange, target, definition);
}
@Override
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
index b824a84c356..57c2f7573e8 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBacklogDebuggerMBean.java
@@ -175,4 +175,10 @@ public interface ManagedBacklogDebuggerMBean {
@ManagedOperation(description = "Returns the message history at the given
node id as XML")
String messageHistoryOnBreakpointAsXml(String nodeId);
+
+ @ManagedOperation(description = "Attach the debugger")
+ void attach();
+
+ @ManagedOperation(description = "Detach the debugger")
+ void detach();
}
diff --git a/core/camel-management/pom.xml b/core/camel-management/pom.xml
index 577572d5cef..1e225b27d4c 100644
--- a/core/camel-management/pom.xml
+++ b/core/camel-management/pom.xml
@@ -69,6 +69,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
index 0f78390a6fc..0fb57d6fc0b 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBacklogDebugger.java
@@ -441,6 +441,16 @@ public class ManagedBacklogDebugger implements
ManagedBacklogDebuggerMBean {
return messageHistoryBuffer.toString();
}
+ @Override
+ public void attach() {
+ backlogDebugger.attach();
+ }
+
+ @Override
+ public void detach() {
+ backlogDebugger.detach();
+ }
+
private String dumpExchangePropertiesAsXml(String id) {
StringBuilder sb = new StringBuilder();
sb.append(" <exchangeProperties>\n");
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
index 01b8702f5d8..24a1e03f758 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/BacklogDebuggerTest.java
@@ -24,9 +24,11 @@ import javax.management.ObjectName;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.debugger.BacklogDebugger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import org.junitpioneer.jupiter.SetEnvironmentVariable;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -890,6 +892,51 @@ public class BacklogDebuggerTest extends
ManagementTestSupport {
assertEquals(0, nodes.size());
}
+ /**
+ * Ensure that the suspend mode works as expected.
+ */
+ @Test
+ @SetEnvironmentVariable(key = BacklogDebugger.SUSPEND_MODE_ENV_VAR_NAME,
value = "true")
+ public void testSuspendMode() throws Exception {
+ MBeanServer mbeanServer = getMBeanServer();
+ ObjectName on = new ObjectName(
+ "org.apache.camel:context=" + context.getManagementName() +
",type=tracer,name=BacklogDebugger");
+ assertNotNull(on);
+ mbeanServer.isRegistered(on);
+
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(100);
+
+ template.sendBody("seda:start", "Hello World");
+ assertMockEndpointsSatisfied();
+
+ resetMocks();
+
+ // Attach debugger
+ mbeanServer.invoke(on, "attach", null, null);
+
+ mock.expectedMessageCount(1);
+
+ resetMocks();
+
+ // Detach debugger
+ mbeanServer.invoke(on, "detach", null, null);
+
+ mock.expectedMessageCount(0);
+ mock.setSleepForEmptyTest(100);
+
+ template.sendBody("seda:start", "Hello World 2");
+ assertMockEndpointsSatisfied();
+
+ resetMocks();
+
+ // Attach debugger
+ mbeanServer.invoke(on, "attach", null, null);
+
+ mock.expectedMessageCount(1);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
diff --git a/docs/user-manual/modules/ROOT/pages/debugger.adoc
b/docs/user-manual/modules/ROOT/pages/debugger.adoc
index 1ac971f9b30..845ed592305 100644
--- a/docs/user-manual/modules/ROOT/pages/debugger.adoc
+++ b/docs/user-manual/modules/ROOT/pages/debugger.adoc
@@ -90,7 +90,12 @@ which can be used to extend for custom implementations.
=== JMX debugger
-There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows
debugging from JMX.
+There is also a xref:backlog-debugger.adoc[Backlog Debugger] which allows
debugging from JMX that is included into `camel-debug`.
+
+To be able to have enough time to add your breakpoints, you could need to
suspend the message processing of Camel to make sure
+that you won't miss any messages. For this kind of need, you have to set the
environment variable `CAMEL_DEBUGGER_SUSPEND` to `true`
+within the context of your application, then the `Backlog Debugger` suspends
the message processing until the JMX operation `attach` is called. Calling the
JMX operation `detach` suspends again the message processing.
+
Several 3rd party tooling are using it:
- https://hawt.io/[hawtio] uses this for its web based debugging functionality
-
https://marketplace.visualstudio.com/items?itemName=redhat.vscode-debug-adapter-apache-camel[VS
Code Debug Adapter for Camel]
diff --git a/parent/pom.xml b/parent/pom.xml
index 67d70ff8e80..772cc4ed68b 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -341,6 +341,7 @@
<junit-toolbox-version>2.4</junit-toolbox-version>
<junit-version>4.13.2</junit-version>
<junit-jupiter-version>5.8.2</junit-jupiter-version>
+ <junit-pioneer-version>1.7.1</junit-pioneer-version>
<jxmpp-version>0.6.4</jxmpp-version>
<jython-version>2.7.2</jython-version>
<jython-standalone-version>2.7.2</jython-standalone-version>
@@ -3614,6 +3615,11 @@
<type>pom</type>
<scope>import</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit-pioneer</groupId>
+ <artifactId>junit-pioneer</artifactId>
+ <version>${junit-pioneer-version}</version>
+ </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>