CAMEL-7833 Added CamelOperatorTest

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2d9b777a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2d9b777a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2d9b777a

Branch: refs/heads/master
Commit: 2d9b777a02cbe8359b738fadadf28da3eb4b96f6
Parents: 081e8a7
Author: Jyrki Ruuskanen <[email protected]>
Authored: Wed Apr 1 18:34:47 2015 +0300
Committer: Willem Jiang <[email protected]>
Committed: Thu Apr 2 15:48:04 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/camel/rx/CamelOperator.java | 16 ++++++
 .../org/apache/camel/rx/CamelOperatorTest.java  | 57 ++++++++++++++++++++
 2 files changed, 73 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
index c218776..f965388 100644
--- a/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
+++ b/components/camel-rx/src/main/java/org/apache/camel/rx/CamelOperator.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.camel.rx;
 
 import org.apache.camel.CamelContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/2d9b777a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
new file mode 100644
index 0000000..8667e3b
--- /dev/null
+++ b/components/camel-rx/src/test/java/org/apache/camel/rx/CamelOperatorTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.rx;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Message;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+
+/**
+ */
+public class CamelOperatorTest extends RxTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(CamelOperatorTest.class);
+
+    @Test
+    public void testCamelOperator() throws Exception {
+        final MockEndpoint mockEndpoint1 = camelContext.getEndpoint("mock:results1", MockEndpoint.class);
+        final MockEndpoint mockEndpoint2 = camelContext.getEndpoint("mock:results2", MockEndpoint.class);
+        final MockEndpoint mockEndpoint3 = camelContext.getEndpoint("mock:results3", MockEndpoint.class);
+        mockEndpoint1.expectedMessageCount(2);
+        mockEndpoint2.expectedMessageCount(1);
+        mockEndpoint3.expectedMessageCount(1);
+
+        Observable<Message> result = reactiveCamel.toObservable("direct:start")
+            .lift(new CamelOperator(camelContext, "mock:results1"))
+            .lift(new CamelOperator(camelContext, "log:foo"))
+            .debounce(1, TimeUnit.SECONDS)
+            .lift(new CamelOperator(mockEndpoint2));
+        reactiveCamel.sendTo(result, "mock:results3");
+
+        // Send two test messages
+        producerTemplate.sendBody("direct:start", "<test/>");
+        producerTemplate.sendBody("direct:start", "<test/>");
+
+        mockEndpoint1.assertIsSatisfied();
+        mockEndpoint2.assertIsSatisfied();
+        mockEndpoint3.assertIsSatisfied();
+    }
+}
