Repository: camel Updated Branches: refs/heads/master cf6a7414b -> 96bbc91cf
CAMEL-11123: Rename Impl to Default which is the naming convention we typically use in camel. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/96bbc91c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/96bbc91c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/96bbc91c Branch: refs/heads/master Commit: 96bbc91cfc1abac48a659493354036cb6ccc01b2 Parents: cf6a741 Author: Claus Ibsen <davscl...@apache.org> Authored: Sun Apr 9 10:25:44 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sun Apr 9 10:25:44 2017 +0200 ---------------------------------------------------------------------- .../engine/CamelReactiveStreamsServiceImpl.java | 418 ------------------- .../DefaultCamelReactiveStreamsService.java | 418 +++++++++++++++++++ .../camel/reactive-streams/default-service | 2 +- .../streams/CamelReactiveStreamsTest.java | 6 +- 4 files changed, 422 insertions(+), 422 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java deleted file mode 100644 index 3d23ce7..0000000 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java +++ /dev/null @@ -1,418 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.reactive.streams.engine; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.function.Function; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.api.management.ManagedOperation; -import org.apache.camel.api.management.ManagedResource; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; -import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; -import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; -import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; -import org.apache.camel.component.reactive.streams.api.DispatchCallback; -import org.apache.camel.component.reactive.streams.util.ConvertingPublisher; -import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber; -import org.apache.camel.component.reactive.streams.util.MonoPublisher; -import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor; -import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.spi.Synchronization; -import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.ObjectHelper; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; - -/** - * The default implementation of the reactive streams service. - */ -@ManagedResource(description = "Managed CamelReactiveStreamsService") -public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService { - - private CamelContext context; - - private ExecutorService workerPool; - - private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>(); - - private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>(); - - private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>(); - - private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>(); - - public CamelReactiveStreamsServiceImpl() { - } - - @Override - protected void doStart() throws Exception { - ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class); - ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration(); - this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize()); - } - - @Override - protected void doStop() throws Exception { - if (this.workerPool != null) { - context.getExecutorServiceManager().shutdownNow(this.workerPool); - this.workerPool = null; - } - } - - @Override - public Publisher<Exchange> fromStream(String name) { - return new UnwrappingPublisher<>(getPayloadPublisher(name)); - } - - @SuppressWarnings("unchecked") - public <T> Publisher<T> fromStream(String name, Class<T> cls) { - if (Exchange.class.equals(cls)) { - return (Publisher<T>) fromStream(name); - } - - return new ConvertingPublisher<T>(fromStream(name), cls); - } - - @Override - public CamelSubscriber streamSubscriber(String name) { - subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name)); - return subscribers.get(name); - } - - @SuppressWarnings("unchecked") - public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) { - if (Exchange.class.equals(type)) { - return (Subscriber<T>) streamSubscriber(name); - } - - return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext()); - } - - @Override - public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) { - StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback); - getPayloadPublisher(name).publish(payload); - } - - @Override - public Publisher<Exchange> toStream(String name, Object data) { - Exchange exchange = convertToExchange(data); - return doRequest(name, exchange); - } - - @Override - public Function<?, ? extends Publisher<Exchange>> toStream(String name) { - return data -> toStream(name, data); - } - - @Override - public <T> Publisher<T> toStream(String name, Object data, Class<T> type) { - return new ConvertingPublisher<>(toStream(name, data), type); - } - - protected Publisher<Exchange> doRequest(String name, Exchange data) { - ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer(); - if (consumer == null) { - throw new IllegalStateException("No consumers attached to the stream " + name); - } - - DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool); - - data.addOnCompletion(new Synchronization() { - @Override - public void onComplete(Exchange exchange) { - publisher.setData(exchange); - } - - @Override - public void onFailure(Exchange exchange) { - Throwable throwable = exchange.getException(); - if (throwable == null) { - throwable = new IllegalStateException("Unknown Exception"); - } - publisher.setException(throwable); - } - }); - - consumer.process(data, doneSync -> { - }); - - return publisher; - } - - @Override - public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) { - return data -> toStream(name, data, type); - } - - private CamelPublisher getPayloadPublisher(String name) { - publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n)); - return publishers.get(name); - } - - @Override - public Publisher<Exchange> from(String uri) { - publishedUriToStream.computeIfAbsent(uri, u -> { - try { - String uuid = context.getUuidGenerator().generateUuid(); - new RouteBuilder() { - @Override - public void configure() throws Exception { - from(u) - .to("reactive-streams:" + uuid); - } - }.addRoutesToCamelContext(context); - - return uuid; - } catch (Exception e) { - throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e); - } - }); - return fromStream(publishedUriToStream.get(uri)); - } - - @Override - public <T> Publisher<T> from(String uri, Class<T> type) { - return new ConvertingPublisher<T>(from(uri), type); - } - - @Override - public Subscriber<Exchange> subscriber(String uri) { - try { - String uuid = context.getUuidGenerator().generateUuid(); - new RouteBuilder() { - @Override - public void configure() throws Exception { - from("reactive-streams:" + uuid) - .to(uri); - } - }.addRoutesToCamelContext(context); - - return streamSubscriber(uuid); - } catch (Exception e) { - throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e); - } - } - - @Override - public <T> Subscriber<T> subscriber(String uri, Class<T> type) { - return new ConvertingSubscriber<T>(subscriber(uri), context); - } - - @Override - public Publisher<Exchange> to(String uri, Object data) { - requestedUriToStream.computeIfAbsent(uri, u -> { - try { - String uuid = context.getUuidGenerator().generateUuid(); - new RouteBuilder() { - @Override - public void configure() throws Exception { - from("reactive-streams:" + uuid) - .to(u); - } - }.addRoutesToCamelContext(context); - - return uuid; - } catch (Exception e) { - throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e); - } - }); - return toStream(requestedUriToStream.get(uri), data); - } - - @Override - public Function<Object, Publisher<Exchange>> to(String uri) { - return data -> to(uri, data); - } - - @Override - public <T> Publisher<T> to(String uri, Object data, Class<T> type) { - return new ConvertingPublisher<T>(to(uri, data), type); - } - - @Override - public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) { - return data -> to(uri, data, type); - } - - @Override - public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) { - try { - new RouteBuilder() { - @Override - public void configure() throws Exception { - from(uri) - .process(exchange -> { - Exchange copy = exchange.copy(); - Object result = processor.apply(new MonoPublisher<>(copy)); - exchange.getIn().setBody(result); - }) - .process(new UnwrapStreamProcessor()); - } - }.addRoutesToCamelContext(context); - } catch (Exception e) { - throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e); - } - } - - @Override - public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) { - process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type))); - } - - @Override - public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) { - CamelSubscriber subscriber = streamSubscriber(name); - subscriber.attachConsumer(consumer); - return subscriber; - } - - @Override - public void detachCamelConsumer(String name) { - streamSubscriber(name).detachConsumer(); - } - - @Override - public void attachCamelProducer(String name, ReactiveStreamsProducer producer) { - getPayloadPublisher(name).attachProducer(producer); - } - - @Override - public void detachCamelProducer(String name) { - getPayloadPublisher(name).detachProducer(); - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.context = camelContext; - } - - @Override - public CamelContext getCamelContext() { - return this.context; - } - - private Exchange convertToExchange(Object data) { - Exchange exchange; - if (data instanceof Exchange) { - exchange = (Exchange) data; - } else { - exchange = new DefaultExchange(context); - exchange.setPattern(ExchangePattern.InOut); - exchange.getIn().setBody(data); - } - - return exchange; - } - - @ManagedOperation(description = "Information about Camel Reactive subscribers") - public TabularData camelSubscribers() { - try { - final TabularData answer = new TabularDataSupport(subscriptionsTabularType()); - - subscribers.forEach((k, v) -> { - try { - String name = k; - long inflight = v.getInflightCount(); - long requested = v.getRequested(); - long bufferSize = v.getBufferSize(); - String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : ""; - - CompositeType ct = subscriptionsCompositeType(); - CompositeData data = new CompositeDataSupport(ct, - new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, - new Object[] {name, inflight, requested, bufferSize, backpressure}); - answer.put(data); - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - }); - - return answer; - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - } - - @ManagedOperation(description = "Information about Camel Reactive publishers") - public TabularData camelPublishers() { - try { - final TabularData answer = new TabularDataSupport(publishersTabularType()); - - publishers.forEach((k, v) -> { - try { - String name = k; - int subscribers = v.getSubscriptionSize(); - - // TODO: include more subscriber information, either as a nested table or flattern - - CompositeType ct = publishersCompositeType(); - CompositeData data = new CompositeDataSupport(ct, - new String[] {"name", "subscribers"}, - new Object[] {name, subscribers}); - answer.put(data); - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - }); - - return answer; - } catch (Exception e) { - throw ObjectHelper.wrapRuntimeCamelException(e); - } - } - - private static CompositeType subscriptionsCompositeType() throws OpenDataException { - return new CompositeType("subscriptions", "Subscriptions", - new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, - new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"}, - new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING}); - } - - private static TabularType subscriptionsTabularType() throws OpenDataException { - CompositeType ct = subscriptionsCompositeType(); - return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"}); - } - - private static CompositeType publishersCompositeType() throws OpenDataException { - return new CompositeType("publishers", "Publishers", - new String[] {"name", "subscribers"}, - new String[] {"Name", "Subscribers"}, - new OpenType[] {SimpleType.STRING, SimpleType.INTEGER}); - } - - private static TabularType publishersTabularType() throws OpenDataException { - CompositeType ct = publishersCompositeType(); - return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"}); - } - -} http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java new file mode 100644 index 0000000..3d8f968 --- /dev/null +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/DefaultCamelReactiveStreamsService.java @@ -0,0 +1,418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.reactive.streams.engine; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; +import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; +import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; +import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; +import org.apache.camel.component.reactive.streams.api.DispatchCallback; +import org.apache.camel.component.reactive.streams.util.ConvertingPublisher; +import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber; +import org.apache.camel.component.reactive.streams.util.MonoPublisher; +import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +/** + * The default implementation of the reactive streams service. + */ +@ManagedResource(description = "Managed CamelReactiveStreamsService") +public class DefaultCamelReactiveStreamsService extends ServiceSupport implements CamelReactiveStreamsService { + + private CamelContext context; + + private ExecutorService workerPool; + + private final Map<String, CamelPublisher> publishers = new ConcurrentHashMap<>(); + + private final Map<String, CamelSubscriber> subscribers = new ConcurrentHashMap<>(); + + private final Map<String, String> publishedUriToStream = new ConcurrentHashMap<>(); + + private final Map<String, String> requestedUriToStream = new ConcurrentHashMap<>(); + + public DefaultCamelReactiveStreamsService() { + } + + @Override + protected void doStart() throws Exception { + ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class); + ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration(); + this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize()); + } + + @Override + protected void doStop() throws Exception { + if (this.workerPool != null) { + context.getExecutorServiceManager().shutdownNow(this.workerPool); + this.workerPool = null; + } + } + + @Override + public Publisher<Exchange> fromStream(String name) { + return new UnwrappingPublisher<>(getPayloadPublisher(name)); + } + + @SuppressWarnings("unchecked") + public <T> Publisher<T> fromStream(String name, Class<T> cls) { + if (Exchange.class.equals(cls)) { + return (Publisher<T>) fromStream(name); + } + + return new ConvertingPublisher<T>(fromStream(name), cls); + } + + @Override + public CamelSubscriber streamSubscriber(String name) { + subscribers.computeIfAbsent(name, n -> new CamelSubscriber(name)); + return subscribers.get(name); + } + + @SuppressWarnings("unchecked") + public <T> Subscriber<T> streamSubscriber(String name, Class<T> type) { + if (Exchange.class.equals(type)) { + return (Subscriber<T>) streamSubscriber(name); + } + + return new ConvertingSubscriber<T>(streamSubscriber(name), getCamelContext()); + } + + @Override + public void sendCamelExchange(String name, Exchange exchange, DispatchCallback<Exchange> callback) { + StreamPayload<Exchange> payload = new StreamPayload<>(exchange, callback); + getPayloadPublisher(name).publish(payload); + } + + @Override + public Publisher<Exchange> toStream(String name, Object data) { + Exchange exchange = convertToExchange(data); + return doRequest(name, exchange); + } + + @Override + public Function<?, ? extends Publisher<Exchange>> toStream(String name) { + return data -> toStream(name, data); + } + + @Override + public <T> Publisher<T> toStream(String name, Object data, Class<T> type) { + return new ConvertingPublisher<>(toStream(name, data), type); + } + + protected Publisher<Exchange> doRequest(String name, Exchange data) { + ReactiveStreamsConsumer consumer = streamSubscriber(name).getConsumer(); + if (consumer == null) { + throw new IllegalStateException("No consumers attached to the stream " + name); + } + + DelayedMonoPublisher<Exchange> publisher = new DelayedMonoPublisher<>(this.workerPool); + + data.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + publisher.setData(exchange); + } + + @Override + public void onFailure(Exchange exchange) { + Throwable throwable = exchange.getException(); + if (throwable == null) { + throwable = new IllegalStateException("Unknown Exception"); + } + publisher.setException(throwable); + } + }); + + consumer.process(data, doneSync -> { + }); + + return publisher; + } + + @Override + public <T> Function<Object, Publisher<T>> toStream(String name, Class<T> type) { + return data -> toStream(name, data, type); + } + + private CamelPublisher getPayloadPublisher(String name) { + publishers.computeIfAbsent(name, n -> new CamelPublisher(this.workerPool, this.context, n)); + return publishers.get(name); + } + + @Override + public Publisher<Exchange> from(String uri) { + publishedUriToStream.computeIfAbsent(uri, u -> { + try { + String uuid = context.getUuidGenerator().generateUuid(); + new RouteBuilder() { + @Override + public void configure() throws Exception { + from(u) + .to("reactive-streams:" + uuid); + } + }.addRoutesToCamelContext(context); + + return uuid; + } catch (Exception e) { + throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e); + } + }); + return fromStream(publishedUriToStream.get(uri)); + } + + @Override + public <T> Publisher<T> from(String uri, Class<T> type) { + return new ConvertingPublisher<T>(from(uri), type); + } + + @Override + public Subscriber<Exchange> subscriber(String uri) { + try { + String uuid = context.getUuidGenerator().generateUuid(); + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("reactive-streams:" + uuid) + .to(uri); + } + }.addRoutesToCamelContext(context); + + return streamSubscriber(uuid); + } catch (Exception e) { + throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + uri, e); + } + } + + @Override + public <T> Subscriber<T> subscriber(String uri, Class<T> type) { + return new ConvertingSubscriber<T>(subscriber(uri), context); + } + + @Override + public Publisher<Exchange> to(String uri, Object data) { + requestedUriToStream.computeIfAbsent(uri, u -> { + try { + String uuid = context.getUuidGenerator().generateUuid(); + new RouteBuilder() { + @Override + public void configure() throws Exception { + from("reactive-streams:" + uuid) + .to(u); + } + }.addRoutesToCamelContext(context); + + return uuid; + } catch (Exception e) { + throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e); + } + }); + return toStream(requestedUriToStream.get(uri), data); + } + + @Override + public Function<Object, Publisher<Exchange>> to(String uri) { + return data -> to(uri, data); + } + + @Override + public <T> Publisher<T> to(String uri, Object data, Class<T> type) { + return new ConvertingPublisher<T>(to(uri, data), type); + } + + @Override + public <T> Function<Object, Publisher<T>> to(String uri, Class<T> type) { + return data -> to(uri, data, type); + } + + @Override + public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) { + try { + new RouteBuilder() { + @Override + public void configure() throws Exception { + from(uri) + .process(exchange -> { + Exchange copy = exchange.copy(); + Object result = processor.apply(new MonoPublisher<>(copy)); + exchange.getIn().setBody(result); + }) + .process(new UnwrapStreamProcessor()); + } + }.addRoutesToCamelContext(context); + } catch (Exception e) { + throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + uri, e); + } + } + + @Override + public <T> void process(String uri, Class<T> type, Function<? super Publisher<T>, ?> processor) { + process(uri, exPub -> processor.apply(new ConvertingPublisher<T>(exPub, type))); + } + + @Override + public CamelSubscriber attachCamelConsumer(String name, ReactiveStreamsConsumer consumer) { + CamelSubscriber subscriber = streamSubscriber(name); + subscriber.attachConsumer(consumer); + return subscriber; + } + + @Override + public void detachCamelConsumer(String name) { + streamSubscriber(name).detachConsumer(); + } + + @Override + public void attachCamelProducer(String name, ReactiveStreamsProducer producer) { + getPayloadPublisher(name).attachProducer(producer); + } + + @Override + public void detachCamelProducer(String name) { + getPayloadPublisher(name).detachProducer(); + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.context = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return this.context; + } + + private Exchange convertToExchange(Object data) { + Exchange exchange; + if (data instanceof Exchange) { + exchange = (Exchange) data; + } else { + exchange = new DefaultExchange(context); + exchange.setPattern(ExchangePattern.InOut); + exchange.getIn().setBody(data); + } + + return exchange; + } + + @ManagedOperation(description = "Information about Camel Reactive subscribers") + public TabularData camelSubscribers() { + try { + final TabularData answer = new TabularDataSupport(subscriptionsTabularType()); + + subscribers.forEach((k, v) -> { + try { + String name = k; + long inflight = v.getInflightCount(); + long requested = v.getRequested(); + long bufferSize = v.getBufferSize(); + String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : ""; + + CompositeType ct = subscriptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, + new Object[] {name, inflight, requested, bufferSize, backpressure}); + answer.put(data); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + }); + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + @ManagedOperation(description = "Information about Camel Reactive publishers") + public TabularData camelPublishers() { + try { + final TabularData answer = new TabularDataSupport(publishersTabularType()); + + publishers.forEach((k, v) -> { + try { + String name = k; + int subscribers = v.getSubscriptionSize(); + + // TODO: include more subscriber information, either as a nested table or flattern + + CompositeType ct = publishersCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[] {"name", "subscribers"}, + new Object[] {name, subscribers}); + answer.put(data); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + }); + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + private static CompositeType subscriptionsCompositeType() throws OpenDataException { + return new CompositeType("subscriptions", "Subscriptions", + new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, + new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"}, + new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING}); + } + + private static TabularType subscriptionsTabularType() throws OpenDataException { + CompositeType ct = subscriptionsCompositeType(); + return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"}); + } + + private static CompositeType publishersCompositeType() throws OpenDataException { + return new CompositeType("publishers", "Publishers", + new String[] {"name", "subscribers"}, + new String[] {"Name", "Subscribers"}, + new OpenType[] {SimpleType.STRING, SimpleType.INTEGER}); + } + + private static TabularType publishersTabularType() throws OpenDataException { + CompositeType ct = publishersCompositeType(); + return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"}); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service index 2ce7448..008f4fc 100644 --- a/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service +++ b/components/camel-reactive-streams/src/main/resources/META-INF/services/org/apache/camel/reactive-streams/default-service @@ -15,4 +15,4 @@ # limitations under the License. # -class=org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl +class=org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService http://git-wip-us.apache.org/repos/asf/camel/blob/96bbc91c/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java index 149e317..d633f52 100644 --- a/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java +++ b/components/camel-reactive-streams/src/test/java/org/apache/camel/component/reactive/streams/CamelReactiveStreamsTest.java @@ -18,7 +18,7 @@ package org.apache.camel.component.reactive.streams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; -import org.apache.camel.component.reactive.streams.engine.CamelReactiveStreamsServiceImpl; +import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService; import org.apache.camel.component.reactive.streams.support.ReactiveStreamsTestService; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; @@ -30,14 +30,14 @@ public class CamelReactiveStreamsTest extends CamelTestSupport { @Test public void testDefaultService() { CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service"); - assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl); + assertTrue(service1 instanceof DefaultCamelReactiveStreamsService); } @Test public void testSameDefaultServiceReturned() { CamelReactiveStreamsService service1 = CamelReactiveStreams.get(context, "default-service"); CamelReactiveStreamsService service2 = CamelReactiveStreams.get(context, "default-service"); - assertTrue(service1 instanceof CamelReactiveStreamsServiceImpl); + assertTrue(service1 instanceof DefaultCamelReactiveStreamsService); assertEquals(service1, service2); }