CAMEL-7735: Fixed the producer template so that it no longer emits the exchange sent event to the notifier twice when the exchange was created manually and sent using the send method.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/42e26e10 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/42e26e10 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/42e26e10 Branch: refs/heads/camel-2.12.x Commit: 42e26e10d2dc4d0555eb2de217db4a1147e48bad Parents: 57d9e9c Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Aug 22 10:25:44 2014 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Aug 22 10:38:46 2014 +0200 ---------------------------------------------------------------------- .../org/apache/camel/impl/ProducerCache.java | 14 --- .../SentExchangeEventNotifierIssueTest.java | 117 +++++++++++++++++++ .../SentExchangeEventNotifierTwoIssueTest.java | 114 ++++++++++++++++++ 3 files changed, 231 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/42e26e10/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index dd3b0a0..7b713c8 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -228,16 +228,7 @@ public class ProducerCache extends ServiceSupport { } } - StopWatch watch = null; - if (eventNotifierEnabled && exchange != null) { - // record timing for sending the exchange using the producer - watch = new StopWatch(); - } - try { - if (eventNotifierEnabled && exchange != null) { - EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); - } // invoke the callback answer = callback.doInProducer(producer, exchange, pattern); } catch (Throwable e) { @@ -245,11 +236,6 @@ public class ProducerCache extends 
ServiceSupport { exchange.setException(e); } } finally { - if (eventNotifierEnabled && exchange != null) { - long timeTaken = watch.stop(); - // emit event that the exchange was sent to the endpoint - EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); - } if (producer instanceof ServicePoolAware) { // release back to the pool pool.release(endpoint, producer); http://git-wip-us.apache.org/repos/asf/camel/blob/42e26e10/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierIssueTest.java new file mode 100644 index 0000000..c210d2c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierIssueTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.issues; + +import java.util.EventObject; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.management.event.ExchangeSentEvent; +import org.apache.camel.support.EventNotifierSupport; + +public class SentExchangeEventNotifierIssueTest extends ContextTestSupport { + + private MyNotifier notifier = new MyNotifier(); + + private class MyNotifier extends EventNotifierSupport { + + private int counter; + + @Override + public void notify(EventObject event) throws Exception { + counter++; + } + + @Override + public boolean isEnabled(EventObject event) { + return event instanceof ExchangeSentEvent; + } + + public int getCounter() { + return counter; + } + + public void reset() { + counter = 0; + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().addEventNotifier(notifier); + return context; + } + + public void testExchangeSentNotifier() throws Exception { + notifier.reset(); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("I was here", out); + + // should only be one event + assertEquals(1, notifier.getCounter()); + } + + public void testExchangeSentNotifierExchange() throws Exception { + notifier.reset(); + + Exchange out = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + }); + assertEquals("I was here", out.getIn().getBody()); + + // should only be one event + assertEquals(1, notifier.getCounter()); + } + + public void testExchangeSentNotifierManualExchange() throws Exception { + notifier.reset(); + + Exchange exchange = new 
DefaultExchange(context); + exchange.getIn().setBody("Hello World"); + + template.send("direct:start", exchange); + assertEquals("I was here", exchange.getIn().getBody()); + + // should only be one event + assertEquals(1, notifier.getCounter()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("I was here"); + } + }); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/42e26e10/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierTwoIssueTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierTwoIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierTwoIssueTest.java new file mode 100644 index 0000000..31eccf8 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/issues/SentExchangeEventNotifierTwoIssueTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.issues; + +import java.util.EventObject; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.management.event.ExchangeSentEvent; +import org.apache.camel.support.EventNotifierSupport; + +public class SentExchangeEventNotifierTwoIssueTest extends ContextTestSupport { + + private MyNotifier notifier = new MyNotifier(); + + private class MyNotifier extends EventNotifierSupport { + + private int counter; + + @Override + public void notify(EventObject event) throws Exception { + counter++; + } + + @Override + public boolean isEnabled(EventObject event) { + return event instanceof ExchangeSentEvent; + } + + public int getCounter() { + return counter; + } + + public void reset() { + counter = 0; + } + } + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.getManagementStrategy().addEventNotifier(notifier); + return context; + } + + public void testExchangeSentNotifier() throws Exception { + notifier.reset(); + + String out = template.requestBody("direct:start", "Hello World", String.class); + assertEquals("I was here", out); + + assertEquals(2, notifier.getCounter()); + } + + public void testExchangeSentNotifierExchange() throws Exception { + notifier.reset(); + + Exchange out = template.request("direct:start", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello World"); + } + }); + assertEquals("I was here", out.getIn().getBody()); + + assertEquals(2, notifier.getCounter()); + } + + public void testExchangeSentNotifierManualExchange() throws Exception { + 
notifier.reset(); + + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody("Hello World"); + + template.send("direct:start", exchange); + assertEquals("I was here", exchange.getIn().getBody()); + + assertEquals(2, notifier.getCounter()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("I was here"); + } + }).to("mock:result"); + } + }; + } +}