Repository: camel
Updated Branches:
  refs/heads/eventbus [created] 9aafe35f3


Camel experiment using Vert.x as the event bus for the routing engine.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/48219df4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/48219df4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/48219df4

Branch: refs/heads/eventbus
Commit: 48219df43b5c81181d687456083ede593c4a6cec
Parents: dcdbf48
Author: Claus Ibsen <davscl...@apache.org>
Authored: Fri Apr 8 10:00:13 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Fri Apr 8 10:00:13 2016 +0200

----------------------------------------------------------------------
 .../vertx/eventbus/VertxCamelProducer.java      | 65 +++++++++++++++++++
 .../vertx/eventbus/VertxExchangeCodec.java      | 51 +++++++++++++++
 .../vertx/eventbus/VertxProcessorFactory.java   | 50 +++++++++++++++
 .../vertx/eventbus/VertxSendToProcessor.java    | 45 +++++++++++++
 .../vertx/eventbus/VertxSendToTest.java         | 67 ++++++++++++++++++++
 .../src/test/resources/log4j.properties         |  2 +-
 6 files changed, 279 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
new file mode 100644
index 0000000..923ca38
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxCamelProducer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.eventbus.MessageConsumer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * Camel service that listens on a Vert.x event bus address and forwards every
+ * received {@link Exchange} to a Camel endpoint using a {@link ProducerTemplate}.
+ * <p/>
+ * The target endpoint uri is taken from (and removed from) the exchange
+ * property <tt>CamelVertxUrl</tt>.
+ */
+public class VertxCamelProducer extends ServiceSupport implements Handler<Message<Exchange>> {
+
+    private final CamelContext camelContext;
+    private final ProducerTemplate template;
+    private final Vertx vertx;
+    private final String id;
+    private MessageConsumer<Exchange> consumer;
+
+    /**
+     * @param camelContext the context used to create the producer template
+     * @param vertx        the Vert.x instance whose event bus is consumed from
+     * @param id           the event bus address the local consumer is registered on
+     */
+    public VertxCamelProducer(CamelContext camelContext, Vertx vertx, String id) {
+        this.camelContext = camelContext;
+        this.template = camelContext.createProducerTemplate();
+        this.vertx = vertx;
+        this.id = id;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // start the template BEFORE registering the consumer, so a message that
+        // is delivered right after registration never hits a not-yet-started template
+        ServiceHelper.startService(template);
+        consumer = vertx.eventBus().localConsumer(id, this);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // stop receiving new messages first, then shut down the template
+        if (consumer != null) {
+            consumer.unregister();
+        }
+        ServiceHelper.stopService(template);
+    }
+
+    /**
+     * Sends the exchange carried by the event bus message to the endpoint uri
+     * stored in the <tt>CamelVertxUrl</tt> exchange property.
+     *
+     * @param event the event bus message whose body is the exchange to route
+     */
+    @Override
+    public void handle(Message<Exchange> event) {
+        Exchange exchange = event.body();
+        // the property is only transport metadata, so remove it while reading it
+        String url = (String) exchange.removeProperty("CamelVertxUrl");
+        // TODO: execute blocking
+        template.send(url, exchange);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
new file mode 100644
index 0000000..ac7a33f
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxExchangeCodec.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.eventbus.MessageCodec;
+import org.apache.camel.Exchange;
+
+/**
+ * Vert.x {@link MessageCodec} registered under the name <tt>camel</tt> that
+ * hands a Camel {@link Exchange} over the event bus as-is.
+ * <p/>
+ * Only {@link #transform(Exchange)} does real work (it returns the same
+ * instance); the wire methods are placeholders and do not serialize anything.
+ */
+public class VertxExchangeCodec implements MessageCodec<Exchange, Exchange> {
+
+    @Override
+    public void encodeToWire(Buffer buffer, Exchange exchange) {
+        // noop: nothing is written to the buffer
+        // NOTE(review): presumably only local delivery is intended - confirm
+    }
+
+    @Override
+    public Exchange decodeFromWire(int pos, Buffer buffer) {
+        // wire decoding is not implemented, so there is no exchange to return
+        return null;
+    }
+
+    @Override
+    public Exchange transform(Exchange exchange) {
+        // pass the very same exchange instance on to the consumer
+        return exchange;
+    }
+
+    @Override
+    public String name() {
+        return "camel";
+    }
+
+    @Override
+    public byte systemCodecID() {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
new file mode 100644
index 0000000..d335725
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxProcessorFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ToDefinition;
+import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * {@link ProcessorFactory} that creates processors which dispatch exchanges
+ * through the Vert.x event bus. Only {@link ToDefinition} is handled so far;
+ * any other definition is rejected.
+ */
+public class VertxProcessorFactory implements ProcessorFactory {
+
+    private final Vertx vertx;
+
+    /**
+     * @param vertx the Vert.x instance handed to every processor created here
+     */
+    public VertxProcessorFactory(Vertx vertx) {
+        this.vertx = vertx;
+    }
+
+    /**
+     * Always returns <tt>null</tt>; this factory creates no child processors.
+     */
+    @Override
+    public Processor createChildProcessor(RouteContext routeContext, ProcessorDefinition<?> def, boolean mandatory) throws Exception {
+        return null;
+    }
+
+    /**
+     * Creates a {@link VertxSendToProcessor} for a {@link ToDefinition}.
+     *
+     * @throws UnsupportedOperationException for every other kind of definition
+     */
+    @Override
+    public Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> def) throws Exception {
+        // the node id is resolved (or generated) for every definition, supported or not
+        final String nodeId = def.idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
+
+        if (!(def instanceof ToDefinition)) {
+            throw new UnsupportedOperationException("EIP not supported yet");
+        }
+
+        final ToDefinition to = (ToDefinition) def;
+        return new VertxSendToProcessor(vertx, nodeId, to.getEndpointUri());
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
new file mode 100644
index 0000000..139a289
--- /dev/null
+++ b/components/camel-vertx/src/main/java/org/apache/camel/component/vertx/eventbus/VertxSendToProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.DeliveryOptions;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * {@link Processor} that, instead of calling the endpoint directly, puts the
+ * exchange on the Vert.x event bus using the <tt>camel</tt> codec. The target
+ * endpoint uri travels along as the exchange property <tt>CamelVertxUrl</tt>.
+ */
+public class VertxSendToProcessor implements Processor {
+
+    private final Vertx vertx;
+    private final String id;
+    private final String uri;
+    private final DeliveryOptions options;
+
+    /**
+     * @param vertx the Vert.x instance whose event bus is used for sending
+     * @param id    the id of the route node this processor was created for
+     * @param uri   the endpoint uri the exchange is ultimately destined for
+     */
+    public VertxSendToProcessor(Vertx vertx, String id, String uri) {
+        this.vertx = vertx;
+        this.id = id;
+        this.uri = uri;
+        // select the codec by its registered name
+        this.options = new DeliveryOptions().setCodecName("camel");
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        // if OUT then use reply handler to update exchange with result
+        exchange.setProperty("CamelVertxUrl", uri);
+
+        // the address is the class name of the producer, not this node's id
+        final String address = VertxCamelProducer.class.getName();
+        vertx.eventBus().send(address, exchange, options);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
new file mode 100644
index 0000000..b6c7ded
--- /dev/null
+++ b/components/camel-vertx/src/test/java/org/apache/camel/component/vertx/eventbus/VertxSendToTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.vertx.eventbus;
+
+import io.vertx.core.Vertx;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.vertx.VertxBaseTestSupport;
+import org.junit.Test;
+
+/**
+ * Verifies that a route with two <tt>to</tt> nodes delivers the message to
+ * both mock endpoints when the processors are created by
+ * {@link VertxProcessorFactory} and the exchange travels over the Vert.x
+ * event bus.
+ */
+public class VertxSendToTest extends VertxBaseTestSupport {
+
+    private Vertx vertx;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+
+        vertx = Vertx.vertx();
+        // the codec name must match the one selected by VertxSendToProcessor
+        vertx.eventBus().registerCodec(new VertxExchangeCodec());
+
+        VertxProcessorFactory pf = new VertxProcessorFactory(vertx);
+        context.setProcessorFactory(pf);
+
+        // consumer side: listens on the address VertxSendToProcessor sends to
+        VertxCamelProducer vcp = new VertxCamelProducer(context, vertx, VertxCamelProducer.class.getName());
+        context.addService(vcp);
+
+        return context;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // close the Vert.x instance created for this test so its threads
+            // do not outlive the test
+            if (vertx != null) {
+                vertx.close();
+                vertx = null;
+            }
+        }
+    }
+
+    @Test
+    public void testVertxSendTo() throws Exception {
+        getMockEndpoint("mock:foo").expectedMessageCount(1);
+        getMockEndpoint("mock:bar").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .to("mock:foo")
+                    .to("mock:bar");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/48219df4/components/camel-vertx/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/components/camel-vertx/src/test/resources/log4j.properties b/components/camel-vertx/src/test/resources/log4j.properties
index e169468..57a694f 100644
--- a/components/camel-vertx/src/test/resources/log4j.properties
+++ b/components/camel-vertx/src/test/resources/log4j.properties
@@ -18,7 +18,7 @@
 #
 # The logging properties used for testing
 #
-log4j.rootLogger=INFO, file
+log4j.rootLogger=INFO, out
 
 log4j.logger.org.apache.camel.component.vertx=DEBUG
 #log4j.logger.org.apache.camel=DEBUG

Reply via email to