Camel experiment using Vert.x as the event bus for the routing engine.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0479f3b5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0479f3b5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0479f3b5 Branch: refs/heads/eventbus Commit: 0479f3b5cb987b98c788fb4be376db71f1d7f0b3 Parents: 45050b1 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 8 14:36:08 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 8 14:36:08 2016 +0200 ---------------------------------------------------------------------- .../vertx/eventbus/VertxCamelFilter.java | 99 ++++++++++++++++++++ .../vertx/eventbus/VertxFilterProcessor.java | 66 +++++++++++++ .../vertx/eventbus/VertxProcessorFactory.java | 16 ++++ .../vertx/eventbus/VertxFilterTest.java | 91 ++++++++++++++++++ 4 files changed, 272 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java new file mode 100644 index 0000000..938c660 --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelFilter.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import java.util.List; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ServiceHelper; + +public class VertxCamelFilter extends ServiceSupport implements Handler<Message<Exchange>> { + + private final CamelContext camelContext; + private final Vertx vertx; + private final String id; + private MessageConsumer<Exchange> consumer; + private final DeliveryOptions options; + + public VertxCamelFilter(CamelContext camelContext, Vertx vertx, String id) { + this.camelContext = camelContext; + this.vertx = vertx; + this.id = id; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); + } + + @Override + protected void doStart() throws Exception { + consumer = vertx.eventBus().localConsumer(id, this); + } + + @Override + protected void doStop() throws Exception { + if (consumer != null) { + consumer.unregister(); + } + } + + @Override + public void handle(Message<Exchange> event) { + Exchange exchange = event.body(); + Predicate predicate = (Predicate) exchange.removeProperty("CamelVertxPredicate"); + List<Processor> children = (List<Processor>) 
exchange.removeProperty("CamelVertxChildren"); + + boolean matches = false; + + try { + matches = matches(exchange, predicate); + } catch (Exception e) { + exchange.setException(e); + } + + if (matches) { + exchange.setProperty("CamelVerxReplyAddress", event.replyAddress()); + Processor child = children.get(0); + try { + child.process(exchange); + } catch (Exception e) { + // ignore + } + } else { + // signal we are done + event.reply(exchange, options); + } + } + + private boolean matches(Exchange exchange, Predicate predicate) { + boolean matches = predicate.matches(exchange); + + // set property whether the filter matches or not + exchange.setProperty(Exchange.FILTER_MATCHED, matches); + + return matches; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java new file mode 100644 index 0000000..4b4833d --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxFilterProcessor.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import java.util.List; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Exchange; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.util.AsyncProcessorHelper; + +public class VertxFilterProcessor implements AsyncProcessor { + + private final Vertx vertx; + private final String id; + private final Predicate predicate; + private final DeliveryOptions options; + private final List<Processor> children; + + public VertxFilterProcessor(Vertx vertx, String id, Predicate predicate, List<Processor> children) { + this.vertx = vertx; + this.id = id; + this.predicate = predicate; + this.children = children; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); + } + + @Override + public void process(Exchange exchange) throws Exception { + AsyncProcessorHelper.process(this, exchange); + } + + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + exchange.setProperty("CamelVertxPredicate", predicate); + exchange.setProperty("CamelVertxChildren", children); + vertx.eventBus().send(VertxCamelFilter.class.getName(), exchange, options, (handler) -> { + if (handler.failed()) { + Throwable t = handler.cause(); + exchange.setException(t); + } + callback.done(false); + }); + return false; + } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java index 6131d34..5c762cf 100644 --- a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java @@ -16,9 +16,14 @@ */ package org.apache.camel.component.vertx.eventbus; +import java.util.ArrayList; +import java.util.List; + import io.vertx.core.Vertx; import org.apache.camel.Expression; +import org.apache.camel.Predicate; import org.apache.camel.Processor; +import org.apache.camel.model.FilterDefinition; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.ToDefinition; import org.apache.camel.model.TransformDefinition; @@ -35,6 +40,9 @@ public class VertxProcessorFactory implements ProcessorFactory { @Override public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception { + // let the classic camel-core create the child processor, which end up calling the createProcessor below + String id = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); + System.out.println("Child id" + id); return null; } @@ -48,6 +56,14 @@ public class VertxProcessorFactory implements ProcessorFactory { } else if (def instanceof TransformDefinition) { Expression expression = ((TransformDefinition) def).getExpression().createExpression(routeContext); return new VertxTransformProcessor(vertx, id, expression); + } else if (def instanceof FilterDefinition) { + Predicate 
predicate = ((FilterDefinition) def).getExpression().createPredicate(routeContext); + List<Processor> children = new ArrayList<>(); + for (ProcessorDefinition childDef : def.getOutputs()) { + Processor child = createProcessor(routeContext, childDef); + children.add(child); + } + return new VertxFilterProcessor(vertx, id, predicate, children); } throw new UnsupportedOperationException("EIP not supported yet"); http://git-wip-us.apache.org/repos/asf/camel/blob/0479f3b5/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java new file mode 100644 index 0000000..51cd7d4 --- /dev/null +++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxFilterTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.vertx.VertxBaseTestSupport; +import org.junit.Test; + +public class VertxFilterTest extends VertxBaseTestSupport { + + private Vertx vertx; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + vertx = Vertx.vertx(); + vertx.eventBus().registerCodec(new VertxExchangeCodec()); + + VertxProcessorFactory pf = new VertxProcessorFactory(vertx); + context.setProcessorFactory(pf); + + VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName()); + context.addService(vcp); + + VertxCamelTransform vct = new VertxCamelTransform(context, vertx, VertxCamelTransform.class.getName()); + context.addService(vct); + + VertxCamelFilter vcf = new VertxCamelFilter(context, vertx, VertxCamelFilter.class.getName()); + context.addService(vcf); + + return context; + } + + @Test + public void testVertxFilter() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + getMockEndpoint("mock:baz").expectedMessageCount(1); + getMockEndpoint("mock:end").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testVertxFilterFalse() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(0); + getMockEndpoint("mock:baz").expectedMessageCount(0); + getMockEndpoint("mock:end").expectedMessageCount(1); + + template.sendBody("direct:start", "Bye World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override 
+ public void configure() throws Exception { + from("direct:start") + .to("mock:foo") + .filter(body().contains("Hello")) + .to("mock:bar") + .to("mock:baz") + .end() + .to("mock:end"); + } + }; + } +}