Updated Branches: refs/heads/master 86a762032 -> a779d09c9
CAMEL-7176: Added support for sending replies with camel-vertx, so you can do request/reply over vertx. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a779d09c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a779d09c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a779d09c Branch: refs/heads/master Commit: a779d09c9d9e4e630086a892888134607f295f58 Parents: 86a7620 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Feb 7 17:09:22 2014 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Feb 7 17:09:22 2014 +0100 ---------------------------------------------------------------------- .../camel/component/vertx/VertxConsumer.java | 14 +++- .../camel/component/vertx/VertxEndpoint.java | 14 ++++ .../camel/component/vertx/VertxHelper.java | 40 +++++++++++ .../camel/component/vertx/VertxProducer.java | 76 +++++++++++++------- .../component/vertx/VertxRequestReplyTest.java | 66 +++++++++++++++++ .../component/vertx/VertxRoutePubSubTest.java | 69 ++++++++++++++++++ 6 files changed, 253 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java index 66f2a95..a805fb6 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxConsumer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.vertx; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; import org.slf4j.Logger; @@ -25,6 +26,8 @@ import org.slf4j.LoggerFactory; import org.vertx.java.core.Handler; import org.vertx.java.core.eventbus.Message; +import static org.apache.camel.component.vertx.VertxHelper.getVertxBody; + public class VertxConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(VertxConsumer.class); private final VertxEndpoint endpoint; @@ -43,14 +46,21 @@ public class VertxConsumer extends DefaultConsumer { protected void onEventBusEvent(final Message event) { LOG.debug("onEvent {}", event); - final Exchange exchange = endpoint.createExchange(); + final boolean reply = event.replyAddress() != null; + final Exchange exchange = endpoint.createExchange(reply ? ExchangePattern.InOut : ExchangePattern.InOnly); exchange.getIn().setBody(event.body()); try { getAsyncProcessor().process(exchange, new AsyncCallback() { @Override public void done(boolean doneSync) { - // noop + if (reply) { + Object body = getVertxBody(exchange); + if (body != null) { + LOG.debug("Sending reply to: {} with body: {}", event.replyAddress(), body); + event.reply(body); + } + } } }); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java index 733d337..4cac188 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxEndpoint.java @@ -33,6 +33,8 @@ public class VertxEndpoint extends DefaultEndpoint { @UriParam private String address; + @UriParam + private Boolean pubSub; public VertxEndpoint(String uri, VertxComponent component, String address) { super(uri, component); @@ -70,6 +72,18 @@ public class VertxEndpoint extends DefaultEndpoint { return address; } + public boolean isPubSub() { + return pubSub != null && pubSub; + } + + public Boolean getPubSub() { + return pubSub; + } + + public void setPubSub(Boolean pubSub) { + this.pubSub = pubSub; + } + /** * Sets the event bus address used to communicate */ http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java new file mode 100644 index 0000000..661c86d --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxHelper.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.vertx.java.core.json.JsonArray; +import org.vertx.java.core.json.JsonObject; + +public final class VertxHelper { + + private VertxHelper() { + } + + public static Object getVertxBody(Exchange exchange) { + Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn(); + Object body = msg.getBody(JsonObject.class); + if (body == null) { + body = msg.getBody(JsonArray.class); + } + if (body == null) { + body = msg.getBody(String.class); + } + return body; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java index b25a448..7748b90 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/VertxProducer.java @@ -16,17 +16,20 @@ */ package org.apache.camel.component.vertx; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.InvalidPayloadRuntimeException; -import org.apache.camel.Message; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.MessageHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.vertx.java.core.Handler; import org.vertx.java.core.eventbus.EventBus; -import org.vertx.java.core.json.JsonArray; -import org.vertx.java.core.json.JsonObject; -public class VertxProducer extends DefaultProducer { +import static org.apache.camel.component.vertx.VertxHelper.getVertxBody; + +public class VertxProducer extends DefaultAsyncProducer { private static final Logger LOG = LoggerFactory.getLogger(VertxProducer.class); @@ -39,32 +42,57 @@ public class VertxProducer extends DefaultProducer { return (VertxEndpoint) super.getEndpoint(); } - public void process(Exchange exchange) throws Exception { + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { EventBus eventBus = getEndpoint().getEventBus(); String address = getEndpoint().getAddress(); - Message in = exchange.getIn(); + boolean reply = ExchangeHelper.isOutCapable(exchange); + boolean pubSub = getEndpoint().isPubSub(); - JsonObject jsonObject = in.getBody(JsonObject.class); - if (jsonObject != null) { - LOG.debug("Publishing to: {} with JsonObject: {}", address, jsonObject); - eventBus.publish(address, jsonObject); - return; + Object body = getVertxBody(exchange); + if (body != null) { + if (reply) { + LOG.debug("Sending to: {} with body: {}", address, body); + eventBus.send(address, body, new CamelReplyHandler(exchange, callback)); + return false; + } else { + if (pubSub) { + LOG.debug("Publishing to: {} with body: {}", address, body); + eventBus.publish(address, body); + } else { + LOG.debug("Sending to: {} with body: {}", address, body); + eventBus.send(address, body); + } + callback.done(true); + return true; + } } - JsonArray jsonArray = in.getBody(JsonArray.class); - if (jsonArray != null) { - LOG.debug("Publishing to: {} with JsonArray: {}", address, jsonArray); - eventBus.publish(address, jsonArray); - return; + + exchange.setException(new InvalidPayloadRuntimeException(exchange, String.class)); + callback.done(true); + return true; + } + + private static final class CamelReplyHandler implements Handler<org.vertx.java.core.eventbus.Message> { + + private final Exchange exchange; + private final AsyncCallback callback; + + private CamelReplyHandler(Exchange exchange, AsyncCallback callback) { + this.exchange = exchange; + this.callback = callback; } - // and fallback and use string which almost all can be converted - String text = in.getBody(String.class); - if (text != null) { - LOG.debug("Publishing to: {} with String: {}", address, text); - eventBus.publish(address, new JsonObject(text)); - return; + @Override + public void handle(org.vertx.java.core.eventbus.Message event) { + try { + // preserve headers + MessageHelper.copyHeaders(exchange.getIn(), exchange.getOut(), false); + exchange.getOut().setBody(event.body()); + } finally { + callback.done(false); + } } - throw new InvalidPayloadRuntimeException(exchange, String.class); } } http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java new file mode 100644 index 0000000..1000d01 --- /dev/null +++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRequestReplyTest.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +/** + * @version + */ +public class VertxRequestReplyTest extends VertxBaseTestSupport { + + protected String startUri = "direct:start"; + protected String middleUri = "vertx:foo.middle"; + protected String resultUri = "mock:result"; + + protected MockEndpoint resultEndpoint; + protected String body1 = "Camel"; + protected String body2 = "World"; + + @Test + public void testVertxMessages() throws Exception { + resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class); + resultEndpoint.expectedBodiesReceivedInAnyOrder("Bye Camel", "Bye World"); + + String out = template.requestBody(startUri, body1, String.class); + String out2 = template.requestBody(startUri, body2, String.class); + + resultEndpoint.assertIsSatisfied(); + + assertEquals("Bye Camel", out); + assertEquals("Bye World", out2); + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // camel-vertx cannot be ran with JDK 1.6 + org.junit.Assume.assumeTrue(!isJava16()); + + VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class); + vertx.setPort(getPort()); + + from(startUri).to(middleUri).to(resultUri); + + from(middleUri) + .transform(simple("Bye ${body}")); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a779d09c/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java new file mode 100644 index 0000000..d185f51 --- /dev/null +++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/VertxRoutePubSubTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.junit.Test; + +/** + * @version + */ +public class VertxRoutePubSubTest extends VertxBaseTestSupport { + + protected String startUri = "vertx:foo.start?pubSub=true"; + protected String middleUri = "vertx:foo.middle?pubSub=true"; + protected String resultUri = "mock:result"; + + protected MockEndpoint resultEndpoint; + protected String body1 = "{\"id\":1,\"description\":\"Message One\"}"; + protected String body2 = "{\"id\":2,\"description\":\"Message Two\"}"; + + @Test + public void testVertxMessages() throws Exception { + resultEndpoint = context.getEndpoint(resultUri, MockEndpoint.class); + resultEndpoint.expectedBodiesReceivedInAnyOrder(body1, body2); + + template.sendBody(startUri, body1); + template.sendBody(startUri, body2); + + resultEndpoint.assertIsSatisfied(); + + List<Exchange> list = resultEndpoint.getReceivedExchanges(); + for (Exchange exchange : list) { + log.info("Received exchange: " + exchange + " headers: " + exchange.getIn().getHeaders()); + } + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + // camel-vertx cannot be ran with JDK 1.6 + org.junit.Assume.assumeTrue(!isJava16()); + + VertxComponent vertx = getContext().getComponent("vertx", VertxComponent.class); + vertx.setPort(getPort()); + + from(startUri).to(middleUri); + from(middleUri).to(resultUri); + } + }; + } +} \ No newline at end of file