Repository: camel Updated Branches: refs/heads/master db7cd8e1d -> 028e810bb
CAMEL-11122: camel-reactive-streams - Add more JMX information Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/028e810b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/028e810b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/028e810b Branch: refs/heads/master Commit: 028e810bbc9656e6c8d8ba7b29281efffa6a5de5 Parents: bfb151b Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Apr 8 15:28:32 2017 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Apr 8 15:41:40 2017 +0200 ---------------------------------------------------------------------- .../streams/ReactiveStreamsConsumer.java | 24 +---- .../api/CamelReactiveStreamsService.java | 1 - .../reactive/streams/engine/CamelPublisher.java | 6 +- .../engine/CamelReactiveStreamsServiceImpl.java | 101 ++++++++++++++++++- .../streams/engine/CamelSubscriber.java | 9 ++ .../streams/engine/CamelSubscription.java | 4 + 6 files changed, 116 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java index 8661200..3724585 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java @@ -21,11 +21,8 @@ import java.util.concurrent.ExecutorService; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.api.management.ManagedAttribute; -import org.apache.camel.api.management.ManagedResource; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams; import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService; -import org.apache.camel.component.reactive.streams.engine.CamelSubscriber; import org.apache.camel.impl.DefaultConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +30,6 @@ import org.slf4j.LoggerFactory; /** * The Camel reactive-streams consumer. */ -@ManagedResource(description = "Managed ReactiveStreamsConsumer") public class ReactiveStreamsConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(ReactiveStreamsConsumer.class); @@ -44,8 +40,6 @@ public class ReactiveStreamsConsumer extends DefaultConsumer { private CamelReactiveStreamsService service; - private volatile CamelSubscriber subscriber; - public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; @@ -62,14 +56,13 @@ public class ReactiveStreamsConsumer extends DefaultConsumer { executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize); } - this.subscriber = this.service.attachCamelConsumer(endpoint.getStream(), this); + this.service.attachCamelConsumer(endpoint.getStream(), this); } @Override protected void doStop() throws Exception { super.doStop(); this.service.detachCamelConsumer(endpoint.getStream()); - this.subscriber = null; if (executor != null) { endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); @@ -128,19 +121,4 @@ public class ReactiveStreamsConsumer extends DefaultConsumer { return endpoint; } - @ManagedAttribute(description = "Number of inflight messages") - public long getInflightCount() { - return subscriber != null ? subscriber.getInflightCount() : 0; - } - - @ManagedAttribute(description = "Number of messages to be requested on next request") - public long getToBeRequested() { - return subscriber != null ? subscriber.getRequested() : 0; - } - - @ManagedAttribute(description = "Number of pending messages in the buffer") - public long getBufferSize() { - return subscriber != null ? subscriber.getBufferSize() : 0; - } - } http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java index bdd316e..91b16d4 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java @@ -20,7 +20,6 @@ import java.util.function.Function; import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; -import org.apache.camel.Service; import org.apache.camel.StaticService; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer; http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java index 5cafcd2..f90f19c 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java @@ -78,7 +78,7 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC DispatchCallback<Exchange> originalCallback = data.getCallback(); if (originalCallback != null && subs.size() > 0) { // When multiple subscribers have an active subscription, - // we aknowledge the exchange once it has been delivered to every + // we acknowledge the exchange once it has been delivered to every // subscriber (or their subscription is cancelled) AtomicInteger counter = new AtomicInteger(subs.size()); // Use just the first exception in the callback when multiple exceptions are thrown @@ -131,4 +131,8 @@ public class CamelPublisher implements Publisher<StreamPayload<Exchange>>, AutoC } subscriptions.clear(); } + + public int getSubscriptionSize() { + return subscriptions.size(); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java index 1abbd94..c3ccc42 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java @@ -20,10 +20,21 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.function.Function; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.api.management.ManagedOperation; +import org.apache.camel.api.management.ManagedResource; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; @@ -36,13 +47,16 @@ import org.apache.camel.component.reactive.streams.util.MonoPublisher; import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; /** * The default implementation of the reactive streams service. */ -public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsService { +@ManagedResource(description = "Managed CamelReactiveStreamsService") +public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements CamelReactiveStreamsService { private CamelContext context; @@ -60,16 +74,17 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ } @Override - public void start() throws Exception { + protected void doStart() throws Exception { ReactiveStreamsComponent component = context.getComponent("reactive-streams", ReactiveStreamsComponent.class); ReactiveStreamsEngineConfiguration config = component.getInternalEngineConfiguration(); this.workerPool = context.getExecutorServiceManager().newThreadPool(this, config.getThreadPoolName(), config.getThreadPoolMinSize(), config.getThreadPoolMaxSize()); } @Override - public void stop() throws Exception { + protected void doStop() throws Exception { if (this.workerPool != null) { context.getExecutorServiceManager().shutdownNow(this.workerPool); + this.workerPool = null; } } @@ -249,7 +264,6 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ return data -> to(uri, data, type); } - @Override public void process(String uri, Function<? super Publisher<Exchange>, ?> processor) { try { @@ -320,4 +334,83 @@ public class CamelReactiveStreamsServiceImpl implements CamelReactiveStreamsServ return exchange; } + @ManagedOperation(description = "Information about Camel Reactive subscribers") + public TabularData camelSubscribers() { + try { + final TabularData answer = new TabularDataSupport(subscriptionsTabularType()); + + subscribers.forEach((k, v) -> { + try { + String name = k; + long inflight = v.getInflightCount(); + long requested = v.getRequested(); + long bufferSize = v.getBufferSize(); + String backpressure = v.getBackpressureStrategy() != null ? v.getBackpressureStrategy().name() : ""; + + CompositeType ct = subscriptionsCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, + new Object[] {name, inflight, requested, bufferSize, backpressure}); + answer.put(data); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + }); + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + @ManagedOperation(description = "Information about Camel Reactive publishers") + public TabularData camelPublishers() { + try { + final TabularData answer = new TabularDataSupport(publishersTabularType()); + + publishers.forEach((k, v) -> { + try { + String name = k; + int subscribers = v.getSubscriptionSize(); + + CompositeType ct = publishersCompositeType(); + CompositeData data = new CompositeDataSupport(ct, + new String[] {"name", "subscribers"}, + new Object[] {name, subscribers}); + answer.put(data); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + }); + + return answer; + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + private static CompositeType subscriptionsCompositeType() throws OpenDataException { + return new CompositeType("subscriptions", "Subscriptions", + new String[] {"name", "inflight", "requested", "buffer size", "back pressure"}, + new String[] {"Name", "Inflight", "Requested", "Buffer Size", "Back Pressure"}, + new OpenType[] {SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING}); + } + + private static TabularType subscriptionsTabularType() throws OpenDataException { + CompositeType ct = subscriptionsCompositeType(); + return new TabularType("subscriptions", "Information about Camel Reactive subscribers", ct, new String[]{"name"}); + } + + private static CompositeType publishersCompositeType() throws OpenDataException { + return new CompositeType("publishers", "Publishers", + new String[] {"name", "subscribers"}, + new String[] {"Name", "Subscribers"}, + new OpenType[] {SimpleType.STRING, SimpleType.INTEGER}); + } + + private static TabularType publishersTabularType() throws OpenDataException { + CompositeType ct = publishersCompositeType(); + return new TabularType("publishers", "Information about Camel Reactive publishers", ct, new String[]{"name"}); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java index 4a232df..dba42f0 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import org.apache.camel.Exchange; +import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy; import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -207,4 +208,12 @@ public class CamelSubscriber implements Subscriber<Exchange>, Closeable { return 0; } } + + public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() { + if (subscription != null && subscription instanceof CamelSubscription) { + return ((CamelSubscription) subscription).getBackpressureStrategy(); + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java ---------------------------------------------------------------------- diff --git a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java index 37b6670..431ca6d 100644 --- a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java +++ b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java @@ -248,4 +248,8 @@ public class CamelSubscription implements Subscription { public long getBufferSize() { return buffer.size(); } + + public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() { + return backpressureStrategy; + } }