Repository: camel Updated Branches: refs/heads/master e16315825 -> 267f065fb
CAMEL-10612: adding Publisher types for bean call Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/267f065f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/267f065f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/267f065f Branch: refs/heads/master Commit: 267f065fb9640a95164404795a3094d978e313cd Parents: e163158 Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Wed Feb 1 16:47:56 2017 +0100 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Wed Feb 1 16:48:09 2017 +0100 ---------------------------------------------------------------------- .../streams/ReactiveStreamsConverter.java | 62 ++++++++ .../reactive/streams/util/MonoPublisher.java | 64 ++++++++ .../streams/util/UnwrappingStreamProcessor.java | 78 ++++++++++ .../services/org/apache/camel/TypeConverter | 18 +++ .../reactive/streams/BeanCallTest.java | 148 +++++++++++++++++++ 5 files changed, 370 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java new file mode 100644 index 0000000..23f49c9 --- /dev/null +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConverter.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.FallbackConverter; +import org.apache.camel.Processor; +import org.apache.camel.component.bean.BeanInfo; +import org.apache.camel.component.bean.BeanProcessor; +import org.apache.camel.component.bean.ConstantBeanHolder; +import org.apache.camel.component.reactive.streams.util.MonoPublisher; +import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor; +import org.apache.camel.spi.TypeConverterRegistry; +import org.apache.camel.util.AsyncProcessorHelper; +import org.reactivestreams.Publisher; + + +@Converter +public final class ReactiveStreamsConverter implements CamelContextAware { + + private CamelContext camelContext; + + public ReactiveStreamsConverter() { + } + + @FallbackConverter + public Object convertToPublisher(Class<?> type, Exchange exchange, Object value, TypeConverterRegistry registry) { + if (type.isAssignableFrom(Publisher.class)) { + return new MonoPublisher<>(value); + } + return null; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java new file mode 100644 index 0000000..d0be491 --- /dev/null +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/MonoPublisher.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.util; + +import java.util.Objects; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * A publisher that publish a given item, then completes. + */ +public class MonoPublisher<T> implements Publisher<T> { + + private T item; + + public MonoPublisher(T item) { + this.item = item; + } + + @Override + public void subscribe(Subscriber<? super T> subscriber) { + Objects.requireNonNull(subscriber, "subscriber must not be null"); + subscriber.onSubscribe(new Subscription() { + + private boolean terminated; + + @Override + public void request(long l) { + if (terminated) { + throw new IllegalStateException("The subscription is terminated"); + } + + if (l <= 0) { + subscriber.onError(new IllegalArgumentException("3.9")); + } else { + subscriber.onNext(item); + subscriber.onComplete(); + } + terminated = true; + } + + @Override + public void cancel() { + terminated = true; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java new file mode 100644 index 0000000..fa85fd2 --- /dev/null +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrappingStreamProcessor.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.util; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.util.AsyncProcessorHelper; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * A Processor that converts a Publisher into its content asynchronously. + */ +public class UnwrappingStreamProcessor implements AsyncProcessor { + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + Object content = exchange.getIn().getBody(); + if (content instanceof Publisher) { + Publisher<?> pub = Publisher.class.cast(content); + + List<Object> data = new LinkedList<>(); + + pub.subscribe(new Subscriber<Object>() { + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object o) { + data.add(o); + } + + @Override + public void onError(Throwable throwable) { + exchange.getIn().setBody(data); + exchange.setException(throwable); + callback.done(false); + } + + @Override + public void onComplete() { + exchange.getIn().setBody(data); + callback.done(false); + } + + }); + } + return false; + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter new file mode 100644 index 0000000..1167eeb --- /dev/null +++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/TypeConverter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.camel.component.reactive.streams.ReactiveStreamsConverter \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/267f065f/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java new file mode 100644 index 0000000..1f05a00 --- /dev/null +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams; + +import io.reactivex.Flowable; + +import org.apache.camel.Exchange; +import org.apache.camel.Header; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.reactive.streams.util.UnwrappingStreamProcessor; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; +import org.reactivestreams.Publisher; + +/** + * + */ +public class BeanCallTest extends CamelTestSupport { + + @Test + public void beanCallTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processBody") + .process(new UnwrappingStreamProcessor()).split().body() // Can be removed? + .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBody("direct:num", 1); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + assertEquals("HelloBody 1", exchange.getIn().getBody()); + } + + @Test + public void beanCallWithErrorTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processBodyWrongType") + .process(new UnwrappingStreamProcessor()).split().body() // Can be removed? + .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBody("direct:num", 1); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + assertEquals("ERR", exchange.getIn().getBody()); + } + + @Test + public void beanCallHeaderMappingTest() throws Exception { + new RouteBuilder() { + @Override + public void configure() throws Exception { + + onException(Throwable.class).to("direct:handle").handled(true); + + from("direct:num") + .bean(BeanCallTest.this, "processHeader") + .process(new UnwrappingStreamProcessor()).split().body() // Can be removed? + .to("mock:endpoint"); + + from("direct:handle") + .setBody().constant("ERR") + .to("mock:endpoint"); + } + }.addRoutesToCamelContext(context); + + MockEndpoint mock = getMockEndpoint("mock:endpoint"); + mock.expectedMessageCount(1); + + context.start(); + + template.sendBodyAndHeader("direct:num", 1, "myheader", 2); + mock.assertIsSatisfied(); + + Exchange exchange = mock.getExchanges().get(0); + assertEquals("HelloHeader 2", exchange.getIn().getBody()); + } + + public Publisher<String> processBody(Publisher<Integer> data) { + return Flowable.fromPublisher(data) + .map(l -> "HelloBody " + l); + } + + public Publisher<String> processBodyWrongType(Publisher<BeanCallTest> data) { + return Flowable.fromPublisher(data) + .map(l -> "HelloBody " + l); + } + + public Publisher<String> processHeader(@Header("myheader") Publisher<Integer> data) { + return Flowable.fromPublisher(data) + .map(l -> "HelloHeader " + l); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } +}