Updated Branches: refs/heads/master 993c3ca19 -> c1d72c22f
CAMEL-6638: Polished and added support for async router on consumer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c1d72c22 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c1d72c22 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c1d72c22 Branch: refs/heads/master Commit: c1d72c22ffa521bcbb2c8c9a7dff658512eeedcc Parents: 993c3ca Author: Claus Ibsen <davscl...@apache.org> Authored: Thu Aug 15 10:46:03 2013 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu Aug 15 10:46:03 2013 +0200 ---------------------------------------------------------------------- .../camel/component/vertx/VertxComponent.java | 8 ++++ .../camel/component/vertx/VertxConsumer.java | 39 ++++++++++++++------ .../camel/component/vertx/VertxProducer.java | 7 ++++ 3 files changed, 42 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java index b79b652..57ad087 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxComponent.java @@ -24,6 +24,8 @@ import org.apache.camel.ComponentConfiguration; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; import org.apache.camel.spi.EndpointCompleter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.vertx.java.core.Vertx; import org.vertx.java.core.VertxFactory; @@ -31,6 +33,7 @@ import org.vertx.java.core.VertxFactory; * A Camel Component for <a href="http://vertx.io/">vert.x</a> */ public class VertxComponent extends UriEndpointComponent implements EndpointCompleter { + private static final Logger LOG = LoggerFactory.getLogger(VertxComponent.class); private Vertx vertx; private String host = "127.0.0.1"; private int port = 5701; @@ -85,8 +88,10 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp if (vertx == null) { // lets using a host / port if a host name is specified if (host != null && host.length() > 0) { + LOG.debug("Creating Vertx {}:{}", host, port); vertx = VertxFactory.newVertx(port, host); } else { + LOG.debug("Creating Vertx"); vertx = VertxFactory.newVertx(); } } @@ -95,5 +100,8 @@ public class VertxComponent extends UriEndpointComponent implements EndpointComp @Override protected void doStop() throws Exception { super.doStop(); + + LOG.debug("Stopping Vertx {}", vertx); + vertx.stop(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java index ba1927e..66f2a95 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.vertx; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; @@ -39,25 +40,39 @@ public class VertxConsumer extends DefaultConsumer { this.endpoint = endpoint; } + protected void onEventBusEvent(final Message event) { + LOG.debug("onEvent {}", event); + + final Exchange exchange = endpoint.createExchange(); + exchange.getIn().setBody(event.body()); + + try { + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // noop + } + }); + } catch (Exception e) { + getExceptionHandler().handleException("Error processing Vertx event: " + event, exchange, e); + } + } + protected void doStart() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Registering EventBus handler on address {}", endpoint.getAddress()); + } + endpoint.getEventBus().registerHandler(endpoint.getAddress(), handler); super.doStart(); } protected void doStop() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistering EventBus handler on address {}", endpoint.getAddress()); + } + endpoint.getEventBus().unregisterHandler(endpoint.getAddress(), handler); super.doStop(); } - - protected void onEventBusEvent(Message event) { - LOG.debug("onEvent {}", event); - - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(event.body()); - try { - getProcessor().process(exchange); - } catch (Exception e) { - getExceptionHandler().handleException("Error processing vertx message " + event, exchange, e); - } - } } http://git-wip-us.apache.org/repos/asf/camel/blob/c1d72c22/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java index 917eed2..1597dd7 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java @@ -20,12 +20,16 @@ import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadRuntimeException; import org.apache.camel.Message; import org.apache.camel.impl.DefaultProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.vertx.java.core.eventbus.EventBus; import org.vertx.java.core.json.JsonArray; import org.vertx.java.core.json.JsonObject; public class VertxProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(VertxProducer.class); + public VertxProducer(VertxEndpoint endpoint) { super(endpoint); } @@ -43,11 +47,13 @@ public class VertxProducer extends DefaultProducer { JsonObject jsonObject = in.getBody(JsonObject.class); if (jsonObject != null) { + LOG.debug("Publishing to: with JsonObject: {}", address, jsonObject); eventBus.publish(address, jsonObject); return; } JsonArray jsonArray = in.getBody(JsonArray.class); if (jsonArray != null) { + LOG.debug("Publishing to: with JsonArray: {}", address, jsonArray); eventBus.publish(address, jsonArray); return; } @@ -55,6 +61,7 @@ public class VertxProducer extends DefaultProducer { // and fallback and use string which almost all can be converted String text = in.getBody(String.class); if (text != null) { + LOG.debug("Publishing to: with String: {}", address, text); eventBus.publish(address, new JsonObject(text)); return; }