Repository: camel Updated Branches: refs/heads/master 9f893d83e -> 9600bc4fe
CAMEL-8491: Camel POJO producer/consumer should defer starting until CamelContext is starting Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9600bc4f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9600bc4f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9600bc4f Branch: refs/heads/master Commit: 9600bc4fec55a4ed02f2404dfd33b3584491ec80 Parents: 9f893d8 Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Mar 16 09:10:53 2015 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Mar 16 13:00:08 2015 +0100 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 16 ++- .../org/apache/camel/DeferStartService.java | 28 ++++ .../camel/component/bean/ProxyHelper.java | 7 +- .../camel/impl/CamelPostProcessorHelper.java | 35 +++-- .../apache/camel/impl/DefaultCamelContext.java | 57 ++++++-- .../camel/impl/DefaultProducerTemplate.java | 6 + .../org/apache/camel/impl/DeferProducer.java | 140 +++++++++++++++++++ .../camel/impl/DeferServiceStartupListener.java | 45 ++++++ .../camel/processor/DeferServiceFactory.java | 46 ++++++ .../impl/PojoProduceInterceptEndpointTest.java | 107 ++++++++++++++ .../PojoProduceProxyInterceptEndpointTest.java | 106 ++++++++++++++ .../handler/CamelNamespaceHandler.java | 12 +- 12 files changed, 566 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index bfe116c..84505bc 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ 
-219,10 +219,10 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { * If the option <tt>closeOnShutdown</tt> is <tt>false</tt> then this context will not stop the service when the context stops. * * @param object the service - * @param closeOnShutdown whether to close the service when this CamelContext shutdown. + * @param stopOnShutdown whether to stop the service when this CamelContext shutdown. * @throws Exception can be thrown when starting the service */ - void addService(Object object, boolean closeOnShutdown) throws Exception; + void addService(Object object, boolean stopOnShutdown) throws Exception; /** * Removes a service from this context. @@ -253,6 +253,18 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { <T> T hasService(Class<T> type); /** + * Defers starting the service until {@link CamelContext} is started and has initialized all its prior services and routes. + * <p/> + * If {@link CamelContext} is already started then the service is started immediately. + * + * @param object the service + * @param stopOnShutdown whether to stop the service when this CamelContext shutdown. Setting this to <tt>true</tt> will keep a reference to the service in + * this {@link CamelContext} until the context is stopped. So do not use it for short lived services. + * @throws Exception can be thrown when starting the service, which is only attempted if {@link CamelContext} has already been started when calling this method. + */ + void deferStartService(Object object, boolean stopOnShutdown) throws Exception; + + /** * Adds the given listener to be invoked when {@link CamelContext} have just been started. * <p/> * This allows listeners to do any custom work after the routes and other services have been started and are running. 
http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/DeferStartService.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/DeferStartService.java b/camel-core/src/main/java/org/apache/camel/DeferStartService.java new file mode 100644 index 0000000..90e45b3 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/DeferStartService.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +public interface DeferStartService<T, F> { + + /** + * Creates the service but defers starting it. 
+ * + * @param factory the factory which creates the service + * @return the service + */ + T create(F factory); +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java index d287caf..d51cf18 100644 --- a/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java +++ b/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java @@ -20,7 +20,7 @@ import java.lang.reflect.Proxy; import org.apache.camel.Endpoint; import org.apache.camel.Producer; -import org.apache.camel.util.ServiceHelper; +import org.apache.camel.processor.DeferServiceFactory; /** * A helper class for creating proxies which delegate to Camel @@ -54,9 +54,8 @@ public final class ProxyHelper { * Creates a Proxy which sends the exchange to the endpoint. 
*/ public static <T> T createProxy(Endpoint endpoint, ClassLoader cl, Class<T>[] interfaceClasses, MethodInfoCache methodCache) throws Exception { - Producer producer = endpoint.createProducer(); - // ensure the producer is started - ServiceHelper.startService(producer); + Producer producer = DeferServiceFactory.createProducer(endpoint); + endpoint.getCamelContext().deferStartService(producer, true); return createProxyObject(endpoint, producer, cl, interfaceClasses, methodCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java index 817a2f9..af91ae7 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java +++ b/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java @@ -22,7 +22,6 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; -import org.apache.camel.Component; import org.apache.camel.Consume; import org.apache.camel.Consumer; import org.apache.camel.ConsumerTemplate; @@ -39,6 +38,7 @@ import org.apache.camel.component.bean.BeanInfo; import org.apache.camel.component.bean.BeanProcessor; import org.apache.camel.component.bean.ProxyHelper; import org.apache.camel.processor.CamelInternalProcessor; +import org.apache.camel.processor.DeferServiceFactory; import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.IntrospectionSupport; @@ -105,7 +105,7 @@ public class CamelPostProcessorHelper implements CamelContextAware { Processor processor = createConsumerProcessor(bean, method, endpoint); Consumer consumer = 
endpoint.createConsumer(processor); LOG.debug("Created processor: {} for consumer: {}", processor, consumer); - startService(consumer, bean, beanName); + startService(consumer, endpoint.getCamelContext(), bean, beanName); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -115,13 +115,19 @@ public class CamelPostProcessorHelper implements CamelContextAware { /** * Stats the given service */ - protected void startService(Service service, Object bean, String beanName) throws Exception { - if (isSingleton(bean, beanName)) { - getCamelContext().addService(service); + protected void startService(Service service, CamelContext camelContext, Object bean, String beanName) throws Exception { + // defer starting the service until CamelContext has started all its initial services + if (camelContext != null) { + camelContext.deferStartService(service, true); } else { - LOG.debug("Service is not singleton so you must remember to stop it manually {}", service); + // no CamelContext then start service manually ServiceHelper.startService(service); } + + boolean singleton = isSingleton(bean, beanName); + if (!singleton) { + LOG.debug("Service is not singleton so you must remember to stop it manually {}", service); + } } /** @@ -281,10 +287,12 @@ public class CamelPostProcessorHelper implements CamelContextAware { String injectionPointName, Object bean) { // endpoint is optional for this injection point Endpoint endpoint = getEndpointInjection(bean, endpointUri, endpointRef, endpointProperty, injectionPointName, false); - ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint); + CamelContext context = endpoint != null ? 
endpoint.getCamelContext() : getCamelContext(); + ProducerTemplate answer = new DefaultProducerTemplate(context, endpoint); // start the template so its ready to use try { - answer.start(); + // no need to defer the template as it can adjust to the endpoint at runtime + startService(answer, context, bean, null); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -299,7 +307,7 @@ public class CamelPostProcessorHelper implements CamelContextAware { ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext()); // start the template so its ready to use try { - answer.start(); + startService(answer, null, null, null); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -311,9 +319,9 @@ public class CamelPostProcessorHelper implements CamelContextAware { */ protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) { try { - PollingConsumer pollingConsumer = endpoint.createPollingConsumer(); - startService(pollingConsumer, bean, beanName); - return pollingConsumer; + PollingConsumer consumer = endpoint.createPollingConsumer(); + startService(consumer, endpoint.getCamelContext(), bean, beanName); + return consumer; } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); } @@ -324,8 +332,7 @@ public class CamelPostProcessorHelper implements CamelContextAware { */ protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) { try { - Producer producer = endpoint.createProducer(); - startService(producer, bean, beanName); + Producer producer = DeferServiceFactory.createProducer(endpoint); return new UnitOfWorkProducer(producer); } catch (Exception e) { throw ObjectHelper.wrapRuntimeCamelException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 9ac71b6..f9cfceb 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -38,7 +38,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import javax.naming.Context; import javax.xml.bind.JAXBContext; import javax.xml.bind.Unmarshaller; @@ -176,8 +175,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>(); private final Map<String, Component> components = new HashMap<String, Component>(); private final Set<Route> routes = new LinkedHashSet<Route>(); - private final List<Service> servicesToClose = new CopyOnWriteArrayList<Service>(); + private final List<Service> servicesToStop = new CopyOnWriteArrayList<Service>(); private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>(); + private final DeferServiceStartupListener deferStartupListener = new DeferServiceStartupListener(); private TypeConverter typeConverter; private TypeConverterRegistry typeConverterRegistry; private Injector injector; @@ -266,6 +266,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // create endpoint registry at first since end users may access endpoints before CamelContext is started this.endpoints = new DefaultEndpointRegistry(this); + // add the defer service startup listener + this.startupListeners.add(deferStartupListener); + // use WebSphere specific resolver if running on WebSphere if (WebSpherePackageScanClassResolver.isWebSphereClassLoader(this.getClass().getClassLoader())) { log.info("Using 
WebSphere specific PackageScanClassResolver"); @@ -1054,11 +1057,11 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon addService(object, true); } - public void addService(Object object, boolean closeOnShutdown) throws Exception { - doAddService(object, closeOnShutdown); + public void addService(Object object, boolean stopOnShutdown) throws Exception { + doAddService(object, stopOnShutdown); } - private void doAddService(Object object, boolean closeOnShutdown) throws Exception { + private void doAddService(Object object, boolean stopOnShutdown) throws Exception { // inject CamelContext if (object instanceof CamelContextAware) { CamelContextAware aware = (CamelContextAware) object; @@ -1085,9 +1088,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } // do not add endpoints as they have their own list if (singleton && !(service instanceof Endpoint)) { - // only add to list of services to close if its not already there - if (closeOnShutdown && !hasService(service)) { - servicesToClose.add(service); + // only add to list of services to stop if its not already there + if (stopOnShutdown && !hasService(service)) { + servicesToStop.add(service); } } } @@ -1110,7 +1113,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon for (LifecycleStrategy strategy : lifecycleStrategies) { strategy.onServiceRemove(this, service, null); } - return servicesToClose.remove(service); + return servicesToStop.remove(service); } return false; } @@ -1118,14 +1121,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon public boolean hasService(Object object) { if (object instanceof Service) { Service service = (Service) object; - return servicesToClose.contains(service); + return servicesToStop.contains(service); } return false; } @Override public <T> T hasService(Class<T> type) { - for (Service service : servicesToClose) { + for (Service service : servicesToStop) 
{ if (type.isInstance(service)) { return type.cast(service); } @@ -1133,6 +1136,32 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon return null; } + public void deferStartService(Object object, boolean stopOnShutdown) throws Exception { + if (object instanceof Service) { + Service service = (Service) object; + + // only add to services to close if its a singleton + // otherwise we could for example end up with a lot of prototype scope endpoints + boolean singleton = true; // assume singleton by default + if (object instanceof IsSingleton) { + singleton = ((IsSingleton) service).isSingleton(); + } + // do not add endpoints as they have their own list + if (singleton && !(service instanceof Endpoint)) { + // only add to list of services to stop if its not already there + if (stopOnShutdown && !hasService(service)) { + servicesToStop.add(service); + } + } + // are we already started? + if (isStarted()) { + ServiceHelper.startService(service); + } else { + deferStartupListener.addService(service); + } + } + } + public void addStartupListener(StartupListener listener) throws Exception { // either add to listener so we can invoke then later when CamelContext has been started // or invoke the callback right now @@ -2680,7 +2709,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // stop consumers from the services to close first, such as POJO consumer (eg @Consumer) // which we need to stop after the routes, as a POJO consumer is essentially a route also - for (Service service : servicesToClose) { + for (Service service : servicesToStop) { if (service instanceof Consumer) { shutdownServices(service); } @@ -2716,8 +2745,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon } // shutdown services as late as possible - shutdownServices(servicesToClose); - servicesToClose.clear(); + shutdownServices(servicesToStop); + servicesToStop.clear(); // must notify that we are stopped 
before stopping the management strategy EventHelper.notifyCamelContextStopped(this); http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java index 6a7cfd4..aa3fa10 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java @@ -736,6 +736,12 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } producerCache.setEventNotifierEnabled(isEventNotifierEnabled()); } + + // need to lookup default endpoint as it may have been intercepted + if (defaultEndpoint != null) { + defaultEndpoint = camelContext.getEndpoint(defaultEndpoint.getEndpointUri()); + } + ServiceHelper.startService(producerCache); } http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java new file mode 100644 index 0000000..0ddf1bf --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DeferProducer.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.util.ServiceHelper; + +/** + * A {@link Producer} that defers being started, until {@link org.apache.camel.CamelContext} has been started, this + * ensures that the producer is able to adapt to changes that may otherwise occur during starting + * CamelContext. If we do not defer starting the producer it may not adapt to those changes, and + * send messages to wrong endpoints. 
+ */ +public class DeferProducer extends org.apache.camel.support.ServiceSupport implements Producer, AsyncProcessor { + + private Producer delegate; + private final Endpoint endpoint; + + public DeferProducer(Endpoint endpoint) { + this.endpoint = endpoint; + } + + @Override + public Exchange createExchange() { + if (delegate == null) { + throw new IllegalStateException("Not started"); + } + return delegate.createExchange(); + } + + @Override + public Exchange createExchange(ExchangePattern pattern) { + if (delegate == null) { + throw new IllegalStateException("Not started"); + } + return delegate.createExchange(pattern); + } + + @Override + @Deprecated + public Exchange createExchange(Exchange exchange) { + if (delegate == null) { + throw new IllegalStateException("Not started"); + } + return delegate.createExchange(exchange); + } + + @Override + public void process(Exchange exchange) throws Exception { + if (delegate == null) { + throw new IllegalStateException("Not started"); + } + delegate.process(exchange); + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + if (delegate == null) { + exchange.setException(new IllegalStateException("Not started")); + callback.done(true); + return true; + } + + if (delegate instanceof AsyncProcessor) { + return ((AsyncProcessor) delegate).process(exchange, callback); + } + + // fallback to sync mode + try { + process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + + callback.done(true); + return true; + } + + @Override + protected void doStart() throws Exception { + // need to lookup endpoint again as it may be intercepted + Endpoint lookup = endpoint.getCamelContext().getEndpoint(endpoint.getEndpointUri()); + + delegate = lookup.createProducer(); + ServiceHelper.startService(delegate); + } + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(delegate); + } + + @Override + public boolean isSingleton() { + if (delegate != null) { + 
return delegate.isSingleton(); + } else { + // assume singleton by default + return true; + } + } + + @Override + public Endpoint getEndpoint() { + if (delegate != null) { + return delegate.getEndpoint(); + } else { + return endpoint; + } + } + + @Override + public String toString() { + if (delegate != null) { + return delegate.toString(); + } else { + return "DelegateProducer[" + endpoint + "]"; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java new file mode 100644 index 0000000..a78bdd8 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DeferServiceStartupListener.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.camel.CamelContext; +import org.apache.camel.Service; +import org.apache.camel.StartupListener; +import org.apache.camel.util.ServiceHelper; + +/** + * A {@link org.apache.camel.StartupListener} that defers starting {@link Service}s. + */ +public class DeferServiceStartupListener implements StartupListener { + + private final Set<Service> services = new CopyOnWriteArraySet<Service>(); + + public void addService(Service service) { + services.add(service); + } + + @Override + public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception { + for (Service service : services) { + ServiceHelper.startService(service); + } + services.clear(); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java new file mode 100644 index 0000000..9dac0dd --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.Endpoint; +import org.apache.camel.Producer; +import org.apache.camel.impl.DeferProducer; + +/** + * Factory to create {@link org.apache.camel.DeferStartService} services such as {@link Producer}s + * and {@link org.apache.camel.PollingConsumer}s + */ +public class DeferServiceFactory { + + /** + * Creates the {@link Producer} which is deferred started until {@link org.apache.camel.CamelContext} is being started. + * <p/> + * When the producer is started, it re-lookup the endpoint to capture any changes such as the endpoint has been intercepted. + * This allows the producer to react and send messages to the updated endpoint. 
+ * + * @param endpoint the endpoint + * @return the producer which will be deferred started until {@link org.apache.camel.CamelContext} has been started + * @throws Exception can be thrown if there is an error starting the producer + * @see org.apache.camel.impl.DeferProducer + */ + public static Producer createProducer(Endpoint endpoint) throws Exception { + Producer producer = new DeferProducer(endpoint); + endpoint.getCamelContext().deferStartService(producer, true); + return producer; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java new file mode 100644 index 0000000..553d28e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceInterceptEndpointTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import junit.framework.TestCase; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * + */ +public class PojoProduceInterceptEndpointTest extends TestCase { + + public void testPojoProduceInterceptAlreadyStarted() throws Exception { + CamelContext context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("language:simple:${body}${body}"); + + from("direct:start") + .to("mock:result"); + } + }); + + // start Camel before POJO being injected + context.start(); + + // use the injector (will use the default) + // which should post process the bean to inject the @Produce + MyBean bean = context.getInjector().newInstance(MyBean.class); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedBodiesReceived("WorldWorld"); + + Object reply = bean.doSomething("World"); + assertEquals("WorldWorld", reply); + + mock.assertIsSatisfied(); + + context.stop(); + } + + public void testPojoProduceInterceptNotStarted() throws Exception { + CamelContext context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("language:simple:${body}${body}"); + + from("direct:start") + .to("mock:result"); + } + }); + + // use the injector (will use the default) + // which should post process the bean to inject the @Produce + MyBean bean = context.getInjector().newInstance(MyBean.class); + + // do NOT start Camel before POJO being injected + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + 
mock.expectedBodiesReceived("WorldWorld"); + + Object reply = bean.doSomething("World"); + assertEquals("WorldWorld", reply); + + mock.assertIsSatisfied(); + + context.stop(); + } + + public static class MyBean { + + @Produce(uri = "direct:start") + Producer producer; + + public Object doSomething(String body) throws Exception { + Exchange exchange = producer.createExchange(); + exchange.getIn().setBody(body); + producer.process(exchange); + return exchange.hasOut() ? exchange.getOut().getBody() : exchange.getIn().getBody(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java new file mode 100644 index 0000000..f03ec9d --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/PojoProduceProxyInterceptEndpointTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import junit.framework.TestCase; +import org.apache.camel.CamelContext; +import org.apache.camel.Produce; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Checks that a bean created through context.getInjector(), calling its {@code @Produce(uri = "direct:start")} EchoService field, gets "WorldWorld" back from the route set up with interceptSendToEndpoint, both when the CamelContext is started before the bean is created and when it is started afterwards. + */ +public class PojoProduceProxyInterceptEndpointTest extends TestCase { + + public void testPojoProduceInterceptAlreadyStarted() throws Exception { + CamelContext context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("language:simple:${body}${body}"); + + from("direct:start") + .to("mock:result"); + } + }); + + // start Camel before POJO being injected + context.start(); + + // use the injector (will use the default) + // which should post process the bean to inject the @Produce + MyBean bean = context.getInjector().newInstance(MyBean.class); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedBodiesReceived("WorldWorld"); + + Object reply = bean.doSomething("World"); + assertEquals("WorldWorld", reply); + + mock.assertIsSatisfied(); + + context.stop(); + } + + public void testPojoProduceInterceptNotStarted() throws Exception { + CamelContext context = new DefaultCamelContext(); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("language:simple:${body}${body}"); + + from("direct:start") + .to("mock:result"); + } + }); + + // use the injector (will use the default) + // which should post process the bean to inject the @Produce + MyBean bean = context.getInjector().newInstance(MyBean.class); + + // start Camel only now, after the POJO has been injected + context.start(); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedBodiesReceived("WorldWorld"); + + Object reply = bean.doSomething("World"); + 
assertEquals("WorldWorld", reply); + + mock.assertIsSatisfied(); + + context.stop(); + } + + public static interface EchoService { + public String echo(String word); + } + + public static class MyBean { + + @Produce(uri = "direct:start") + private EchoService echo; + + public Object doSomething(String body) throws Exception { + return echo.echo(body); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9600bc4f/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java index cd44d85..475c057 100644 --- a/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java +++ b/components/camel-blueprint/src/main/java/org/apache/camel/blueprint/handler/CamelNamespaceHandler.java @@ -928,11 +928,13 @@ public class CamelNamespaceHandler implements NamespaceHandler { @Override protected boolean isSingleton(Object bean, String beanName) { - ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName); - if (meta != null && meta instanceof BeanMetadata) { - String scope = ((BeanMetadata) meta).getScope(); - if (scope != null) { - return BeanMetadata.SCOPE_SINGLETON.equals(scope); + if (beanName != null) { + ComponentMetadata meta = blueprintContainer.getComponentMetadata(beanName); + if (meta != null && meta instanceof BeanMetadata) { + String scope = ((BeanMetadata) meta).getScope(); + if (scope != null) { + return BeanMetadata.SCOPE_SINGLETON.equals(scope); + } } } // fallback to super, which will assume singleton