Repository: camel Updated Branches: refs/heads/master 0f9b93b03 -> 2648a301f
CAMEL-10612: added a bunch of client API Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2648a301 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2648a301 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2648a301 Branch: refs/heads/master Commit: 2648a301fee1d0a03f42fb97ce91eef2a310641e Parents: a7e838d Author: Nicola Ferraro <ni.ferr...@gmail.com> Authored: Fri Feb 3 13:46:36 2017 +0100 Committer: Nicola Ferraro <ni.ferr...@gmail.com> Committed: Fri Feb 3 13:46:47 2017 +0100 ---------------------------------------------------------------------- .../api/CamelReactiveStreamsService.java | 105 +++++++++ .../engine/CamelReactiveStreamsServiceImpl.java | 137 +++++++++--- .../streams/util/ConvertingPublisher.java | 6 +- .../streams/util/UnwrapStreamProcessor.java | 13 +- .../reactive/streams/BeanCallTest.java | 4 +- .../reactive/streams/DirectClientAPITest.java | 212 +++++++++++++++++++ .../support/ReactiveStreamsTestService.java | 40 ++++ .../support/ReactiveStreamsTestSupport.java | 38 ++++ 8 files changed, 526 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java index 03ca6a0..6b639cf 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java @@ -126,6 +126,111 @@ public interface CamelReactiveStreamsService extends CamelContextAware, Service <T> Function<Object, Publisher<T>> request(String name, Class<T> type); /* + * Direct client API methods + */ + + /** + * Creates a new stream from the endpoint URI (used as Camel Consumer) and returns + * the associated {@code Publisher}. + * + * If a stream has already been created, the existing {@link Publisher} is returned. + * + * @param uri the consumer uri + * @return the publisher associated to the uri + */ + Publisher<Exchange> publishURI(String uri); + + /** + * Creates a new stream of the given type from the endpoint URI (used as Camel Consumer) and returns + * the associated {@code Publisher}. + * + * If a stream has already been created, the existing {@link Publisher} is returned. + * + * @param uri the consumer uri + * @param type the type of items emitted by the publisher + * @param <T> the type to which Camel should convert exchanges to + * @return the publisher associated to the uri + */ + <T> Publisher<T> publishURI(String uri, Class<T> type); + + /** + * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route + * and returns a {@code Publisher} that will eventually return the resulting exchange or an error. + * + * @param uri the producer uri + * @param data the data to push + * @return a publisher with the resulting exchange + */ + Publisher<Exchange> requestURI(String uri, Object data); + + /** + * Creates a new route that uses the endpoint URI as producer, and returns a + * function that pushes the data into the route and returns the + * {@code Publisher} that holds the resulting exchange or the error. + * + * + * This is a curryied version of {@link CamelReactiveStreamsService#requestURI(String, Object)}. + * + * @param uri the producer uri + * @return a function that returns a publisher with the resulting exchange + */ + Function<?, ? extends Publisher<Exchange>> requestURI(String uri); + + /** + * Creates a new route that uses the endpoint URI as producer, pushes the given data to the route + * and returns a {@code Publisher} that will eventually return the exchange output or an error. + * + * @param uri the producer uri + * @param data the data to push + * @param type the type to which the output should be converted + * @param <T> the generic type of the resulting Publisher + * @return a publisher with the resulting data + */ + <T> Publisher<T> requestURI(String uri, Object data, Class<T> type); + + /** + * Creates a new route that uses the endpoint URI as producer, and returns a + * function that pushes the data into the route and returns the + * {@code Publisher} that holds the exchange output or an error. + * + * This is a curryied version of {@link CamelReactiveStreamsService#requestURI(String, Object, Class)}. + * + * @param uri the producer uri + * @param type the type to which the output should be converted + * @param <T> the generic type of the resulting Publisher + * @return a function that returns a publisher with the resulting data + */ + <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type); + + /** + * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates + * to the given reactive processor. + * + * The processor receives a {@link Publisher} of exchanges and returns an object. + * If the output of the processor is a {@link Publisher}, it will be unwrapped before + * delivering the result to the source route. + * + * @param uri the uri where the processor should be attached + * @param processor the reactive processor + */ + void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor); + + /** + * Adds a processing step at the specified endpoint uri (usually a "direct:name") that delegates + * to the given reactive processor. + * + * The processor receives a {@link Publisher} of items of the given type and returns an object. + * If the output of the processor is a {@link Publisher}, it will be unwrapped before + * delivering the result to the source route. + * + * @param uri the uri where the processor should be attached + * @param type the type to which the body of the exchange should be converted + * @param <T> the generic type of the Publisher that should be processed + * @param processor the reactive processor + */ + <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor); + + /* * Methods for Camel producers. */ http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java index 4b67ce0..f5f5f4e 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java @@ -16,14 +16,15 @@ */ package org.apache.camel.component.reactive.streams.engine; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Function; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; @@ -31,6 +32,8 @@ import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServi import org.apache.camel.component.reactive.streams.api.DispatchCallback; import org.apache.camel.component.reactive.streams.util.ConvertingPublisher; import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber; +import org.apache.camel.component.reactive.streams.util.MonoPublisher; +import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.Synchronization; import org.reactivestreams.Publisher; @@ -45,9 +48,13 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ private ExecutorService workerPool; - private Map<String, CamelPublisher> publishers = new HashMap<>(); + private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>(); - private final Map<String, CamelSubscriber> subscribers = new HashMap<>(); + private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>(); + + private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>(); + + private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>(); public CamelReactiveStreamsServiceImpl() { } @@ -82,13 +89,8 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ @Override public CamelSubscriber getSubscriber(String name) { - synchronized (this) { - if (!subscribers.containsKey(name)) { - CamelSubscriber sub = new CamelSubscriber(name); - subscribers.put(name, sub); - } - return subscribers.get(name); - } + subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name)); + return subscribers.get(name); } @SuppressWarnings("unchecked") @@ -108,15 +110,7 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ @Override public Publisher<Exchange> request(String name, Object data) { - Exchange exchange; - if (data instanceof Exchange) { - exchange = (Exchange) data; - } else { - exchange = new DefaultExchange(context); - exchange.setPattern(ExchangePattern.InOut); - exchange.getIn().setBody(data); - } - + Exchange exchange = convertToExchange(data); return doRequest(name, exchange); } @@ -166,17 +160,99 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ } private CamelPublisher getPayloadPublisher(String name) { - synchronized (this) { - if (!publishers.containsKey(name)) { - CamelPublisher publisher = new CamelPublisher(this.workerPool, this.context, name); - publishers.put(name, publisher); + publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n)); + return publishers.get(name); + } + + @Override + public Publisher<Exchange> publishURI(String uri) { + publishedUriToStream.computeIfAbsent(uri, u -> { + try { + String uuid = context.getUuidGenerator().generateUuid(); + new RouteBuilder() { + @Override + public void configure() throws Exception { + from(u) + .to("reactive-streams:" + uuid); + } + }.addRoutesToCamelContext(context); + + return uuid; + } catch (Exception e) { + throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e); } + }); + return getPublisher(publishedUriToStream.get(uri)); + } + + @Override + public <T> Publisher<T> publishURI(String uri, Class<T> type) { + return new ConvertingPublisher<T>(publishURI(uri), type); + } + + @Override + public Publisher<Exchange> requestURI(String uri, Object data) { + requestedUriToStream.computeIfAbsent(uri, u -> { + try { + String uuid = context.getUuidGenerator().generateUuid(); + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("reactive-streams:" + uuid) + .to(u); + } + }.addRoutesToCamelContext(context); + + return uuid; + } catch (Exception e) { + throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e); + } + }); + return request(requestedUriToStream.get(uri), data); + } - return publishers.get(name); + @Override + public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) { + return data -> requestURI(uri, data); + } + + @Override + public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) { + return new ConvertingPublisher<T>(requestURI(uri, data), type); + } + + @Override + public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) { + return data -> requestURI(uri, data, type); + } + + + @Override + public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) { + try { + new RouteBuilder() { + @Override + public void configure() throws Exception { + from(uri) + .process(exchange -> { + Exchange copy = exchange.copy(); + Object result = processor.apply(new MonoPublisher<>(copy)); + exchange.getIn().setBody(result); + }) + .process(new UnwrapStreamProcessor()); + } + }.addRoutesToCamelContext(context); + } catch (Exception e) { + throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e); } } @Override + public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) { + processFromURI(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type))); + } + + @Override public void attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) { getSubscriber(name).attachConsumer(consumer); } @@ -206,4 +282,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ return this.context; } + private Exchange convertToExchange(Object data) { + Exchange exchange; + if (data instanceof Exchange) { + exchange = (Exchange) data; + } else { + exchange = new DefaultExchange(context); + exchange.setPattern(ExchangePattern.InOut); + exchange.getIn().setBody(data); + } + + return exchange; + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java index 12ed7df..44f7e8c 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/ConvertingPublisher.java @@ -76,7 +76,11 @@ public class ConvertingPublisher<R> implements Publisher<R> { R r; try { - r = ex.getIn().getBody(type); + if (ex.hasOut()) { + r = ex.getOut().getBody(type); + } else { + r = ex.getIn().getBody(type); + } } catch (TypeConversionException e) { LOG.warn("Unable to convert body to the specified type: " + type.getName(), e); r = null; http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java index 3fb1a8a..c5bb03f 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/util/UnwrapStreamProcessor.java @@ -74,7 +74,18 @@ public class UnwrapStreamProcessor implements AsyncProcessor { } else { body = data; } - exchange.getIn().setBody(body); + + if (body instanceof Exchange && !exchange.equals(body)) { + // copy into the original Exchange + Exchange copy = (Exchange) body; + exchange.setException(copy.getException()); + exchange.setIn(copy.getIn()); + exchange.setOut(copy.getOut()); + exchange.getProperties().clear(); + exchange.getProperties().putAll(copy.getProperties()); + } else { + exchange.getIn().setBody(body); + } } }); http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java index 1b97382..3e714b5 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/BeanCallTest.java @@ -31,9 +31,7 @@ import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.reactivestreams.Publisher; -/** - * - */ + public class BeanCallTest extends CamelTestSupport { @Test http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java new file mode 100644 index 0000000..cd62bcf --- /dev/null +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/DirectClientAPITest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import io.reactivex.Flowable; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestSupport; +import org.apache.camel.impl.JndiRegistry; +import org.junit.Test; +import org.reactivestreams.Publisher; + + +public class DirectClientAPITest extends ReactiveStreamsTestSupport { + + @Test + public void testFromDirect() throws Exception { + + Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class); + + BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(); + + Flowable.fromPublisher(data) + .map(i -> -i) + .doOnNext(queue::add) + .subscribe(); + + context.start(); + template.sendBody("direct:endpoint", 1); + + Integer res = queue.poll(1, TimeUnit.SECONDS); + assertNotNull(res); + assertEquals(-1, res.intValue()); + } + + @Test + public void testFromDirectOnHotContext() throws Exception { + + context.start(); + Thread.sleep(200); + + Publisher<Integer> data = camel.publishURI("direct:endpoint", Integer.class); + + BlockingQueue<Integer> queue = new LinkedBlockingDeque<>(); + + Flowable.fromPublisher(data) + .map(i -> -i) + .doOnNext(queue::add) + .subscribe(); + + template.sendBody("direct:endpoint", 1); + + Integer res = queue.poll(1, TimeUnit.SECONDS); + assertNotNull(res); + assertEquals(-1, res.intValue()); + } + + @Test + public void testDirectCall() throws Exception { + context.start(); + + BlockingQueue<String> queue = new LinkedBlockingDeque<>(); + + Flowable.just(1, 2, 3) + .flatMap(camel.requestURI("bean:hello", String.class)::apply) + .doOnNext(queue::add) + .subscribe(); + + for (int i = 1; i <= 3; i++) { + String res = queue.poll(1, TimeUnit.SECONDS); + assertEquals("Hello " + i, res); + } + + } + + @Test + public void testProxiedDirectCall() throws Exception { + context.start(); + + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:proxy") + .to("bean:hello") + .setBody().simple("proxy to ${body}"); + } + }.addRoutesToCamelContext(context); + + BlockingQueue<String> queue = new LinkedBlockingDeque<>(); + + Flowable.just(1, 2, 3) + .flatMap(camel.requestURI("direct:proxy", String.class)::apply) + .doOnNext(queue::add) + .subscribe(); + + for (int i = 1; i <= 3; i++) { + String res = queue.poll(1, TimeUnit.SECONDS); + assertEquals("proxy to Hello " + i, res); + } + + } + + @Test + public void testDirectCallFromCamel() throws Exception { + + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("direct:stream") + .setBody().simple("after stream: ${body}") + .to("mock:dest"); + } + }.addRoutesToCamelContext(context); + + context.start(); + + camel.processFromURI("direct:stream", p -> + Flowable.fromPublisher(p) + .map(exchange -> { + int val = exchange.getIn().getBody(Integer.class); + exchange.getOut().setBody(-val); + return exchange; + }) + ); + + for (int i = 1; i <= 3; i++) { + template.sendBody("direct:source", i); + } + + MockEndpoint mock = getMockEndpoint("mock:dest"); + mock.expectedMessageCount(3); + mock.assertIsSatisfied(); + + int id = 1; + for (Exchange ex : mock.getExchanges()) { + String content = ex.getIn().getBody(String.class); + assertEquals("after stream: " + (-id++), content); + } + } + + @Test + public void testDirectCallFromCamelWithConversion() throws Exception { + + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:source") + .to("direct:stream") + .setBody().simple("after stream: ${body}") + .to("mock:dest"); + } + }.addRoutesToCamelContext(context); + + context.start(); + + camel.processFromURI("direct:stream", Integer.class, p -> + Flowable.fromPublisher(p) + .map(i -> -i) + ); + + for (int i = 1; i <= 3; i++) { + template.sendBody("direct:source", i); + } + + MockEndpoint mock = getMockEndpoint("mock:dest"); + mock.expectedMessageCount(3); + mock.assertIsSatisfied(); + + int id = 1; + for (Exchange ex : mock.getExchanges()) { + String content = ex.getIn().getBody(String.class); + assertEquals("after stream: " + (-id++), content); + } + } + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + registry.bind("hello", new SampleBean()); + return registry; + } + + public static class SampleBean { + + public String hello(String name) { + return "Hello " + name; + } + + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java index 6ab9c5e..186a9b5 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestService.java @@ -126,6 +126,46 @@ public class ReactiveStreamsTestService implements CamelReactiveStreamsService { return null; } + @Override + public Publisher<Exchange> publishURI(String uri) { + return null; + } + + @Override + public <T> Publisher<T> publishURI(String uri, Class<T> type) { + return null; + } + + @Override + public Publisher<Exchange> requestURI(String uri, Object data) { + return null; + } + + @Override + public Function<?, ? extends Publisher<Exchange>> requestURI(String uri) { + return null; + } + + @Override + public <T> Publisher<T> requestURI(String uri, Object data, Class<T> type) { + return null; + } + + @Override + public <T> Function<Object, Publisher<T>> requestURI(String uri, Class<T> type) { + return null; + } + + @Override + public void processFromURI(String uri, Function<? super Publisher<Exchange>, ?> processor) { + + } + + @Override + public <T> void processFromURI(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) { + + } + public String getName() { return name; } http://git-wip-us.apache.org/repos/asf/camel/blob/2648a301/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java new file mode 100644 index 0000000..0fb4b6e --- /dev/null +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/support/ReactiveStreamsTestSupport.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.support; + +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Before; + +public class ReactiveStreamsTestSupport extends CamelTestSupport { + + protected CamelReactiveStreamsService camel; + + @Before + public void initReactiveStreamService() { + this.camel = CamelReactiveStreams.get(context); + } + + @Override + public boolean isUseRouteBuilder() { + return false; + } + +}