Updated Branches:
  refs/heads/master 993c3ca19 -> c1d72c22f

CAMEL-6638: Polished and added support for async router on consumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c1d72c22
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c1d72c22
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c1d72c22

Branch: refs/heads/master
Commit: c1d72c22ffa521bcbb2c8c9a7dff658512eeedcc
Parents: 993c3ca
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu Aug 15 10:46:03 2013 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu Aug 15 10:46:03 2013 +0200

----------------------------------------------------------------------
 .../camel/component/vertx/VertxComponent.java   |  8 ++++
 .../camel/component/vertx/VertxConsumer.java    | 39 ++++++++++++++------
 .../camel/component/vertx/VertxProducer.java    |  7 ++++
 3 files changed, 42 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
index b79b652..57ad087 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java
@@ -24,6 +24,8 @@ import org.apache.camel.ComponentConfiguration;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
 import org.apache.camel.spi.EndpointCompleter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.vertx.java.core.Vertx;
 import org.vertx.java.core.VertxFactory;
 
@@ -31,6 +33,7 @@ import org.vertx.java.core.VertxFactory;
  * A Camel Component for <a href="http://vertx.io/">vert.x</a>
  */
 public class VertxComponent extends UriEndpointComponent implements 
EndpointCompleter {
+    private static final Logger LOG = 
LoggerFactory.getLogger(VertxComponent.class);
     private Vertx vertx;
     private String host = "127.0.0.1";
     private int port = 5701;
@@ -85,8 +88,10 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
         if (vertx == null) {
             // lets using a host / port if a host name is specified
             if (host != null && host.length() > 0) {
+                LOG.debug("Creating Vertx {}:{}", host, port);
                 vertx = VertxFactory.newVertx(port, host);
             } else {
+                LOG.debug("Creating Vertx");
                 vertx = VertxFactory.newVertx();
             }
         }
@@ -95,5 +100,8 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp
     @Override
     protected void doStop() throws Exception {
         super.doStop();
+
+        LOG.debug("Stopping Vertx {}", vertx);
+        vertx.stop();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
index ba1927e..66f2a95 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.vertx;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -39,25 +40,39 @@ public class VertxConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
     }
 
+    protected void onEventBusEvent(final Message event) {
+        LOG.debug("onEvent {}", event);
+
+        final Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setBody(event.body());
+
+        try {
+            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    // noop
+                }
+            });
+        } catch (Exception e) {
+            getExceptionHandler().handleException("Error processing Vertx 
event: " + event, exchange, e);
+        }
+    }
+
     protected void doStart() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Registering EventBus handler on address {}", 
endpoint.getAddress());
+        }
+
         endpoint.getEventBus().registerHandler(endpoint.getAddress(), handler);
         super.doStart();
     }
 
     protected void doStop() throws Exception {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Unregistering EventBus handler on address {}", 
endpoint.getAddress());
+        }
+
         endpoint.getEventBus().unregisterHandler(endpoint.getAddress(), 
handler);
         super.doStop();
     }
-
-    protected void onEventBusEvent(Message event) {
-        LOG.debug("onEvent {}", event);
-
-        Exchange exchange = endpoint.createExchange();
-        exchange.getIn().setBody(event.body());
-        try {
-            getProcessor().process(exchange);
-        } catch (Exception e) {
-            getExceptionHandler().handleException("Error processing vertx 
message " + event, exchange, e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
index 917eed2..1597dd7 100644
--- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java
@@ -20,12 +20,16 @@ import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadRuntimeException;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.vertx.java.core.eventbus.EventBus;
 import org.vertx.java.core.json.JsonArray;
 import org.vertx.java.core.json.JsonObject;
 
 public class VertxProducer extends DefaultProducer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(VertxProducer.class);
+
     public VertxProducer(VertxEndpoint endpoint) {
         super(endpoint);
     }
@@ -43,11 +47,13 @@ public class VertxProducer extends DefaultProducer {
 
         JsonObject jsonObject = in.getBody(JsonObject.class);
         if (jsonObject != null) {
+            LOG.debug("Publishing to: with JsonObject: {}", address, 
jsonObject);
             eventBus.publish(address, jsonObject);
             return;
         }
         JsonArray jsonArray = in.getBody(JsonArray.class);
         if (jsonArray != null) {
+            LOG.debug("Publishing to: with JsonArray: {}", address, jsonArray);
             eventBus.publish(address, jsonArray);
             return;
         }
@@ -55,6 +61,7 @@ public class VertxProducer extends DefaultProducer {
         // and fallback and use string which almost all can be converted
         String text = in.getBody(String.class);
         if (text != null) {
+            LOG.debug("Publishing to: with String: {}", address, text);
             eventBus.publish(address, new JsonObject(text));
             return;
         }

Reply via email to