Repository: camel Updated Branches: refs/heads/eventbus [created] 9aafe35f3
Camel experiment with using vertx as eventbus for routing engine. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48219df4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48219df4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48219df4 Branch: refs/heads/eventbus Commit: 48219df43b5c81181d687456083ede593c4a6cec Parents: dcdbf48 Author: Claus Ibsen <davscl...@apache.org> Authored: Fri Apr 8 10:00:13 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Fri Apr 8 10:00:13 2016 +0200 ---------------------------------------------------------------------- .../vertx/eventbus/VertxCamelProducer.java | 65 +++++++++++++++++++ .../vertx/eventbus/VertxExchangeCodec.java | 51 +++++++++++++++ .../vertx/eventbus/VertxProcessorFactory.java | 50 +++++++++++++++ .../vertx/eventbus/VertxSendToProcessor.java | 45 +++++++++++++ .../vertx/eventbus/VertxSendToTest.java | 67 ++++++++++++++++++++ .../src/test/resources/log4j.properties | 2 +- 6 files changed, 279 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java new file mode 100644 index 0000000..923ca38 --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.eventbus.MessageConsumer; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ServiceHelper; + +public class VertxCamelProducer extends ServiceSupport implements Handler<Message<Exchange>> { + + private final CamelContext camelContext; + private final ProducerTemplate template; + private final Vertx vertx; + private final String id; + private MessageConsumer<Exchange> consumer; + + public VertxCamelProducer(CamelContext camelContext, Vertx vertx, String id) { + this.camelContext = camelContext; + this.template = camelContext.createProducerTemplate(); + this.vertx = vertx; + this.id = id; + } + + @Override + protected void doStart() throws Exception { + consumer = vertx.eventBus().localConsumer(id, this); + ServiceHelper.startService(template); + } + + @Override + protected void doStop() throws Exception { + if (consumer != null) { + consumer.unregister(); + } + ServiceHelper.stopService(template); + } + + @Override + public void handle(Message<Exchange> event) { + Exchange exchange = event.body(); + String url = (String) exchange.removeProperty("CamelVertxUrl"); + // TODO: execute blocking + template.send(url, exchange); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java new file mode 100644 index 0000000..ac7a33f --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import org.apache.camel.Exchange; + +public class VertxExchangeCodec implements MessageCodec<Exchange, Exchange> { + + @Override + public void encodeToWire(Buffer buffer, Exchange exchange) { + // noop + System.out.println("xxx"); + } + + @Override + public Exchange decodeFromWire(int pos, Buffer buffer) { + System.out.println("yyy"); + return null; + } + + @Override + public Exchange transform(Exchange exchange) { + return exchange; + } + + @Override + public String name() { + return "camel"; + } + + @Override + public byte systemCodecID() { + return -1; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java new file mode 100644 index 0000000..d335725 --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import org.apache.camel.Processor; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ToDefinition; +import org.apache.camel.spi.ProcessorFactory; +import org.apache.camel.spi.RouteContext; + +public class VertxProcessorFactory implements ProcessorFactory { + + private final Vertx vertx; + + public VertxProcessorFactory(Vertx vertx) { + this.vertx = vertx; + } + + @Override + public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception { + return null; + } + + @Override + public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> def) throws Exception { + String id = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); + + if (def instanceof ToDefinition) { + String uri = ((ToDefinition) def).getEndpointUri(); + return new VertxSendToProcessor(vertx, id, uri); + } + + throw new UnsupportedOperationException("EIP not supported yet"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java new file mode 100644 index 0000000..139a289 --- /dev/null +++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.DeliveryOptions; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +public class VertxSendToProcessor implements Processor { + + private final Vertx vertx; + private final String id; + private final String uri; + private final DeliveryOptions options; + + public VertxSendToProcessor(Vertx vertx, String id, String uri) { + this.vertx = vertx; + this.id = id; + this.uri = uri; + this.options = new DeliveryOptions(); + this.options.setCodecName("camel"); + } + + @Override + public void process(Exchange exchange) throws Exception { + // if OUT then use reply handler to update exchange with result + exchange.setProperty("CamelVertxUrl", uri); + vertx.eventBus().send(VertxCamelProducer.class.getName(), exchange, options); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java new file mode 100644 index 0000000..b6c7ded --- /dev/null +++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.vertx.eventbus; + +import io.vertx.core.Vertx; +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.vertx.VertxBaseTestSupport; +import org.junit.Test; + +public class VertxSendToTest extends VertxBaseTestSupport { + + private Vertx vertx; + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext context = super.createCamelContext(); + + vertx = Vertx.vertx(); + vertx.eventBus().registerCodec(new VertxExchangeCodec()); + + VertxProcessorFactory pf = new VertxProcessorFactory(vertx); + context.setProcessorFactory(pf); + + VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName()); + context.addService(vcp); + + return context; + } + + @Test + public void testVertxSendTo() throws Exception { + getMockEndpoint("mock:foo").expectedMessageCount(1); + getMockEndpoint("mock:bar").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RoutesBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .to("mock:foo") + .to("mock:bar"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-vertx/src/test/resources/log4j.properties b/components/camel-vertx/src/test/resources/log4j.properties index e169468..57a694f 100644 --- a/components/camel-vertx/src/test/resources/log4j.properties +++ b/components/camel-vertx/src/test/resources/log4j.properties @@ -18,7 +18,7 @@ # # The logging properties used for testing # -log4j.rootLogger=INFO, file +log4j.rootLogger=INFO, out log4j.logger.org.apache.camel.component.vertx=DEBUG #log4j.logger.org.apache.camel=DEBUG