Camel experiment with using vertx as eventbus for routing engine.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3e3e1803 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3e3e1803 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3e3e1803 Branch: refs/heads/eventbus Commit: 3e3e1803316e45035d9a51c101a1eb38d19c8508 Parents: 48219df Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 8 10:20:09 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 8 10:20:09 2016 +0200 ---------------------------------------------------------------------- .../vertx/eventbus/VertxCamelProducer.java | 6 ++ .../vertx/eventbus/VertxCamelTransform.java | 102 +++++++++++++++++++ .../vertx/eventbus/VertxProcessorFactory.java | 5 + .../vertx/eventbus/VertxSendToProcessor.java | 22 +++- .../vertx/eventbus/VertxTransformProcessor.java | 60 +++++++++++ .../vertx/eventbus/VertxTransformTest.java | 71 +++++++++++++ 6 files changed, 262 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java index 923ca38..7b411c9 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.vertx.eventbus; import io.vertx.core.Handler; import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; import org.apache.camel.CamelContext; @@ -33,12 +34,15 @@ public class VertxCamelProducer extends ServiceSupport implements Handler<Messag private final Vertx vertx; private final String id; private MessageConsumer<Exchange> consumer; + private final DeliveryOptions options; public VertxCamelProducer(CamelContext camelContext, Vertx vertx, String id) { this.camelContext = camelContext; this.template = camelContext.createProducerTemplate(); this.vertx = vertx; this.id = id; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); } @Override @@ -61,5 +65,7 @@ public class VertxCamelProducer extends ServiceSupport implements Handler<Messag String url = (String) exchange.removeProperty("CamelVertxUrl"); // TODO: execute blocking template.send(url, exchange); + // signal we are done + event.reply(exchange, options); } } http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java new file mode 100644 index 0000000..923fcaf --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelTransform.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.impl.DefaultMessage; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ExchangeHelper; + +public class VertxCamelTransform extends ServiceSupport implements Handler<Message<Exchange>> { + + private final CamelContext camelContext; + private final Vertx vertx; + private final String id; + private MessageConsumer<Exchange> consumer; + private final DeliveryOptions options; + + public VertxCamelTransform(CamelContext camelContext, Vertx vertx, String id) { + this.camelContext = camelContext; + this.vertx = vertx; + this.id = id; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); + } + + @Override + protected void doStart() throws Exception { + consumer = vertx.eventBus().localConsumer(id, this); + } + + @Override + protected void doStop() throws Exception { + if (consumer != null) { + consumer.unregister(); + } + } + + @Override + public void handle(Message<Exchange> event) { + Exchange exchange = event.body(); + Expression expression = (Expression) exchange.removeProperty("CamelVertxExpression"); + // TODO: execute blocking + transform(exchange, expression); + // signal we are done + event.reply(exchange, options); + } + + private void transform(Exchange exchange, Expression expression) { + Object newBody = expression.evaluate(exchange, Object.class); + + if (exchange.getException() != null) { + // the expression threw an exception so we should break-out + return; + } + + boolean out = exchange.hasOut(); + org.apache.camel.Message old = out ? exchange.getOut() : exchange.getIn(); + + // create a new message container so we do not drag specialized message objects along + // but that is only needed if the old message is a specialized message + boolean copyNeeded = !(old.getClass().equals(DefaultMessage.class)); + + if (copyNeeded) { + org.apache.camel.Message msg = new DefaultMessage(); + msg.copyFrom(old); + msg.setBody(newBody); + + // replace message on exchange (must set as OUT) + ExchangeHelper.replaceMessage(exchange, msg, true); + } else { + // no copy needed so set replace value directly + old.setBody(newBody); + + // but the message must be on OUT + if (!exchange.hasOut()) { + exchange.setOut(exchange.getIn()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java index d335725..6131d34 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java @@ -17,9 +17,11 @@ package org.apache.camel.component.vertx.eventbus; import io.vertx.core.Vertx; +import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.ToDefinition; +import org.apache.camel.model.TransformDefinition; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.RouteContext; @@ -43,6 +45,9 @@ public class VertxProcessorFactory implements ProcessorFactory { if (def instanceof ToDefinition) { String uri = ((ToDefinition) def).getEndpointUri(); return new VertxSendToProcessor(vertx, id, uri); + } else if (def instanceof TransformDefinition) { + Expression expression = ((TransformDefinition) def).getExpression().createExpression(routeContext); + return new VertxTransformProcessor(vertx, id, expression); } throw new UnsupportedOperationException("EIP not supported yet"); http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java index 139a289..c99dd6a 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java @@ -18,10 +18,12 @@ package org.apache.camel.component.vertx.eventbus; import io.vertx.core.Vertx; import io.vertx.core.eventbus.DeliveryOptions; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; -import org.apache.camel.Processor; +import org.apache.camel.util.AsyncProcessorHelper; -public class VertxSendToProcessor implements Processor { +public class VertxSendToProcessor implements AsyncProcessor { private final Vertx vertx; private final String id; @@ -37,9 +39,21 @@ public class VertxSendToProcessor implements Processor { } @Override - public void process(Exchange exchange) throws Exception { + public boolean process(final Exchange exchange, final AsyncCallback callback) { // if OUT then use reply handler to update exchange with result exchange.setProperty("CamelVertxUrl", uri); - vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options); + vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options, (handler) -> { + if (handler.failed()) { + Throwable t = handler.cause(); + exchange.setException(t); + } + callback.done(false); + }); + return false; + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); } } http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java new file mode 100644 index 0000000..368b12c --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxTransformProcessor.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.util.AsyncProcessorHelper; + +public class VertxTransformProcessor implements AsyncProcessor { + + private final Vertx vertx; + private final String id; + private final Expression expression; + private final DeliveryOptions options; + + public VertxTransformProcessor(Vertx vertx, String id, Expression expression) { + this.vertx = vertx; + this.id = id; + this.expression = expression; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + exchange.setProperty("CamelVertxExpression", expression); + vertx.eventBus().send(VertxCamelTransform.class.getName(), exchange, options, (handler) -> { + if (handler.failed()) { + Throwable t = handler.cause(); + exchange.setException(t); + } + callback.done(false); + }); + return false; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/3e3e1803/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java new file mode 100644 index 0000000..c29fd48 --- /dev/null +++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxTransformTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.vertx.VertxBaseTestSupport; +import org.junit.Test; + +public class VertxTransformTest extends VertxBaseTestSupport { + + private Vertx vertx; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + vertx = Vertx.vertx(); + vertx.eventBus().registerCodec(new VertxExchangeCodec()); + + VertxProcessorFactory pf = new VertxProcessorFactory(vertx); + context.setProcessorFactory(pf); + + VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName()); + context.addService(vcp); + + VertxCamelTransform vct = new VertxCamelTransform(context, vertx, VertxCamelTransform.class.getName()); + context.addService(vct); + + return context; + } + + @Test + public void testVertxTransform() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("World"); + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + + template.sendBody("direct:start", "World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("mock:foo") + .transform(simple("Hello ${body}")) + .to("mock:bar"); + } + }; + } +}