This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 00a1f31859150a128a8581df11435d9d89a2184c
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jun 7 09:41:53 2019 +0200

    CAMEL-13515: Allow producer to lazy start until first message
---
 .../camel/support/service/ServiceHelper.java       | 15 ++++
 .../camel/processor/channel/DefaultChannel.java    |  9 ---
 .../apache/camel/impl/LazyStartProducerTest.java   | 68 ++++++++++++++++++
 .../org/apache/camel/support/DefaultEndpoint.java  | 15 +++-
 .../apache/camel/support/LazyStartProducer.java    | 83 ++++++++++++++++++++++
 5 files changed, 180 insertions(+), 10 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
 
b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
index 1817e38..05393ea 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/support/service/ServiceHelper.java
@@ -47,6 +47,21 @@ public final class ServiceHelper {
     }
 
     /**
+     * Initializes the given {@code value} if it's a {@link Service} or a collection of it.
+     * <p/>
+     * Calling this method has no effect if {@code value} is {@code null}.
+     */
+    public static void initService(Object value) {
+        if (value instanceof Service) {
+            ((Service) value).init();
+        } else if (value instanceof Iterable) {
+            for (Object o : (Iterable) value) {
+                initService(o);
+            }
+        }
+    }
+
+    /**
      * Starts the given {@code value} if it's a {@link Service} or a collection of it.
      * <p/>
      * Calling this method has no effect if {@code value} is {@code null}.
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
index 71370a6..9d7090b 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/channel/DefaultChannel.java
@@ -176,15 +176,6 @@ public class DefaultChannel extends CamelInternalProcessor 
implements Channel {
         if (nextProcessor instanceof CamelContextAware) {
             ((CamelContextAware) nextProcessor).setCamelContext(camelContext);
         }
-        if (nextProcessor instanceof EndpointAware) {
-            Endpoint endpoint = ((EndpointAware) nextProcessor).getEndpoint();
-            if (endpoint instanceof DefaultEndpoint) {
-                DefaultEndpoint de = (DefaultEndpoint) endpoint;
-                if (de.isLazyStartProducer()) {
-                    System.out.println("Lazy start producer, so wrap endpoint 
where we can control when to start the producer");
-                }
-            }
-        }
 
         // the definition to wrap should be the fine grained,
         // so if a child is set then use it, if not then its the original output used
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java
new file mode 100644
index 0000000..275ddfe
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/impl/LazyStartProducerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.support.LazyStartProducer;
+import org.apache.camel.support.service.ServiceHelper;
+import org.junit.Test;
+
+public class LazyStartProducerTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testLazyStartProducer() throws Exception {
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello Lazy Producer", "Hello Again Lazy 
Producer");
+
+        Producer delegate = mock.createProducer();
+        assertFalse(ServiceHelper.isStarted(delegate));
+
+        LazyStartProducer lazy = new LazyStartProducer((AsyncProducer) 
delegate);
+        assertFalse(ServiceHelper.isStarted(lazy));
+
+        ServiceHelper.startService(lazy);
+        assertTrue(ServiceHelper.isStarted(lazy));
+        assertFalse(ServiceHelper.isStarted(delegate));
+
+        // process a message which should start the delegate
+        Exchange exchange = mock.createExchange();
+        exchange.getIn().setBody("Hello Lazy Producer");
+        lazy.process(exchange);
+        assertTrue(ServiceHelper.isStarted(lazy));
+        assertTrue(ServiceHelper.isStarted(delegate));
+
+        // process a second message; the delegate is already started and must remain started
+        exchange = mock.createExchange();
+        exchange.getIn().setBody("Hello Again Lazy Producer");
+        lazy.process(exchange);
+        assertTrue(ServiceHelper.isStarted(lazy));
+        assertTrue(ServiceHelper.isStarted(delegate));
+
+        assertMockEndpointsSatisfied();
+    }
+}
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
index 88912ec..1e2b551 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
@@ -28,6 +28,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.Producer;
 import org.apache.camel.ResolveEndpointFailedException;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.HasId;
@@ -185,7 +186,19 @@ public abstract class DefaultEndpoint extends 
ServiceSupport implements Endpoint
 
     @Override
     public AsyncProducer createAsyncProducer() throws Exception {
-        return AsyncProcessorConverterHelper.convert(createProducer());
+        // create producer and turn it into async
+        Producer producer = createProducer();
+        AsyncProducer target;
+        if (producer instanceof AsyncProducer) {
+            target = (AsyncProducer) producer;
+        } else {
+            target = AsyncProcessorConverterHelper.convert(producer);
+        }
+        if (isLazyStartProducer()) {
+            // wrap in lazy start
+            target = new LazyStartProducer(target);
+        }
+        return target;
     }
 
     /**
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java
new file mode 100644
index 0000000..32336b3
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/LazyStartProducer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.service.ServiceHelper;
+
+/**
+ * A {@link org.apache.camel.Producer} which is started lazily, on the first message being processed.
+ */
+public final class LazyStartProducer extends DefaultAsyncProducer {
+
+    private final AsyncProducer delegate;
+
+    public LazyStartProducer(AsyncProducer producer) {
+        super(producer.getEndpoint());
+        this.delegate = producer;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!ServiceHelper.isStarted(delegate)) {
+            try {
+                ServiceHelper.startService(delegate);
+            } catch (Throwable e) {
+                exchange.setException(e);
+                return true;
+            }
+        }
+        return delegate.process(exchange, callback);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return delegate.isSingleton();
+    }
+
+    @Override
+    protected void doInit() throws Exception {
+        ServiceHelper.initService(delegate);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop: the delegate is deliberately not started here; it's started when the first message is processed
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(delegate);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        ServiceHelper.suspendService(delegate);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        ServiceHelper.resumeService(delegate);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(delegate);
+    }
+}

Reply via email to