This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 00a1f31859150a128a8581df11435d9d89a2184c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jun 7 09:41:53 2019 +0200 CAMEL-13515: Allow producer to lazy start until first message --- .../camel/support/service/ServiceHelper.java | 15 ++++ .../camel/processor/channel/DefaultChannel.java | 9 --- .../apache/camel/impl/LazyStartProducerTest.java | 68 ++++++++++++++++++ .../org/apache/camel/support/DefaultEndpoint.java | 15 +++- .../apache/camel/support/LazyStartProducer.java | 83 ++++++++++++++++++++++ 5 files changed, 180 insertions(+), 10 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java index 1817e38..05393ea 100644 --- a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java +++ b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java @@ -47,6 +47,21 @@ public final class ServiceHelper { } /** + * Initializes the given {@code value} if it's a {@link Service} or a collection of it. + * <p/> + * Calling this method has no effect if {@code value} is {@code null}. + */ + public static void initService(Object value) { + if (value instanceof Service) { + ((Service) value).init(); + } else if (value instanceof Iterable) { + for (Object o : (Iterable) value) { + initService(o); + } + } + } + + /** * Starts the given {@code value} if it's a {@link Service} or a collection of it. * <p/> * Calling this method has no effect if {@code value} is {@code null}. diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java index 71370a6..9d7090b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java @@ -176,15 +176,6 @@ public class DefaultChannel extends CamelInternalProcessor implements Channel { if (nextProcessor instanceof CamelContextAware) { ((CamelContextAware) nextProcessor).setCamelContext(camelContext); } - if (nextProcessor instanceof EndpointAware) { - Endpoint endpoint = ((EndpointAware) nextProcessor).getEndpoint(); - if (endpoint instanceof DefaultEndpoint) { - DefaultEndpoint de = (DefaultEndpoint) endpoint; - if (de.isLazyStartProducer()) { - System.out.println("Lazy start producer, so wrap endpoint where we can control when to start the producer"); - } - } - } // the definition to wrap should be the fine grained, // so if a child is set then use it, if not then its the original output used diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java new file mode 100644 index 0000000..275ddfe --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncProducer; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Producer; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.LazyStartProducer; +import org.apache.camel.support.service.ServiceHelper; +import org.junit.Test; + +public class LazyStartProducerTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testLazyStartProducer() throws Exception { + context.start(); + + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("Hello Lazy Producer", "Hello Again Lazy Producer"); + + Producer delegate = mock.createProducer(); + assertFalse(ServiceHelper.isStarted(delegate)); + + LazyStartProducer lazy = new LazyStartProducer((AsyncProducer) delegate); + assertFalse(ServiceHelper.isStarted(lazy)); + + ServiceHelper.startService(lazy); + assertTrue(ServiceHelper.isStarted(lazy)); + assertFalse(ServiceHelper.isStarted(delegate)); + + // process a message which should start the delegate + Exchange exchange = mock.createExchange(); + exchange.getIn().setBody("Hello Lazy Producer"); + lazy.process(exchange); + assertTrue(ServiceHelper.isStarted(lazy)); + assertTrue(ServiceHelper.isStarted(delegate)); + + // process a message which should start the delegate + exchange = mock.createExchange(); + exchange.getIn().setBody("Hello Again Lazy Producer"); + lazy.process(exchange); + assertTrue(ServiceHelper.isStarted(lazy)); + assertTrue(ServiceHelper.isStarted(delegate)); + + assertMockEndpointsSatisfied(); + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java index 88912ec..1e2b551 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java @@ -28,6 +28,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.PollingConsumer; +import org.apache.camel.Producer; import org.apache.camel.ResolveEndpointFailedException; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.HasId; @@ -185,7 +186,19 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @Override public AsyncProducer createAsyncProducer() throws Exception { - return AsyncProcessorConverterHelper.convert(createProducer()); + // create producer and turn it into async + Producer producer = createProducer(); + AsyncProducer target; + if (producer instanceof AsyncProducer) { + target = (AsyncProducer) producer; + } else { + target = AsyncProcessorConverterHelper.convert(producer); + } + if (isLazyStartProducer()) { + // wrap in lazy start + target = new LazyStartProducer(target); + } + return target; } /** diff --git a/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java new file mode 100644 index 0000000..32336b3 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProducer; +import org.apache.camel.Exchange; +import org.apache.camel.support.service.ServiceHelper; + +/** + * A {@link org.apache.camel.Producer} which is started lazy, on the first message being processed. + */ +public final class LazyStartProducer extends DefaultAsyncProducer { + + private final AsyncProducer delegate; + + public LazyStartProducer(AsyncProducer producer) { + super(producer.getEndpoint()); + this.delegate = producer; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + if (!ServiceHelper.isStarted(delegate)) { + try { + ServiceHelper.startService(delegate); + } catch (Throwable e) { + exchange.setException(e); + return true; + } + } + return delegate.process(exchange, callback); + } + + @Override + public boolean isSingleton() { + return delegate.isSingleton(); + } + + @Override + protected void doInit() throws Exception { + ServiceHelper.initService(delegate); + } + + @Override + protected void doStart() throws Exception { + // noop as we dont want to start the delegate but its started on the first message processed + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(delegate); + } + + @Override + protected void doSuspend() throws Exception { + ServiceHelper.suspendService(delegate); + } + + @Override + protected void doResume() throws Exception { + ServiceHelper.resumeService(delegate); + } + + @Override + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownService(delegate); + } +}