Repository: camel Updated Branches: refs/heads/camel-2.16.x 62dea9064 -> d40de6907
CAMEL-10096: Camel tracer with stream caching should tracer after stream cache has been setup. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d40de690 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d40de690 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d40de690 Branch: refs/heads/camel-2.16.x Commit: d40de690768ee3735ca5d00723f0dee05c4cd6ca Parents: 62dea90 Author: Claus Ibsen <davscl...@apache.org> Authored: Tue Jun 28 09:32:29 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Tue Aug 30 18:49:20 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/CamelInternalProcessor.java | 33 +++++- .../BacklogTracerStreamCachingTest.java | 101 +++++++++++++++++++ 2 files changed, 130 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d40de690/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 856a225..1b42b94 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.RejectedExecutionException; @@ -25,6 +26,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.MessageHistory; +import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.StatefulService; @@ -44,6 +46,7 @@ import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.util.MessageHelper; +import org.apache.camel.util.OrderedComparator; import org.apache.camel.util.StopWatch; import org.apache.camel.util.UnitOfWorkHelper; import org.slf4j.Logger; @@ -72,6 +75,8 @@ import org.slf4j.LoggerFactory; * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to * read the source code of this class about the debugging tips, which you can find in the * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method. + * <p/> + * The added advices can implement {@link Ordered} to control in which order the advices are executed. */ public class CamelInternalProcessor extends DelegateAsyncProcessor { @@ -92,6 +97,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { */ public void addAdvice(CamelInternalProcessorAdvice advice) { advices.add(advice); + // ensure advices are sorted so they are in the order we want + Collections.sort(advices, new OrderedComparator()); } /** @@ -125,7 +132,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // you can see in the code below. // ---------------------------------------------------------- - if (processor == null || !continueProcessing(exchange)) { // no processor or we should not continue then we are done callback.done(true); @@ -521,7 +527,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { /** * Advice to execute the {@link BacklogTracer} if enabled. */ - public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice { + public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered { private final BacklogTracer backlogTracer; private final ProcessorDefinition<?> processorDefinition; @@ -563,12 +569,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { public void after(Exchange exchange, Object data) throws Exception { // noop } + + @Override + public int getOrder() { + // we want tracer just before calling the processor + return Ordered.LOWEST - 1; + } + } /** * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled. */ - public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch> { + public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered { private final BacklogDebugger backlogDebugger; private final Processor target; @@ -599,6 +612,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop()); } } + + @Override + public int getOrder() { + // we want debugger just before calling the processor + return Ordered.LOWEST; + } } /** @@ -741,7 +760,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { /** * Advice for {@link org.apache.camel.spi.StreamCachingStrategy} */ - public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache> { + public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered { private final StreamCachingStrategy strategy; @@ -782,6 +801,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { ((StreamCache) body).reset(); } } + + @Override + public int getOrder() { + // we want stream caching first + return Ordered.HIGHEST; + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/d40de690/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java new file mode 100644 index 0000000..14eeecb --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.io.ByteArrayInputStream; +import java.util.List; +import javax.management.Attribute; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.api.management.mbean.BacklogTracerEventMessage; +import org.apache.camel.builder.RouteBuilder; + +public class BacklogTracerStreamCachingTest extends ManagementTestSupport { + + @SuppressWarnings("unchecked") + public void testBacklogTracerEventMessageStreamCaching() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogTracer"); + assertNotNull(on); + assertTrue(mbeanServer.isRegistered(on)); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should not be enabled", Boolean.FALSE, enabled); + + Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize"); + assertEquals("Should be 1000", 1000, size.intValue()); + + Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on, "RemoveOnDump"); + assertEquals(Boolean.TRUE, removeOnDump); + + // enable streams + mbeanServer.setAttribute(on, new Attribute("BodyIncludeStreams", Boolean.TRUE)); + + // enable it + mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE)); + + getMockEndpoint("mock:bar").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + List<Exchange> exchanges = getMockEndpoint("mock:bar").getReceivedExchanges(); + + List<BacklogTracerEventMessage> events = (List<BacklogTracerEventMessage>) mbeanServer.invoke(on, "dumpTracedMessages", + new Object[]{"bar"}, new String[]{"java.lang.String"}); + + assertNotNull(events); + assertEquals(1, events.size()); + + BacklogTracerEventMessage event1 = events.get(0); + assertEquals("bar", event1.getToNode()); + assertEquals(" <message exchangeId=\"" + exchanges.get(0).getExchangeId() + "\">\n" + + " <body type=\"org.apache.camel.converter.stream.ByteArrayInputStreamCache\">Bye World</body>\n" + + " </message>", event1.getMessageAsXml()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setUseBreadcrumb(false); + + from("direct:start").streamCaching() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + ByteArrayInputStream is = new ByteArrayInputStream("Bye World".getBytes()); + exchange.getIn().setBody(is); + } + }) + .log("Got ${body}") + .to("mock:bar").id("bar"); + } + }; + } + +}