This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-13636 in repository https://gitbox.apache.org/repos/asf/camel.git
commit f0d50772af3ab7a4b92a70998d3b682b30c4d425 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Jun 12 12:54:37 2019 +0200 camel3 - SPI for ReactiveHelper so we can plugin different reactive engines --- .../main/java/org/apache/camel/CamelContext.java | 8 +++ .../org/apache/camel/spi/ReactiveExecutor.java | 41 +++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 19 +++++++ .../camel/impl/engine/DefaultReactiveExecutor.java | 61 ++++++++++++++++++++++ .../org/apache/camel/impl/DefaultCamelContext.java | 6 +++ 5 files changed, 135 insertions(+) diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java index 48f928a..b260e98 100644 --- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java @@ -37,6 +37,7 @@ import org.apache.camel.spi.ManagementNameStrategy; import org.apache.camel.spi.ManagementStrategy; import org.apache.camel.spi.MessageHistoryFactory; import org.apache.camel.spi.PropertiesComponent; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestRegistry; @@ -1217,4 +1218,11 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration { */ void setHeadersMapFactory(HeadersMapFactory factory); + ReactiveExecutor getReactiveExecutor(); + + /** + * Sets a custom {@link ReactiveExecutor} to be used. + */ + void setReactiveExecutor(ReactiveExecutor reactiveExecutor); + } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java new file mode 100644 index 0000000..8987bd3 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +/** + * SPI to plugin different reactive engines in the Camel routing engine. + */ +public interface ReactiveExecutor { + + // TODO: Add javadoc + // TODO: Better name + + void scheduleMain(Runnable runnable); + + void scheduleSync(Runnable runnable); + + void scheduleMain(Runnable runnable, String description); + + void schedule(Runnable runnable); + + void schedule(Runnable runnable, String description); + + void scheduleSync(Runnable runnable, String description); + + boolean executeFromQueue(); + +} diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index aacefac..521a8d6 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -119,6 +119,7 @@ import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.PropertiesComponent; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RestConfiguration; import org.apache.camel.spi.RestRegistry; @@ -216,6 +217,7 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext private final Object lock = new Object(); private volatile CamelContextNameStrategy nameStrategy; + private volatile ReactiveExecutor reactiveExecutor; private volatile ManagementNameStrategy managementNameStrategy; private volatile Registry registry; private volatile TypeConverter typeConverter; @@ -3791,6 +3793,21 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext this.headersMapFactory = doAddService(headersMapFactory); } + public ReactiveExecutor getReactiveExecutor() { + if (reactiveExecutor == null) { + synchronized (lock) { + if (reactiveExecutor == null) { + setReactiveExecutor(createReactiveExecutor()); + } + } + } + return reactiveExecutor; + } + + public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) { + this.reactiveExecutor = reactiveExecutor; + } + @Override public DeferServiceFactory getDeferServiceFactory() { return deferServiceFactory; @@ -3869,6 +3886,8 @@ public abstract class AbstractCamelContext extends ServiceSupport implements Ext } } + protected abstract ReactiveExecutor createReactiveExecutor(); + protected abstract StreamCachingStrategy createStreamCachingStrategy(); protected abstract TypeConverter createTypeConverter(); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java new file mode 100644 index 0000000..6a9473b --- /dev/null +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.spi.ReactiveExecutor; +import org.apache.camel.support.ReactiveHelper; + +/** + * Default {@link ReactiveExecutor}. + */ +public class DefaultReactiveExecutor implements ReactiveExecutor { + + @Override + public void scheduleMain(Runnable runnable) { + ReactiveHelper.scheduleMain(runnable); + } + + @Override + public void scheduleSync(Runnable runnable) { + ReactiveHelper.scheduleSync(runnable); + } + + @Override + public void scheduleMain(Runnable runnable, String description) { + ReactiveHelper.scheduleMain(runnable, description); + } + + @Override + public void schedule(Runnable runnable) { + ReactiveHelper.schedule(runnable); + } + + @Override + public void schedule(Runnable runnable, String description) { + ReactiveHelper.schedule(runnable, description); + } + + @Override + public void scheduleSync(Runnable runnable, String description) { + ReactiveHelper.scheduleSync(runnable, description); + } + + @Override + public boolean executeFromQueue() { + return ReactiveHelper.executeFromQueue(); + } +} diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 6b7ac7e..1cce2b8 100644 --- a/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/core/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -45,6 +45,7 @@ import org.apache.camel.impl.engine.DefaultMessageHistoryFactory; import org.apache.camel.impl.engine.DefaultNodeIdFactory; import org.apache.camel.impl.engine.DefaultPackageScanClassResolver; import org.apache.camel.impl.engine.DefaultProcessorFactory; +import org.apache.camel.impl.engine.DefaultReactiveExecutor; import org.apache.camel.impl.engine.DefaultRouteController; import org.apache.camel.impl.engine.DefaultShutdownStrategy; import org.apache.camel.impl.engine.DefaultStreamCachingStrategy; @@ -81,6 +82,7 @@ import org.apache.camel.spi.ModelJAXBContextFactory; import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.spi.PackageScanClassResolver; import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.ReactiveExecutor; import org.apache.camel.spi.Registry; import org.apache.camel.spi.RestRegistryFactory; import org.apache.camel.spi.RouteController; @@ -304,4 +306,8 @@ public class DefaultCamelContext extends AbstractModelCamelContext { protected StreamCachingStrategy createStreamCachingStrategy() { return new DefaultStreamCachingStrategy(); } + + protected ReactiveExecutor createReactiveExecutor() { + return new DefaultReactiveExecutor(); + } }