Repository: camel
Updated Branches:
  refs/heads/master db7cd8e1d -> 028e810bb


CAMEL-11122: camel-reactive-streams - Add more JMX information


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/028e810b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/028e810b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/028e810b

Branch: refs/heads/master
Commit: 028e810bbc9656e6c8d8ba7b29281efffa6a5de5
Parents: bfb151b
Author: Claus Ibsen <davscl...@apache.org>
Authored: Sat Apr 8 15:28:32 2017 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Apr 8 15:41:40 2017 +0200

----------------------------------------------------------------------
 .../streams/ReactiveStreamsConsumer.java        |  24 +----
 .../api/CamelReactiveStreamsService.java        |   1 -
 .../reactive/streams/engine/CamelPublisher.java |   6 +-
 .../engine/CamelReactiveStreamsServiceImpl.java | 101 ++++++++++++++++++-
 .../streams/engine/CamelSubscriber.java         |   9 ++
 .../streams/engine/CamelSubscription.java       |   4 +
 6 files changed, 116 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 8661200..3724585 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -21,11 +21,8 @@ import java.util.concurrent.ExecutorService;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.api.management.ManagedAttribute;
-import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
 import 
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
-import org.apache.camel.component.reactive.streams.engine.CamelSubscriber;
 import org.apache.camel.impl.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +30,6 @@ import org.slf4j.LoggerFactory;
 /**
  * The Camel reactive-streams consumer.
  */
-@ManagedResource(description = "Managed ReactiveStreamsConsumer")
 public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ReactiveStreamsConsumer.class);
@@ -44,8 +40,6 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
 
     private CamelReactiveStreamsService service;
 
-    private volatile CamelSubscriber subscriber;
-
     public ReactiveStreamsConsumer(ReactiveStreamsEndpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
@@ -62,14 +56,13 @@ public class ReactiveStreamsConsumer extends 
DefaultConsumer {
             executor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
 getEndpoint().getEndpointUri(), poolSize);
         }
 
-        this.subscriber = 
this.service.attachCamelConsumer(endpoint.getStream(), this);
+        this.service.attachCamelConsumer(endpoint.getStream(), this);
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         this.service.detachCamelConsumer(endpoint.getStream());
-        this.subscriber = null;
 
         if (executor != null) {
             
endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -128,19 +121,4 @@ public class ReactiveStreamsConsumer extends 
DefaultConsumer {
         return endpoint;
     }
 
-    @ManagedAttribute(description = "Number of inflight messages")
-    public long getInflightCount() {
-        return subscriber != null ? subscriber.getInflightCount() : 0;
-    }
-
-    @ManagedAttribute(description = "Number of messages to be requested on 
next request")
-    public long getToBeRequested() {
-        return subscriber != null ? subscriber.getRequested() : 0;
-    }
-
-    @ManagedAttribute(description = "Number of pending messages in the buffer")
-    public long getBufferSize() {
-        return subscriber != null ? subscriber.getBufferSize() : 0;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
index bdd316e..91b16d4 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/api/CamelReactiveStreamsService.java
@@ -20,7 +20,6 @@ import java.util.function.Function;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
-import org.apache.camel.Service;
 import org.apache.camel.StaticService;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
index 5cafcd2..f90f19c 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelPublisher.java
@@ -78,7 +78,7 @@ public class CamelPublisher implements 
Publisher<StreamPayload<Exchange>>, AutoC
         DispatchCallback<Exchange> originalCallback = data.getCallback();
         if (originalCallback != null && subs.size() > 0) {
             // When multiple subscribers have an active subscription,
-            // we aknowledge the exchange once it has been delivered to every
+            // we acknowledge the exchange once it has been delivered to every
             // subscriber (or their subscription is cancelled)
             AtomicInteger counter = new AtomicInteger(subs.size());
             // Use just the first exception in the callback when multiple 
exceptions are thrown
@@ -131,4 +131,8 @@ public class CamelPublisher implements 
Publisher<StreamPayload<Exchange>>, AutoC
         }
         subscriptions.clear();
     }
+
+    public int getSubscriptionSize() {
+        return subscriptions.size();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
index 1abbd94..c3ccc42 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelReactiveStreamsServiceImpl.java
@@ -20,10 +20,21 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
@@ -36,13 +47,16 @@ import 
org.apache.camel.component.reactive.streams.util.MonoPublisher;
 import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
 
 /**
  * The default implementation of the reactive streams service.
  */
-public class CamelReactiveStreamsServiceImpl implements 
CamelReactiveStreamsService {
+@ManagedResource(description = "Managed CamelReactiveStreamsService")
+public class CamelReactiveStreamsServiceImpl extends ServiceSupport implements 
CamelReactiveStreamsService {
 
     private CamelContext context;
 
@@ -60,16 +74,17 @@ public class CamelReactiveStreamsServiceImpl implements 
CamelReactiveStreamsServ
     }
 
     @Override
-    public void start() throws Exception {
+    protected void doStart() throws Exception {
         ReactiveStreamsComponent component = 
context.getComponent("reactive-streams", ReactiveStreamsComponent.class);
         ReactiveStreamsEngineConfiguration config = 
component.getInternalEngineConfiguration();
         this.workerPool = 
context.getExecutorServiceManager().newThreadPool(this, 
config.getThreadPoolName(), config.getThreadPoolMinSize(), 
config.getThreadPoolMaxSize());
     }
 
     @Override
-    public void stop() throws Exception {
+    protected void doStop() throws Exception {
         if (this.workerPool != null) {
             context.getExecutorServiceManager().shutdownNow(this.workerPool);
+            this.workerPool = null;
         }
     }
 
@@ -249,7 +264,6 @@ public class CamelReactiveStreamsServiceImpl implements 
CamelReactiveStreamsServ
         return data -> to(uri, data, type);
     }
 
-
     @Override
     public void process(String uri, Function<? super Publisher<Exchange>, ?> 
processor) {
         try {
@@ -320,4 +334,83 @@ public class CamelReactiveStreamsServiceImpl implements 
CamelReactiveStreamsServ
         return exchange;
     }
 
+    @ManagedOperation(description = "Information about Camel Reactive 
subscribers")
+    public TabularData camelSubscribers() {
+        try {
+            final TabularData answer = new 
TabularDataSupport(subscriptionsTabularType());
+
+            subscribers.forEach((k, v) -> {
+                try {
+                    String name = k;
+                    long inflight = v.getInflightCount();
+                    long requested = v.getRequested();
+                    long bufferSize = v.getBufferSize();
+                    String backpressure = v.getBackpressureStrategy() != null 
? v.getBackpressureStrategy().name() : "";
+
+                    CompositeType ct = subscriptionsCompositeType();
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "inflight", "requested", "buffer 
size", "back pressure"},
+                        new Object[] {name, inflight, requested, bufferSize, 
backpressure});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @ManagedOperation(description = "Information about Camel Reactive 
publishers")
+    public TabularData camelPublishers() {
+        try {
+            final TabularData answer = new 
TabularDataSupport(publishersTabularType());
+
+            publishers.forEach((k, v) -> {
+                try {
+                    String name = k;
+                    int subscribers = v.getSubscriptionSize();
+
+                    CompositeType ct = publishersCompositeType();
+                    CompositeData data = new CompositeDataSupport(ct,
+                        new String[] {"name", "subscribers"},
+                        new Object[] {name, subscribers});
+                    answer.put(data);
+                } catch (Exception e) {
+                    throw ObjectHelper.wrapRuntimeCamelException(e);
+                }
+            });
+
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    private static CompositeType subscriptionsCompositeType() throws 
OpenDataException {
+        return new CompositeType("subscriptions", "Subscriptions",
+            new String[] {"name", "inflight", "requested", "buffer size", 
"back pressure"},
+            new String[] {"Name", "Inflight", "Requested", "Buffer Size", 
"Back Pressure"},
+            new OpenType[] {SimpleType.STRING, SimpleType.LONG, 
SimpleType.LONG, SimpleType.LONG, SimpleType.STRING});
+    }
+
+    private static TabularType subscriptionsTabularType() throws 
OpenDataException {
+        CompositeType ct = subscriptionsCompositeType();
+        return new TabularType("subscriptions", "Information about Camel 
Reactive subscribers", ct, new String[]{"name"});
+    }
+
+    private static CompositeType publishersCompositeType() throws 
OpenDataException {
+        return new CompositeType("publishers", "Publishers",
+            new String[] {"name", "subscribers"},
+            new String[] {"Name", "Subscribers"},
+            new OpenType[] {SimpleType.STRING, SimpleType.INTEGER});
+    }
+
+    private static TabularType publishersTabularType() throws 
OpenDataException {
+        CompositeType ct = publishersCompositeType();
+        return new TabularType("publishers", "Information about Camel Reactive 
publishers", ct, new String[]{"name"});
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
index 4a232df..dba42f0 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscriber.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.camel.Exchange;
+import 
org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
 import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
 import org.reactivestreams.Subscriber;
 import org.reactivestreams.Subscription;
@@ -207,4 +208,12 @@ public class CamelSubscriber implements 
Subscriber<Exchange>, Closeable {
             return 0;
         }
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        if (subscription != null && subscription instanceof CamelSubscription) 
{
+            return ((CamelSubscription) 
subscription).getBackpressureStrategy();
+        } else {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/028e810b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
----------------------------------------------------------------------
diff --git 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
index 37b6670..431ca6d 100644
--- 
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
+++ 
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/engine/CamelSubscription.java
@@ -248,4 +248,8 @@ public class CamelSubscription implements Subscription {
     public long getBufferSize() {
         return buffer.size();
     }
+
+    public ReactiveStreamsBackpressureStrategy getBackpressureStrategy() {
+        return backpressureStrategy;
+    }
 }

Reply via email to