Updated Branches: refs/heads/master b96c92e56 -> 81d0a174b
CAMEL-6628: ProducerTemplate allows to turn off event notifier for sending/sent exchanges. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81d0a174 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81d0a174 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81d0a174 Branch: refs/heads/master Commit: 81d0a174b81ed2b8005327f48b9ffd6d3718c03a Parents: b96c92e Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Aug 12 16:54:43 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Aug 12 16:54:43 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/ProducerTemplate.java | 20 ++++++ .../mbean/ManagedProducerCacheMBean.java | 3 + .../camel/impl/DefaultProducerTemplate.java | 14 +++++ .../org/apache/camel/impl/ProducerCache.java | 34 +++++++--- .../management/mbean/ManagedProducerCache.java | 3 + .../camel/support/EventNotifierSupport.java | 8 +++ .../camel/processor/MySentEventNotifier.java | 49 +++++++++++++++ ...roducerTemplateDisableEventNotifierTest.java | 65 ++++++++++++++++++++ 8 files changed, 186 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java index aedb7d3..32533c1 100644 --- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java @@ -112,6 +112,26 @@ public interface ProducerTemplate extends Service { */ void setDefaultEndpointUri(String endpointUri); + /** + * Sets whether the {@link org.apache.camel.spi.EventNotifier} should be + * used by this {@link ProducerTemplate} to send events about the {@link Exchange} + * being sent. + * <p/> + * By default this is enabled. + * + * @param enabled <tt>true</tt> to enable, <tt>false</tt> to disable. + */ + void setEventNotifierEnabled(boolean enabled); + + /** + * Whether the {@link org.apache.camel.spi.EventNotifier} should be + * used by this {@link ProducerTemplate} to send events about the {@link Exchange} + * being sent. + * + * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise + */ + boolean isEventNotifierEnabled(); + // Synchronous methods // ----------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java index 6758615..aeb9474 100644 --- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java +++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java @@ -45,4 +45,7 @@ public interface ManagedProducerCacheMBean extends ManagedServiceMBean { @ManagedOperation(description = "Purges the cache") void purge(); + @ManagedAttribute(description = "EventNotifier enabled") + Boolean isEventNotifierEnabled(); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java index bce26e8..7f852f7 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java @@ -52,6 +52,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT private volatile ExecutorService executor; private Endpoint defaultEndpoint; private int maximumCacheSize; + private boolean eventNotifierEnabled = true; public DefaultProducerTemplate(CamelContext camelContext) { this.camelContext = camelContext; @@ -87,6 +88,18 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT return producerCache.size(); } + public boolean isEventNotifierEnabled() { + return eventNotifierEnabled; + } + + public void setEventNotifierEnabled(boolean eventNotifierEnabled) { + this.eventNotifierEnabled = eventNotifierEnabled; + // if we already created the cache then adjust its setting as well + if (producerCache != null) { + producerCache.setEventNotifierEnabled(eventNotifierEnabled); + } + } + public Exchange send(String endpointUri, Exchange exchange) { Endpoint endpoint = resolveMandatoryEndpoint(endpointUri); return send(endpoint, exchange); @@ -717,6 +730,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } else { producerCache = new ProducerCache(this, camelContext); } + producerCache.setEventNotifierEnabled(isEventNotifierEnabled()); } ServiceHelper.startService(producerCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index 8c5f976..99023d0 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -54,6 +54,7 @@ public class ProducerCache extends ServiceSupport { private final ServicePool<Endpoint, Producer> pool; private final Map<String, Producer> producers; private final Object source; + private boolean eventNotifierEnabled = true; public ProducerCache(Object source, CamelContext camelContext) { this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext)); @@ -74,6 +75,14 @@ public class ProducerCache extends ServiceSupport { this.producers = cache; } + public boolean isEventNotifierEnabled() { + return eventNotifierEnabled; + } + + public void setEventNotifierEnabled(boolean eventNotifierEnabled) { + this.eventNotifierEnabled = eventNotifierEnabled; + } + /** * Creates the {@link LRUCache} to be used. * <p/> @@ -220,13 +229,13 @@ public class ProducerCache extends ServiceSupport { } StopWatch watch = null; - if (exchange != null) { + if (eventNotifierEnabled && exchange != null) { // record timing for sending the exchange using the producer watch = new StopWatch(); } try { - if (exchange != null) { + if (eventNotifierEnabled && exchange != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); } // invoke the callback @@ -236,7 +245,7 @@ public class ProducerCache extends ServiceSupport { exchange.setException(e); } } finally { - if (exchange != null) { + if (eventNotifierEnabled && exchange != null) { long timeTaken = watch.stop(); // emit event that the exchange was sent to the endpoint EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); @@ -287,10 +296,10 @@ public class ProducerCache extends ServiceSupport { } // record timing for sending the exchange using the producer - final StopWatch watch = exchange != null ? new StopWatch() : null; + final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null; try { - if (exchange != null) { + if (eventNotifierEnabled && exchange != null) { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); } // invoke the callback @@ -299,7 +308,7 @@ public class ProducerCache extends ServiceSupport { @Override public void done(boolean doneSync) { try { - if (watch != null) { + if (eventNotifierEnabled && watch != null) { long timeTaken = watch.stop(); // emit event that the exchange was sent to the endpoint EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); @@ -358,9 +367,12 @@ public class ProducerCache extends ServiceSupport { exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); // send the exchange using the processor - StopWatch watch = new StopWatch(); + StopWatch watch = null; try { - EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); + if (eventNotifierEnabled) { + watch = new StopWatch(); + EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); + } // ensure we run in an unit of work Producer target = new UnitOfWorkProducer(producer); target.process(exchange); @@ -369,8 +381,10 @@ public class ProducerCache extends ServiceSupport { exchange.setException(e); } finally { // emit event that the exchange was sent to the endpoint - long timeTaken = watch.stop(); - EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + if (eventNotifierEnabled && watch != null) { + long timeTaken = watch.stop(); + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + } } return exchange; } http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java index 5cb57b0..ef07077 100644 --- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java +++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java @@ -72,4 +72,7 @@ public class ManagedProducerCache extends ManagedService implements ManagedProdu producerCache.purge(); } + public Boolean isEventNotifierEnabled() { + return producerCache.isEventNotifierEnabled(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java b/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java index 2bc2c08..68dcc81 100644 --- a/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java +++ b/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java @@ -117,4 +117,12 @@ public abstract class EventNotifierSupport extends ServiceSupport implements Eve public void setIgnoreExchangeSendingEvents(boolean ignoreExchangeSendingEvents) { this.ignoreExchangeSendingEvents = ignoreExchangeSendingEvents; } + + protected void doStart() throws Exception { + // noop + } + + protected void doStop() throws Exception { + // noop + } } http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java b/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java new file mode 100644 index 0000000..117c104 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import java.util.ArrayList; +import java.util.EventObject; +import java.util.List; + +import org.apache.camel.management.event.ExchangeSentEvent; +import org.apache.camel.support.EventNotifierSupport; + +/** + * @version + */ +public class MySentEventNotifier extends EventNotifierSupport { + + private final List<ExchangeSentEvent> events = new ArrayList<ExchangeSentEvent>(); + + public List<ExchangeSentEvent> getEvents() { + return events; + } + + public void notify(EventObject event) throws Exception { + if (event instanceof ExchangeSentEvent) { + ExchangeSentEvent sent = (ExchangeSentEvent) event; + events.add(sent); + } + } + + public boolean isEnabled(EventObject event) { + // we only want the sent events + return event instanceof ExchangeSentEvent; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java new file mode 100644 index 0000000..ad9855c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; + +/** + * @version + */ +public class ProducerTemplateDisableEventNotifierTest extends ContextTestSupport { + + private MySentEventNotifier notifier = new MySentEventNotifier(); + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext(); + context.getManagementStrategy().addEventNotifier(notifier); + return context; + } + + public void testExchangeSent() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + + // we dont want events for producer template itself so disable that + ProducerTemplate other = context.createProducerTemplate(); + other.setEventNotifierEnabled(false); + + other.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + assertEquals(2, notifier.getEvents().size()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start").to("direct:bar").to("mock:result"); + + from("direct:bar").delay(1); + } + }; + } + +} \ No newline at end of file