This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new c07baef CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer. c07baef is described below commit c07baeff3795d7ff4c0426b97e5dba5f9da15934 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Thu Feb 14 11:37:20 2019 +0100 CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer. --- .../apache/camel/component/bean/ProxyHelper.java | 3 +- .../camel/impl/CamelPostProcessorHelper.java | 5 +- .../camel/processor/DeferServiceFactory.java | 2 + .../camel/processor/EventNotifierProducer.java | 109 +++++++++++++++++++++ .../apache/camel/processor/UnitOfWorkProducer.java | 10 +- .../CamelProduceInterfaceEventNotifierTest.java | 93 ++++++++++++++++++ 6 files changed, 216 insertions(+), 6 deletions(-) diff --git a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java index 9cf040c..26253b2 100644 --- a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java +++ b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java @@ -21,6 +21,8 @@ import java.lang.reflect.Proxy; import org.apache.camel.Endpoint; import org.apache.camel.Producer; import org.apache.camel.processor.DeferServiceFactory; +import org.apache.camel.processor.EventNotifierProducer; +import org.apache.camel.processor.UnitOfWorkProducer; /** * A helper class for creating proxies which delegate to Camel @@ -53,7 +55,6 @@ public final class ProxyHelper { */ public static <T> T createProxy(Endpoint endpoint, boolean binding, ClassLoader cl, Class<T>[] interfaceClasses, MethodInfoCache methodCache) throws Exception { Producer producer = DeferServiceFactory.createProducer(endpoint); - endpoint.getCamelContext().deferStartService(producer, true); return createProxyObject(endpoint, binding, producer, cl, interfaceClasses, methodCache); } diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java index 0375c7e..2845ac8 100644 --- a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java +++ b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java @@ -40,6 +40,8 @@ import org.apache.camel.Service; import org.apache.camel.builder.DefaultFluentProducerTemplate; import org.apache.camel.component.bean.ProxyHelper; import org.apache.camel.processor.DeferServiceFactory; +import org.apache.camel.processor.EventNotifierProducer; +import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.IntrospectionSupport; @@ -384,8 +386,7 @@ public class CamelPostProcessorHelper implements CamelContextAware { */ protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) { try { - Producer producer = DeferServiceFactory.createProducer(endpoint); - return new UnitOfWorkProducer(producer); + return DeferServiceFactory.createProducer(endpoint); } catch (Exception e) { throw RuntimeCamelException.wrapRuntimeCamelException(e); } diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java index 26602ca..ae69481 100644 --- a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java +++ b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java @@ -43,6 +43,8 @@ public final class DeferServiceFactory { */ public static Producer createProducer(Endpoint endpoint) throws Exception { Producer producer = new DeferProducer(endpoint); + producer = new UnitOfWorkProducer(producer); + producer = new EventNotifierProducer(producer); endpoint.getCamelContext().deferStartService(producer, true); return producer; } diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java new file mode 100644 index 0000000..a646349 --- /dev/null +++ b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProducer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.AsyncProcessorSupport; +import org.apache.camel.support.DefaultAsyncProducer; +import org.apache.camel.support.EventHelper; +import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.util.StopWatch; + +/** + * Ensures a {@link Producer} do send {@link org.apache.camel.spi.EventNotifier} notifications when + * sending. + */ +public final class EventNotifierProducer extends DefaultAsyncProducer { + + private final AsyncProducer producer; + + /** + * The producer which should be executed and emit {@link org.apache.camel.spi.EventNotifier} notifications. + * + * @param producer the producer + */ + public EventNotifierProducer(Producer producer) { + super(producer.getEndpoint()); + this.producer = AsyncProcessorConverterHelper.convert(producer); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + if (!isStarted()) { + exchange.setException(new IllegalStateException("Producer has not been started: " + this)); + callback.done(true); + return true; + } + + final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, getEndpoint()); + // record timing for sending the exchange using the producer + StopWatch watch; + if (sending) { + watch = new StopWatch(); + } else { + watch = null; + } + + try { + log.debug(">>>> {} {}", getEndpoint(), exchange); + return producer.process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + try { + // emit event that the exchange was sent to the endpoint + if (watch != null) { + long timeTaken = watch.taken(); + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, getEndpoint(), timeTaken); + } + } finally { + callback.done(doneSync); + } + } + }); + } catch (Throwable throwable) { + exchange.setException(throwable); + callback.done(true); + } + + return true; + } + + @Override + public Endpoint getEndpoint() { + return producer.getEndpoint(); + } + + @Override + public boolean isSingleton() { + return producer.isSingleton(); + } + + @Override + protected void doStart() throws Exception { + ServiceHelper.startService(producer); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(producer); + } +} diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java index 965a25d..b45c296 100644 --- a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java +++ b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java @@ -16,16 +16,18 @@ */ package org.apache.camel.processor; +import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Producer; +import org.apache.camel.support.DefaultAsyncProducer; import org.apache.camel.support.service.ServiceHelper; /** * Ensures a {@link Producer} is executed within an {@link org.apache.camel.spi.UnitOfWork}. */ -public final class UnitOfWorkProducer implements Producer { +public final class UnitOfWorkProducer extends DefaultAsyncProducer { private final Producer producer; private final AsyncProcessor processor; @@ -36,6 +38,7 @@ public final class UnitOfWorkProducer implements Producer { * @param producer the producer */ public UnitOfWorkProducer(Producer producer) { + super(producer.getEndpoint()); this.producer = producer; // wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(producer); @@ -47,8 +50,9 @@ public final class UnitOfWorkProducer implements Producer { return producer.getEndpoint(); } - public void process(final Exchange exchange) throws Exception { - processor.process(exchange); + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + return processor.process(exchange, callback); } public void start() throws Exception { diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java new file mode 100644 index 0000000..24e5c82 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Produce; +import org.apache.camel.spi.CamelEvent; +import org.apache.camel.support.EventNotifierSupport; +import org.junit.Before; +import org.junit.Test; + +public class CamelProduceInterfaceEventNotifierTest extends ContextTestSupport { + + private static List<CamelEvent> events = new ArrayList<>(); + + private DefaultCamelBeanPostProcessor postProcessor; + + @Override + protected CamelContext createCamelContext() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(createRegistry()); + context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() { + public void notify(CamelEvent event) throws Exception { + if (event instanceof CamelEvent.ExchangeSendingEvent || event instanceof CamelEvent.ExchangeSentEvent) { + events.add(event); + } + } + }); + return context; + } + + @Test + public void testPostProcessor() throws Exception { + events.clear(); + + int before = events.size(); + assertEquals(0, before); + + MySender sender = new MySender(); + + postProcessor.postProcessBeforeInitialization(sender, "foo"); + postProcessor.postProcessAfterInitialization(sender, "foo"); + + getMockEndpoint("mock:result").expectedMessageCount(1); + + sender.hello.sayHello("Hello World"); + + assertMockEndpointsSatisfied(); + + int after = events.size(); + // should be 2 events + assertEquals(2, after); + assertTrue(events.get(0) instanceof CamelEvent.ExchangeSendingEvent); + assertTrue(events.get(1) instanceof CamelEvent.ExchangeSentEvent); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + postProcessor = new DefaultCamelBeanPostProcessor(context); + } + + interface FooService { + + void sayHello(String hello); + } + + class MySender { + + @Produce(uri = "mock:result") + FooService hello; + + } + +}