Repository: camel Updated Branches: refs/heads/camel-2.15.x 6a45c7a3b -> 756f4a471
CAMEL-8747: camel-rx - Should leverage UoW when subscribe or observe Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/756f4a47 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/756f4a47 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/756f4a47 Branch: refs/heads/camel-2.15.x Commit: 756f4a4713b0abd2f5d36852bed8afbc1410e5a6 Parents: 6a45c7a Author: Claus Ibsen <davscl...@apache.org> Authored: Wed May 6 10:22:12 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Wed May 6 10:22:39 2015 +0200 ---------------------------------------------------------------------- .../camel/rx/support/EndpointSubscription.java | 6 +- .../apache/camel/rx/support/ObserverSender.java | 13 ++-- .../java/org/apache/camel/rx/SendToTest.java | 4 +- .../java/org/apache/camel/rx/SendToUoWTest.java | 67 ++++++++++++++++++++ .../apache/camel/rx/ToObservableUoWTest.java | 59 +++++++++++++++++ 5 files changed, 140 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java index 8ecd265..593e1d4 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/EndpointSubscription.java @@ -22,6 +22,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.processor.CamelInternalProcessor; import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +49,11 @@ public class EndpointSubscription<T> implements Subscription { // lets create the consumer Processor processor = new ProcessorToObserver<T>(func, observer); + // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked + CamelInternalProcessor internal = new CamelInternalProcessor(processor); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); try { - this.consumer = endpoint.createConsumer(processor); + this.consumer = endpoint.createConsumer(internal); ServiceHelper.startService(consumer); } catch (Exception e) { observer.onError(e); http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java index 35f1048..0cf5d90 100644 --- a/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java +++ b/components/camel-rx/src/main/java/org/apache/camel/rx/support/ObserverSender.java @@ -19,19 +19,22 @@ package org.apache.camel.rx.support; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.Producer; -import org.apache.camel.rx.RuntimeCamelRxException; - +import org.apache.camel.processor.UnitOfWorkProducer; import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observer; /** * An {@link Observer} which sends events to a given {@link Endpoint} */ public class ObserverSender<T> implements Observer<T> { + private static final Logger LOG = LoggerFactory.getLogger(ObserverSender.class); + private Producer producer; public ObserverSender(Endpoint endpoint) throws Exception { - this.producer = endpoint.createProducer(); + this.producer = new UnitOfWorkProducer(endpoint.createProducer()); ServiceHelper.startService(producer); } @@ -41,7 +44,7 @@ public class ObserverSender<T> implements Observer<T> { try { ServiceHelper.stopService(producer); } catch (Exception e) { - throw new RuntimeCamelRxException(e); + LOG.warn("Error stopping producer: " + producer + " due " + e.getMessage() + ". This exception is ignored.", e); } finally { producer = null; } @@ -66,7 +69,7 @@ public class ObserverSender<T> implements Observer<T> { try { producer.process(exchange); } catch (Exception e) { - throw new RuntimeCamelRxException(e); + exchange.setException(e); } } http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java index 634c7a9..d905f5a 100644 --- a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToTest.java @@ -21,9 +21,8 @@ import org.junit.Test; import rx.Observable; -/** - */ public class SendToTest extends RxTestSupport { + @Test public void testSendObservableToEndpoint() throws Exception { Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)}; @@ -36,6 +35,5 @@ public class SendToTest extends RxTestSupport { reactiveCamel.sendTo(someObservable, "mock:results"); mockEndpoint.assertIsSatisfied(); - } } http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java new file mode 100644 index 0000000..0852c3d --- /dev/null +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/SendToUoWTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.support.SynchronizationAdapter; +import org.junit.Assert; +import org.junit.Test; +import rx.Observable; + +public class SendToUoWTest extends RxTestSupport { + + private MyOnCompletion onCompletion = new MyOnCompletion(); + + @Test + public void testSendObservableToEndpoint() throws Exception { + Order[] expectedBodies = {new Order("o1", 1.10), new Order("o2", 2.20), new Order("o3", 3.30)}; + Observable<Order> someObservable = Observable.from(expectedBodies); + + final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class); + mockEndpoint.expectedBodiesReceived((Object[]) expectedBodies); + + mockEndpoint.whenAnyExchangeReceived(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.addOnCompletion(onCompletion); + } + }); + + // lets send events on the observable to the camel endpoint + reactiveCamel.sendTo(someObservable, "mock:results"); + + mockEndpoint.assertIsSatisfied(); + + Assert.assertEquals(3, onCompletion.getDone()); + } + + private static class MyOnCompletion extends SynchronizationAdapter { + + private int done; + + @Override + public void onComplete(Exchange exchange) { + done++; + } + + public int getDone() { + return done; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/756f4a47/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java new file mode 100644 index 0000000..88a5b2f --- /dev/null +++ b/components/camel-rx/src/test/java/org/apache/camel/rx/ToObservableUoWTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.rx; + +import java.io.File; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.util.FileUtil; +import org.junit.Test; +import rx.Observable; +import rx.functions.Action1; + +public class ToObservableUoWTest extends RxTestSupport { + + @Override + public void init() throws Exception { + FileUtil.removeDir(new File("target/foo")); + super.init(); + } + + @Test + public void testConsumeUoW() throws Exception { + final MockEndpoint mockEndpoint = camelContext.getEndpoint("mock:results", MockEndpoint.class); + mockEndpoint.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World"); + + Observable<Message> observable = reactiveCamel.toObservable("file://target/foo?move=done"); + observable.subscribe(new Action1<Message>() { + @Override + public void call(Message message) { + String body = message.getBody(String.class); + producerTemplate.sendBody("mock:results", body); + } + }); + + producerTemplate.sendBodyAndHeader("file://target/foo", "Hello World", Exchange.FILE_NAME, "hello.txt"); + producerTemplate.sendBodyAndHeader("file://target/foo", "Bye World", Exchange.FILE_NAME, "bye.txt"); + + mockEndpoint.expectedFileExists("target/foo/done/hello.txt"); + mockEndpoint.expectedFileExists("target/foo/done/bye.txt"); + + mockEndpoint.assertIsSatisfied(); + } +}