Updated Branches: refs/heads/master c352b4251 -> 268c13a09
CAMEL-7022: UnitOfWorkFactory for spi to make it possible to plugin custom UoW implementations. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/268c13a0 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/268c13a0 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/268c13a0 Branch: refs/heads/master Commit: 268c13a09e4eb38303309959f70329cf4e522e4f Parents: c352b425 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Nov 28 10:13:54 2013 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Nov 28 10:16:04 2013 +0100 ---------------------------------------------------------------------- .../java/org/apache/camel/CamelContext.java | 11 +++ .../apache/camel/impl/DefaultCamelContext.java | 10 +++ .../org/apache/camel/impl/DefaultConsumer.java | 2 +- .../camel/impl/DefaultUnitOfWorkFactory.java | 36 +++++++++ .../camel/processor/CamelInternalProcessor.java | 2 +- .../org/apache/camel/spi/UnitOfWorkFactory.java | 31 ++++++++ .../org/apache/camel/util/UnitOfWorkHelper.java | 12 +-- .../camel/impl/CustomUnitOfWorkFactoryTest.java | 78 ++++++++++++++++++++ .../xml/AbstractCamelContextFactoryBean.java | 6 ++ 9 files changed, 177 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/CamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java index d8f7894..a9005bc 100644 --- a/camel-core/src/main/java/org/apache/camel/CamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java @@ -55,6 +55,7 @@ import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; +import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.LoadPropertiesException; @@ -1259,4 +1260,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration { */ void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy); + /** + * Gets the {@link UnitOfWorkFactory} to use. + */ + UnitOfWorkFactory getUnitOfWorkFactory(); + + /** + * Sets a custom {@link UnitOfWorkFactory} to use. + */ + void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory); + } http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index b9f8b25..6b1e796 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -120,6 +120,7 @@ import org.apache.camel.spi.ServicePool; import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.TypeConverterRegistry; +import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.CamelContextHelper; @@ -218,6 +219,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon private ExecutorServiceManager executorServiceManager; private Debugger debugger; private UuidGenerator uuidGenerator = createDefaultUuidGenerator(); + private UnitOfWorkFactory unitOfWorkFactory = new DefaultUnitOfWorkFactory(); private final StopWatch stopWatch = new StopWatch(false); private Date startDate; @@ -1403,6 +1405,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon return producerServicePool; } + public UnitOfWorkFactory getUnitOfWorkFactory() { + return unitOfWorkFactory; + } + + public void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) { + this.unitOfWorkFactory = unitOfWorkFactory; + } + public String getUptime() { // compute and log uptime if (startDate == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java index 2d82a47..a141238 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java @@ -83,7 +83,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw exchange.setFromRouteId(route.getId()); } - UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange); + UnitOfWork uow = endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange); exchange.setUnitOfWork(uow); uow.start(); return uow; http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java new file mode 100644 index 0000000..f36b9b4 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Exchange; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.spi.UnitOfWorkFactory; + +public class DefaultUnitOfWorkFactory implements UnitOfWorkFactory { + + @Override + public UnitOfWork createUnitOfWork(Exchange exchange) { + UnitOfWork answer; + if (exchange.getContext().isUseMDCLogging()) { + answer = new MDCUnitOfWork(exchange); + } else { + answer = new DefaultUnitOfWork(exchange); + } + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 5c4ef85..29d4c46 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -615,7 +615,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { } protected UnitOfWork createUnitOfWork(Exchange exchange) { - return UnitOfWorkHelper.createUoW(exchange); + return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java new file mode 100644 index 0000000..a11618c --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Exchange; + +public interface UnitOfWorkFactory { + + /** + * Creates a new {@link UnitOfWork} + * + * @param exchange the exchange + * @return the created {@link UnitOfWork} + */ + UnitOfWork createUnitOfWork(Exchange exchange); +} + http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java index 835d237..50c4e8f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java @@ -21,8 +21,6 @@ import java.util.Collections; import java.util.List; import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultUnitOfWork; -import org.apache.camel.impl.MDCUnitOfWork; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; import org.slf4j.Logger; @@ -43,15 +41,11 @@ public final class UnitOfWorkHelper { * * @param exchange the exchange * @return the created unit of work (is not started) + * @deprecated use {@link org.apache.camel.CamelContext#getUnitOfWorkFactory()} instead. */ + @Deprecated public static UnitOfWork createUoW(Exchange exchange) { - UnitOfWork answer; - if (exchange.getContext().isUseMDCLogging()) { - answer = new MDCUnitOfWork(exchange); - } else { - answer = new DefaultUnitOfWork(exchange); - } - return answer; + return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java b/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java new file mode 100644 index 0000000..2457b92 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.spi.UnitOfWorkFactory; + +public class CustomUnitOfWorkFactoryTest extends ContextTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + context.setUnitOfWorkFactory(new MyUnitOfWorkFactory()); + return context; + } + + public void testCustomUnitOfWorkFactory() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedHeaderReceived("before", "I was here"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("log:foo") + .to("mock:result"); + } + }; + } + + private class MyUnitOfWorkFactory implements UnitOfWorkFactory { + + @Override + public UnitOfWork createUnitOfWork(Exchange exchange) { + return new MyUnitOfWork(exchange); + } + } + + private class MyUnitOfWork extends DefaultUnitOfWork { + + public MyUnitOfWork(Exchange exchange) { + super(exchange); + } + + @Override + public AsyncCallback beforeProcess(Processor processor, Exchange exchange, AsyncCallback callback) { + exchange.getIn().setHeader("before", "I was here"); + return callback; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java ---------------------------------------------------------------------- diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java index d682ce1..f2ee007 100644 --- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java +++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java @@ -85,6 +85,7 @@ import org.apache.camel.spi.ShutdownStrategy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.ThreadPoolFactory; import org.apache.camel.spi.ThreadPoolProfile; +import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ObjectHelper; @@ -210,6 +211,11 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex LOG.info("Using custom EventFactory: {}", eventFactory); getContext().getManagementStrategy().setEventFactory(eventFactory); } + UnitOfWorkFactory unitOfWorkFactory = getBeanForType(UnitOfWorkFactory.class); + if (unitOfWorkFactory != null) { + LOG.info("Using custom UnitOfWorkFactory: {}", unitOfWorkFactory); + getContext().setUnitOfWorkFactory(unitOfWorkFactory); + } // set the event notifier strategies if defined Map<String, EventNotifier> eventNotifiers = getContext().getRegistry().findByTypeWithName(EventNotifier.class); if (eventNotifiers != null && !eventNotifiers.isEmpty()) {