Updated Branches:
  refs/heads/master c352b4251 -> 268c13a09

CAMEL-7022: UnitOfWorkFactory for SPI to make it possible to plug in custom UoW
implementations.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/268c13a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/268c13a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/268c13a0

Branch: refs/heads/master
Commit: 268c13a09e4eb38303309959f70329cf4e522e4f
Parents: c352b425
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Nov 28 10:13:54 2013 +0100
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Nov 28 10:16:04 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     | 11 +++
 .../apache/camel/impl/DefaultCamelContext.java  | 10 +++
 .../org/apache/camel/impl/DefaultConsumer.java  |  2 +-
 .../camel/impl/DefaultUnitOfWorkFactory.java    | 36 +++++++++
 .../camel/processor/CamelInternalProcessor.java |  2 +-
 .../org/apache/camel/spi/UnitOfWorkFactory.java | 31 ++++++++
 .../org/apache/camel/util/UnitOfWorkHelper.java | 12 +--
 .../camel/impl/CustomUnitOfWorkFactoryTest.java | 78 ++++++++++++++++++++
 .../xml/AbstractCamelContextFactoryBean.java    |  6 ++
 9 files changed, 177 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java 
b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index d8f7894..a9005bc 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -55,6 +55,7 @@ import org.apache.camel.spi.ServicePool;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.spi.UnitOfWorkFactory;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.util.LoadPropertiesException;
 
@@ -1259,4 +1260,14 @@ public interface CamelContext extends 
SuspendableService, RuntimeConfiguration {
      */
     void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy);
 
+    /**
+     * Gets the {@link UnitOfWorkFactory} to use.
+     */
+    UnitOfWorkFactory getUnitOfWorkFactory();
+
+    /**
+     * Sets a custom {@link UnitOfWorkFactory} to use.
+     */
+    void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory);
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index b9f8b25..6b1e796 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -120,6 +120,7 @@ import org.apache.camel.spi.ServicePool;
 import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.TypeConverterRegistry;
+import org.apache.camel.spi.UnitOfWorkFactory;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.CamelContextHelper;
@@ -218,6 +219,7 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
     private ExecutorServiceManager executorServiceManager;
     private Debugger debugger;
     private UuidGenerator uuidGenerator = createDefaultUuidGenerator();
+    private UnitOfWorkFactory unitOfWorkFactory = new 
DefaultUnitOfWorkFactory();
     private final StopWatch stopWatch = new StopWatch(false);
     private Date startDate;
 
@@ -1403,6 +1405,14 @@ public class DefaultCamelContext extends ServiceSupport 
implements ModelCamelCon
         return producerServicePool;
     }
 
+    public UnitOfWorkFactory getUnitOfWorkFactory() {
+        return unitOfWorkFactory;
+    }
+
+    public void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) {
+        this.unitOfWorkFactory = unitOfWorkFactory;
+    }
+
     public String getUptime() {
         // compute and log uptime
         if (startDate == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index 2d82a47..a141238 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -83,7 +83,7 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
             exchange.setFromRouteId(route.getId());
         }
 
-        UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange);
+        UnitOfWork uow = 
endpoint.getCamelContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
         exchange.setUnitOfWork(uow);
         uow.start();
         return uow;

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java
new file mode 100644
index 0000000..f36b9b4
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWorkFactory.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.spi.UnitOfWorkFactory;
+
+public class DefaultUnitOfWorkFactory implements UnitOfWorkFactory {
+
+    @Override
+    public UnitOfWork createUnitOfWork(Exchange exchange) {
+        UnitOfWork answer;
+        if (exchange.getContext().isUseMDCLogging()) {
+            answer = new MDCUnitOfWork(exchange);
+        } else {
+            answer = new DefaultUnitOfWork(exchange);
+        }
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 5c4ef85..29d4c46 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -615,7 +615,7 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor {
         }
 
         protected UnitOfWork createUnitOfWork(Exchange exchange) {
-            return UnitOfWorkHelper.createUoW(exchange);
+            return 
exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java 
b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
new file mode 100644
index 0000000..a11618c
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/UnitOfWorkFactory.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+
+public interface UnitOfWorkFactory {
+
+    /**
+     * Creates a new {@link UnitOfWork}
+     *
+     * @param exchange  the exchange
+     * @return the created {@link UnitOfWork}
+     */
+    UnitOfWork createUnitOfWork(Exchange exchange);
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
index 835d237..50c4e8f 100644
--- a/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/UnitOfWorkHelper.java
@@ -21,8 +21,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.MDCUnitOfWork;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
 import org.slf4j.Logger;
@@ -43,15 +41,11 @@ public final class UnitOfWorkHelper {
      *
      * @param exchange the exchange
      * @return the created unit of work (is not started)
+     * @deprecated use {@link 
org.apache.camel.CamelContext#getUnitOfWorkFactory()} instead.
      */
+    @Deprecated
     public static UnitOfWork createUoW(Exchange exchange) {
-        UnitOfWork answer;
-        if (exchange.getContext().isUseMDCLogging()) {
-            answer = new MDCUnitOfWork(exchange);
-        } else {
-            answer = new DefaultUnitOfWork(exchange);
-        }
-        return answer;
+        return 
exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java
new file mode 100644
index 0000000..2457b92
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/CustomUnitOfWorkFactoryTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.spi.UnitOfWorkFactory;
+
+public class CustomUnitOfWorkFactoryTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.setUnitOfWorkFactory(new MyUnitOfWorkFactory());
+        return context;
+    }
+
+    public void testCustomUnitOfWorkFactory() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:result").expectedHeaderReceived("before", "I was 
here");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("log:foo")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyUnitOfWorkFactory implements UnitOfWorkFactory {
+
+        @Override
+        public UnitOfWork createUnitOfWork(Exchange exchange) {
+            return new MyUnitOfWork(exchange);
+        }
+    }
+
+    private class MyUnitOfWork extends DefaultUnitOfWork {
+
+        public MyUnitOfWork(Exchange exchange) {
+            super(exchange);
+        }
+
+        @Override
+        public AsyncCallback beforeProcess(Processor processor, Exchange 
exchange, AsyncCallback callback) {
+            exchange.getIn().setHeader("before", "I was here");
+            return callback;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/268c13a0/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git 
a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
 
b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index d682ce1..f2ee007 100644
--- 
a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ 
b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -85,6 +85,7 @@ import org.apache.camel.spi.ShutdownStrategy;
 import org.apache.camel.spi.StreamCachingStrategy;
 import org.apache.camel.spi.ThreadPoolFactory;
 import org.apache.camel.spi.ThreadPoolProfile;
+import org.apache.camel.spi.UnitOfWorkFactory;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -210,6 +211,11 @@ public abstract class AbstractCamelContextFactoryBean<T 
extends ModelCamelContex
             LOG.info("Using custom EventFactory: {}", eventFactory);
             getContext().getManagementStrategy().setEventFactory(eventFactory);
         }
+        UnitOfWorkFactory unitOfWorkFactory = 
getBeanForType(UnitOfWorkFactory.class);
+        if (unitOfWorkFactory != null) {
+            LOG.info("Using custom UnitOfWorkFactory: {}", unitOfWorkFactory);
+            getContext().setUnitOfWorkFactory(unitOfWorkFactory);
+        }
         // set the event notifier strategies if defined
         Map<String, EventNotifier> eventNotifiers = 
getContext().getRegistry().findByTypeWithName(EventNotifier.class);
         if (eventNotifiers != null && !eventNotifiers.isEmpty()) {

Reply via email to