CAMEL-10054: Create camel-atomix component

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/08d2b0fe
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/08d2b0fe
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/08d2b0fe

Branch: refs/heads/master
Commit: 08d2b0fe8a91eab8e22e064299eaa74783279f2f
Parents: b3681ba
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Wed Jun 14 18:02:53 2017 +0200
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Fri Jun 16 17:37:55 2017 +0200

----------------------------------------------------------------------
 .../apache/camel/FluentProducerTemplate.java    |   5 +
 .../builder/DefaultFluentProducerTemplate.java  |   8 +
 .../org/apache/camel/util/ExchangeHelper.java   |  18 +
 .../camel/util/function/ThrowingBiFunction.java |  22 +
 components/camel-atomix/pom.xml                 |  21 +-
 .../src/main/docs/atomix-client-component.adoc  |   6 -
 .../src/main/docs/atomix-map-component.adoc     |  45 +++
 .../atomix/AtomixAsyncMessageProcessor.java     |  31 ++
 .../component/atomix/AtomixConfiguration.java   |  36 +-
 .../camel/component/atomix/AtomixHelper.java    |  29 ++
 .../AbstractAsyncAtomixClientProducer.java      | 140 +++++++
 .../client/AbstractAtomixClientComponent.java   |  93 +++++
 .../client/AbstractAtomixClientConsumer.java    |  31 ++
 .../client/AbstractAtomixClientEndpoint.java    |  86 ++++
 .../client/AbstractAtomixClientProducer.java    |  30 ++
 .../atomix/client/AtomixClientAction.java       |  32 ++
 .../atomix/client/AtomixClientComponent.java    |  37 --
 .../client/AtomixClientConfiguration.java       |   7 +-
 .../atomix/client/AtomixClientConstants.java    |  34 ++
 .../atomix/client/AtomixClientHelper.java       |  61 +++
 .../client/map/AtomixClientMapComponent.java    |  41 ++
 .../map/AtomixClientMapConfiguration.java       | 109 +++++
 .../client/map/AtomixClientMapConsumer.java     |  85 ++++
 .../client/map/AtomixClientMapEndpoint.java     |  53 +++
 .../client/map/AtomixClientMapProducer.java     | 364 +++++++++++++++++
 .../cluster/AtomixClusterConfiguration.java     |  60 +--
 .../atomix/cluster/AtomixClusterHelper.java     |  59 +--
 .../atomix/ha/AtomixClusterService.java         |  30 +-
 .../component/atomix/ha/AtomixClusterView.java  |  35 +-
 .../org/apache/camel/component/atomix-client    |  18 -
 .../org/apache/camel/component/atomix-map       |  18 +
 .../atomix/client/AtomixClientTestSupport.java  |  72 ++++
 .../client/map/AtomixClientMapConsumerTest.java | 115 ++++++
 .../client/map/AtomixClientMapProducerTest.java | 398 +++++++++++++++++++
 parent/pom.xml                                  |   1 +
 ...omixClientMapComponentAutoConfiguration.java | 129 ++++++
 .../AtomixClientMapComponentConfiguration.java  | 103 +++++
 .../main/resources/META-INF/spring.factories    |  19 +
 38 files changed, 2319 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
index 688ce19..3a37890 100644
--- a/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/FluentProducerTemplate.java
@@ -137,6 +137,11 @@ public interface FluentProducerTemplate extends Service {
     // -----------------------------------------------------------------------
 
     /**
+     * Remove the body and headers.
+     */
+    FluentProducerTemplate clearAll();
+
+    /**
      * Set the header
      *
      * @param key the key of the header

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
 
b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
index 965afef..4c6034b 100644
--- 
a/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/builder/DefaultFluentProducerTemplate.java
@@ -124,6 +124,14 @@ public class DefaultFluentProducerTemplate extends 
ServiceSupport implements Flu
     }
 
     @Override
+    public FluentProducerTemplate clearAll() {
+       clearBody();
+       clearHeaders();
+
+       return this;
+    }
+
+    @Override
     public FluentProducerTemplate withHeader(String key, Object value) {
         if (headers == null) {
             headers = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 06fe7bf..11b0dac 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -129,6 +129,24 @@ public final class ExchangeHelper {
     }
 
     /**
+     * Gets the mandatory inbound header of the correct type
+     *
+     * @param message       the message
+     * @param headerName    the header name
+     * @param type          the type
+     * @return the header value
+     * @throws TypeConversionException is thrown if an error occurs during type 
conversion
+     * @throws NoSuchHeaderException is thrown if the header does not exist
+     */
+    public static <T> T getMandatoryHeader(Message message, String headerName, 
Class<T> type) throws TypeConversionException, NoSuchHeaderException {
+        T answer = message.getHeader(headerName, type);
+        if (answer == null) {
+            throw new NoSuchHeaderException(message.getExchange(), headerName, 
type);
+        }
+        return answer;
+    }
+
+    /**
      * Gets an header or property of the correct type
      *
      * @param exchange      the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java
 
b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java
new file mode 100644
index 0000000..6b8cedc
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/util/function/ThrowingBiFunction.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.function;
+
+@FunctionalInterface
+public interface ThrowingBiFunction<I1, I2, R, T extends Throwable> {
+    R apply(I1 in1, I2 in2) throws T;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-atomix/pom.xml b/components/camel-atomix/pom.xml
index ca4ad29..0c4a8db 100644
--- a/components/camel-atomix/pom.xml
+++ b/components/camel-atomix/pom.xml
@@ -38,9 +38,10 @@
       org.apache.camel.component.atomix.*,
       org.apache.camel.component.atomix.client.*,
       org.apache.camel.component.atomix.cluster.*
+      org.apache.camel.component.atomix.ha.*
     </camel.osgi.export.pkg>
     <camel.osgi.export.service>
-      org.apache.camel.spi.ComponentResolver;component=atomix-client,
+      org.apache.camel.spi.ComponentResolver;component=atomix-map,
       org.apache.camel.spi.ComponentResolver;component=atomix-cluster,
     </camel.osgi.export.service>
   </properties>
@@ -56,9 +57,19 @@
     <!-- atomix -->
     <dependency>
       <groupId>io.atomix</groupId>
-      <artifactId>atomix-all</artifactId>
+      <artifactId>atomix</artifactId>
       <version>${atomix-version}</version>
     </dependency>
+    <dependency>
+      <groupId>io.atomix</groupId>
+      <artifactId>atomix-groups</artifactId>
+      <version>${atomix-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.atomix.catalyst</groupId>
+      <artifactId>catalyst-netty</artifactId>
+      <version>${atomix-catalyst-version}</version>
+    </dependency>
 
     <!-- logging -->       
     <dependency>
@@ -88,6 +99,12 @@
       <artifactId>camel-test-spring</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>java-hamcrest</artifactId>
+      <version>${hamcrest-version}</version>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/docs/atomix-client-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-client-component.adoc 
b/components/camel-atomix/src/main/docs/atomix-client-component.adoc
deleted file mode 100644
index 20ece42..0000000
--- a/components/camel-atomix/src/main/docs/atomix-client-component.adoc
+++ /dev/null
@@ -1,6 +0,0 @@
-## Atomix Client Component
-
-*Available as of Camel version 2.20*
-
-The camel atomix component allows you to work with Atomix, a fault-tolerant 
distributed coordination framework.
-

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/docs/atomix-map-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-atomix/src/main/docs/atomix-map-component.adoc 
b/components/camel-atomix/src/main/docs/atomix-map-component.adoc
new file mode 100644
index 0000000..653057e
--- /dev/null
+++ b/components/camel-atomix/src/main/docs/atomix-map-component.adoc
@@ -0,0 +1,45 @@
+## atomix-map Component
+
+*Available as of Camel version 2.20*
+
+The camel atomix component allows you to work with Atomix, a fault-tolerant 
distributed coordination framework.
+
+// component options: START
+The atomix-map component supports 5 options which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **configuration** (advanced) | The shared component configuration |  | C
+| **atomix** (producer) | The shared AtomixClient instance |  | AtomixClient
+| **nodes** (producer) | The nodes the AtomixClient should connect to |  | List
+| **configurationUri** (producer) | The path to the AtomixClient configuration 
|  | String
+| **resolveProperty Placeholders** (advanced) | Whether the component should 
resolve property placeholders on itself when starting. Only properties which 
are of String type can use property placeholders. | true | boolean
+|=======================================================================
+// component options: END
+
+// endpoint options: START
+The atomix-map endpoint is configured using URI syntax:
+
+    atomix-map:mapName
+
+with the following path and query parameters:
+
+#### Path Parameters (1 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **mapName** | *Required* The distributed map name |  | String
+|=======================================================================
+
+#### Query Parameters (1 parameters):
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|=======================================================================
+| Name | Description | Default | Type
+| **synchronous** (advanced) | Sets whether synchronous processing should be 
strictly used or Camel is allowed to use asynchronous processing (if 
supported). | false | boolean
+|=======================================================================
+// endpoint options: END

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java
new file mode 100644
index 0000000..bfa5e95
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixAsyncMessageProcessor.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Message;
+
+@FunctionalInterface
+public interface AtomixAsyncMessageProcessor {
+    /**
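+     * @param message the message to process
+     * @param callback the asynchronous callback supplied by the caller
+     * @return the value the caller hands back from its own asynchronous process call (presumably true when processing completed synchronously; confirm against AsyncProcessor)
+     * @throws Exception if processing fails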
+     */
+    boolean process(Message message, AsyncCallback callback) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
index d0e3203..1ffc06d 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixConfiguration.java
@@ -21,13 +21,18 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import io.atomix.Atomix;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
 import io.atomix.catalyst.transport.netty.NettyTransport;
+import io.atomix.resource.ReadConsistency;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.util.ObjectHelper;
 
-public class AtomixConfiguration {
+public class AtomixConfiguration<T extends Atomix> implements Cloneable {
+    @UriParam
+    private T atomix;
+
     @UriParam(javaType = "java.lang.String")
     private List<Address> nodes = Collections.emptyList();
 
@@ -37,9 +42,27 @@ public class AtomixConfiguration {
     @UriParam
     private String configurationUri;
 
+    @UriParam(label = "advanced")
+    private ReadConsistency readConsistency;
+
     protected AtomixConfiguration() {
     }
 
+    // *****************************************
+    // Properties
+    // *****************************************
+
+    public T getAtomix() {
+        return atomix;
+    }
+
+    /**
+     * The Atomix instance to use
+     */
+    public void setAtomix(T client) {
+        this.atomix = client;
+    }
+
     public List<Address> getNodes() {
         return nodes;
     }
@@ -78,4 +101,15 @@ public class AtomixConfiguration {
     public void setConfigurationUri(String configurationUri) {
         this.configurationUri = configurationUri;
     }
+
+    public ReadConsistency getReadConsistency() {
+        return readConsistency;
+    }
+
+    /**
+     * The read consistency level.
+     */
+    public void setReadConsistency(ReadConsistency readConsistency) {
+        this.readConsistency = readConsistency;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java
new file mode 100644
index 0000000..959c894
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/AtomixHelper.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix;
+
+import io.atomix.copycat.server.storage.Storage;
+import io.atomix.copycat.server.storage.StorageLevel;
+
+public final class AtomixHelper {
+    private AtomixHelper() {
+    }
+
+    public static Storage inMemoryStorage() {
+        return Storage.builder().withStorageLevel(StorageLevel.MEMORY).build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java
new file mode 100644
index 0000000..931e9e4
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAsyncAtomixClientProducer.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Repeatable;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.atomix.AtomixAsyncMessageProcessor;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractAsyncAtomixClientProducer<E extends 
AbstractAtomixClientEndpoint> extends DefaultProducer implements AsyncProcessor 
{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractAsyncAtomixClientProducer.class);
+    private final Map<AtomixClientAction, AtomixAsyncMessageProcessor> 
processors;
+
+    protected AbstractAsyncAtomixClientProducer(E endpoint) {
+        super(endpoint);
+
+        this.processors = new HashMap<>();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        for (final Method method : getClass().getDeclaredMethods()) {
+            AsyncInvokeOnHeaders annotations = 
method.getAnnotation(AsyncInvokeOnHeaders.class);
+            if (annotations != null) {
+                for (AsyncInvokeOnHeader annotation : annotations.value()) {
+                    bind(annotation, method);
+                }
+            } else {
+                AsyncInvokeOnHeader annotation = 
method.getAnnotation(AsyncInvokeOnHeader.class);
+                if (annotation != null) {
+                    bind(annotation, method);
+                }
+            }
+        }
+
+        super.doStart();
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        final Message message = exchange.getIn();
+        final AtomixClientAction action = getAction(message);
+
+        AtomixAsyncMessageProcessor processor = this.processors.get(action);
+        if (processor != null) {
+            try {
+                return processor.process(message, callback);
+            } catch (Exception e) {
+                throw new RuntimeCamelException(e);
+            }
+        } else {
+            throw new RuntimeCamelException("No handler for action " + action);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    protected E getAtomixEndpoint() {
+        return (E)super.getEndpoint();
+    }
+
+    protected abstract AtomixClientAction getAction(Message message);
+
+    // ************************************
+    // Binding helpers
+    // ************************************
+
+    private void bind(AsyncInvokeOnHeader annotation, final Method method) {
+        if (method.getParameterCount() == 2) {
+            method.setAccessible(true);
+
+            if 
(!Message.class.isAssignableFrom(method.getParameterTypes()[0])) {
+                throw new IllegalArgumentException("First argument should be 
of type Message");
+            }
+            if 
(!AsyncCallback.class.isAssignableFrom(method.getParameterTypes()[1])) {
+                throw new IllegalArgumentException("Second argument should be 
of type AsyncCallback");
+            }
+
+            LOGGER.debug("bind key={}, class={}, method={}",
+                annotation.value(), this.getClass(), method.getName());
+
+            this.processors.put(annotation.value(), (m, c) -> 
(boolean)method.invoke(this, m, c));
+        } else {
+            throw new IllegalArgumentException(
+                "Illegal number of parameters for method: " + method.getName() 
+ ", required: 2, found: " + method.getParameterCount()
+            );
+        }
+    }
+
+    // ************************************
+    // Annotations
+    // ************************************
+
+    @Repeatable(AsyncInvokeOnHeaders.class)
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    public @interface AsyncInvokeOnHeader {
+        AtomixClientAction value();
+    }
+
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    public @interface AsyncInvokeOnHeaders {
+        AsyncInvokeOnHeader[] value();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java
new file mode 100644
index 0000000..66fd91b
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientComponent.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import java.util.List;
+
+import io.atomix.AtomixClient;
+import io.atomix.catalyst.transport.Address;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+
+public abstract class AbstractAtomixClientComponent<C extends 
AtomixClientConfiguration> extends DefaultComponent {
+    @Metadata(label = "advanced")
+    private C configuration;
+
+    protected AbstractAtomixClientComponent(C configuration) {
+        this.configuration = configuration;
+    }
+
+    protected AbstractAtomixClientComponent(CamelContext camelContext, C 
configuration) {
+        super(camelContext);
+
+        this.configuration = configuration;
+    }
+
+    // *****************************************
+    // Properties
+    // *****************************************
+
+    public C getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The shared component configuration
+     */
+    public void setConfiguration(C configuration) {
+        this.configuration = configuration;
+    }
+
+    public AtomixClient getAtomix() {
+        return configuration.getAtomix();
+    }
+
+    /**
+     * The shared AtomixClient instance
+     */
+    public void setAtomix(AtomixClient client) {
+        configuration.setAtomix(client);
+    }
+
+    public List<Address> getNodes() {
+        return configuration.getNodes();
+    }
+
+    /**
+     * The nodes the AtomixClient should connect to
+     */
+    public void setNodes(List<Address> nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public void setNodes(String nodes) {
+        configuration.setNodes(nodes);
+    }
+
+    public String getConfigurationUri() {
+        return configuration.getConfigurationUri();
+    }
+
+
+    /**
+     * The path to the AtomixClient configuration
+     */
+    public void setConfigurationUri(String configurationUri) {
+        configuration.setConfigurationUri(configurationUri);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java
new file mode 100644
index 0000000..8f800e9
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientConsumer.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+public abstract class AbstractAtomixClientConsumer<E extends 
AbstractAtomixClientEndpoint> extends DefaultConsumer {
+    protected AbstractAtomixClientConsumer(E endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected E getAtomixEndpoint() {
+        return (E)super.getEndpoint();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java
new file mode 100644
index 0000000..5a83873
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientEndpoint.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import io.atomix.AtomixClient;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultEndpoint;
+
+
+public abstract class AbstractAtomixClientEndpoint<T extends 
AbstractAtomixClientComponent, C extends AtomixClientConfiguration> extends 
DefaultEndpoint {
+    private final C configuration;
+    private AtomixClient atomix;
+
+    protected AbstractAtomixClientEndpoint(String uri, T component, C 
configuration) {
+        super(uri, component);
+
+        this.configuration = configuration;
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return true;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Producer not supported");
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("Consumer not supported");
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (atomix == null) {
+            atomix = AtomixClientHelper.createClient(getCamelContext(), 
configuration);
+            atomix.connect(configuration.getNodes()).join();
+        }
+
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (atomix != null) {
+            atomix.close();
+            atomix = null; // doStart() only creates/connects a client when this field is null
+        }
+    }
+
+    // **********************************
+    // Helpers for implementations
+    // *********************************
+
+    @SuppressWarnings("unchecked")
+    public T getAtomixComponent() {
+        return (T)super.getComponent();
+    }
+
+    public C getAtomixConfiguration() {
+        return this.configuration;
+    }
+
+    public AtomixClient getAtomix() {
+        return atomix;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
new file mode 100644
index 0000000..16a7866
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AbstractAtomixClientProducer.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import org.apache.camel.impl.HeaderSelectorProducer;
+
+public abstract class AbstractAtomixClientProducer<E extends 
AbstractAtomixClientEndpoint> extends HeaderSelectorProducer {
+    protected AbstractAtomixClientProducer(E endpoint, String header) {
+        super(endpoint, header);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected E getAtomixEndpoint() {
+        return (E)super.getEndpoint();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java
new file mode 100644
index 0000000..5bf641f
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientAction.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+public enum AtomixClientAction {
+    PUT,
+    PUT_IF_ABSENT,
+    GET,
+    CLEAR,
+    SIZE,
+    CONTAINS_KEY,
+    CONTAINS_VALUE,
+    IS_EMPTY,
+    ENTRY_SET,
+    REMOVE,
+    REPLACE,
+    VALUES
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
deleted file mode 100644
index 775315b..0000000
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientComponent.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.atomix.client;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Endpoint;
-import org.apache.camel.impl.DefaultComponent;
-
-public class AtomixClientComponent extends DefaultComponent {
-    public AtomixClientComponent() {
-    }
-
-    public AtomixClientComponent(CamelContext camelContext) {
-        super(camelContext);
-    }
-
-    @Override
-    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
-        throw new UnsupportedOperationException("Not yet implemented");
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
index d3134c4..0b8a0d8 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConfiguration.java
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.component.atomix.client;
 
+import io.atomix.AtomixClient;
 import org.apache.camel.component.atomix.AtomixConfiguration;
-import org.apache.camel.spi.UriParams;
 
-@UriParams
-public class AtomixClientConfiguration extends AtomixConfiguration {
-    public AtomixClientConfiguration() {
-    }
+public class AtomixClientConfiguration extends 
AtomixConfiguration<AtomixClient> implements Cloneable {
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java
new file mode 100644
index 0000000..2fccd8e
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientConstants.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+public final class AtomixClientConstants {
+    public static final String RESOURCE_NAME = "CamelAtomixResourceName";
+    public static final String RESOURCE_ACTION = "CamelAtomixResourceAction";
+    public static final String RESOURCE_KEY = "CamelAtomixResourceKey";
+    public static final String RESOURCE_VALUE = "CamelAtomixResourceValue";
+    public static final String RESOURCE_DEFAULT_VALUE = 
"CamelAtomixResourceDefaultValue";
+    public static final String RESOURCE_OLD_VALUE = 
"CamelAtomixResourceOldValue";
+    public static final String RESOURCE_ACTION_HAS_RESULT = 
"CamelAtomixResourceActionHasResult";
+    public static final String RESOURCE_TTL = "CamelAtomixResourceTTL";
+    public static final String RESOURCE_READ_CONSISTENCY = 
"CamelAtomixResourceReadConsistency";
+    public static final String EVENT_TYPE = "CamelAtomixEventType";
+
+    private AtomixClientConstants() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java
new file mode 100644
index 0000000..bbe9358
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/AtomixClientHelper.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client;
+
+import java.io.InputStream;
+import java.util.Properties;
+
+import io.atomix.AtomixClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ResourceHelper;
+
+public final class AtomixClientHelper {
+    private AtomixClientHelper() {
+    }
+
+    public static AtomixClient createClient(CamelContext camelContext, 
AtomixClientConfiguration configuration) throws Exception {
+        AtomixClient atomix = configuration.getAtomix();
+
+        if (atomix == null) {
+            final AtomixClient.Builder atomixBuilder;
+
+            String uri = configuration.getConfigurationUri();
+            if (ObjectHelper.isNotEmpty(uri)) {
+                uri = camelContext.resolvePropertyPlaceholders(uri);
+                try (InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+                    Properties properties = new Properties();
+                    properties.load(is);
+
+                    atomixBuilder = AtomixClient.builder(properties);
+                }
+            } else {
+                atomixBuilder = AtomixClient.builder();
+            }
+
+            if (configuration.getTransport() != null) {
+                atomixBuilder.withTransport(
+                    
camelContext.getInjector().newInstance(configuration.getTransport())
+                );
+            }
+
+            atomix = atomixBuilder.build();
+        }
+
+        return atomix;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java
new file mode 100644
index 0000000..a853e7d
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapComponent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.map;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientComponent;
+
+public class AtomixClientMapComponent extends 
AbstractAtomixClientComponent<AtomixClientMapConfiguration> {
+    public AtomixClientMapComponent() {
+        super(new AtomixClientMapConfiguration());
+    }
+
+    public AtomixClientMapComponent(CamelContext camelContext) {
+        super(camelContext, new AtomixClientMapConfiguration());
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        AtomixClientMapConfiguration configuration = 
super.getConfiguration().copy();
+        setProperties(configuration, parameters);
+
+        return new AtomixClientMapEndpoint(uri, this, configuration, 
remaining);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java
new file mode 100644
index 0000000..f7f50a2
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConfiguration.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.map;
+
+import io.atomix.collections.DistributedMap;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.atomix.client.AtomixClientAction;
+import org.apache.camel.component.atomix.client.AtomixClientConfiguration;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class AtomixClientMapConfiguration extends AtomixClientConfiguration {
+    @UriParam(defaultValue = "PUT")
+    private AtomixClientAction defaultAction = AtomixClientAction.PUT;
+    @UriParam
+    private Long ttl;
+    @UriParam
+    private String resultHeader;
+    @UriParam(label = "advanced")
+    private DistributedMap.Config config = new DistributedMap.Config();
+    @UriParam(label = "advanced")
+    private DistributedMap.Options options = new DistributedMap.Options();
+
+    // ****************************************
+    // Properties
+    // ****************************************
+
+    public AtomixClientAction getDefaultAction() {
+        return defaultAction;
+    }
+
+    /**
+     * The default action.
+     */
+    public void setDefaultAction(AtomixClientAction defaultAction) {
+        this.defaultAction = defaultAction;
+    }
+
+    public Long getTtl() {
+        return ttl;
+    }
+
+    /**
+     * The resource ttl.
+     */
+    public void setTtl(Long ttl) {
+        this.ttl = ttl;
+    }
+
+    public String getResultHeader() {
+        return resultHeader;
+    }
+
+    /**
+     * The header that will carry the result.
+     */
+    public void setResultHeader(String resultHeader) {
+        this.resultHeader = resultHeader;
+    }
+
+    public DistributedMap.Config getConfig() {
+        return config;
+    }
+
+    /**
+     * The cluster wide map config
+     */
+    public void setConfig(DistributedMap.Config config) {
+        this.config = config;
+    }
+
+    public DistributedMap.Options getOptions() {
+        return options;
+    }
+
+    /**
+     * The local map options
+     */
+    public void setOptions(DistributedMap.Options options) {
+        this.options = options;
+    }
+
+    // ****************************************
+    // Copy
+    // ****************************************
+
+    public AtomixClientMapConfiguration copy() {
+        try {
+            return (AtomixClientMapConfiguration) super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java
new file mode 100644
index 0000000..6e7b410
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapConsumer.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.map;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.atomix.catalyst.concurrent.Listener;
+import io.atomix.collections.DistributedMap;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientConsumer;
+import org.apache.camel.component.atomix.client.AtomixClientConstants;
+
+public class AtomixClientMapConsumer extends 
AbstractAtomixClientConsumer<AtomixClientMapEndpoint> {
+    private final List<Listener<DistributedMap.EntryEvent<Object, Object>>> 
listeners;
+    private DistributedMap<Object, Object> map;
+
+    public AtomixClientMapConsumer(AtomixClientMapEndpoint endpoint, Processor 
processor) {
+        super(endpoint, processor);
+        this.listeners = new ArrayList<>();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        this.map = getAtomixEndpoint()
+            .getAtomix()
+            .getMap(
+                getAtomixEndpoint().getMapName(),
+                getAtomixEndpoint().getAtomixConfiguration().getConfig(),
+                getAtomixEndpoint().getAtomixConfiguration().getOptions())
+            .join();
+
+        this.listeners.add(this.map.onAdd(this::onEvent).join());
+        this.listeners.add(this.map.onRemove(this::onEvent).join());
+        this.listeners.add(this.map.onUpdate(this::onEvent).join());
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // Close the listeners registered in doStart() and forget them so a restart starts clean.
+        listeners.forEach(Listener::close);
+        listeners.clear();
+        // The map is only assigned in doStart(); close it only when it was actually opened.
+        if (this.map != null) {
+            this.map.close().join();
+            this.map = null;
+        }
+
+        super.doStop();
+    }
+
+    // ********************************************
+    // Event handler
+    // ********************************************
+
+    private void onEvent(DistributedMap.EntryEvent<Object, Object> event) {
+        Exchange exchange = getEndpoint().createExchange();
+        exchange.getIn().setHeader(AtomixClientConstants.EVENT_TYPE, 
event.type());
+        exchange.getIn().setHeader(AtomixClientConstants.RESOURCE_KEY, 
event.entry().getKey());
+        exchange.getIn().setBody(event.entry().getValue());
+
+        try {
+            getProcessor().process(exchange);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java
new file mode 100644
index 0000000..e8525a0
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapEndpoint.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.map;
+
+import org.apache.camel.Producer;
+import org.apache.camel.component.atomix.client.AbstractAtomixClientEndpoint;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriPath;
+
+@UriEndpoint(firstVersion = "2.20.0", scheme = "atomix-map", title = 
"atomix-map", syntax = "atomix-map:mapName", producerOnly = true, label = 
"clustering")
+class AtomixClientMapEndpoint extends 
AbstractAtomixClientEndpoint<AtomixClientMapComponent, 
AtomixClientMapConfiguration> {
+
+    @UriPath(description = "The distributed map name")
+    @Metadata(required = "true")
+    private final String mapName;
+
+    public AtomixClientMapEndpoint(String uri, AtomixClientMapComponent 
component, AtomixClientMapConfiguration configuration, String mapName) {
+        super(uri, component, configuration);
+
+        this.mapName = mapName;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new AtomixClientMapProducer(this, mapName);
+    }
+
+    /*
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new AtomixClientMapConsumer(this, processor);
+    }
+    */
+
+    public String getMapName() {
+        return mapName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java
new file mode 100644
index 0000000..70dc656
--- /dev/null
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/client/map/AtomixClientMapProducer.java
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.atomix.client.map;
+
+import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import io.atomix.collections.DistributedMap;
+import io.atomix.resource.ReadConsistency;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Message;
+import 
org.apache.camel.component.atomix.client.AbstractAsyncAtomixClientProducer;
+import org.apache.camel.component.atomix.client.AtomixClientAction;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_ACTION_HAS_RESULT;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_DEFAULT_VALUE;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_KEY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_NAME;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_OLD_VALUE;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_READ_CONSISTENCY;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_TTL;
+import static 
org.apache.camel.component.atomix.client.AtomixClientConstants.RESOURCE_VALUE;
+
+final class AtomixClientMapProducer extends 
AbstractAsyncAtomixClientProducer<AtomixClientMapEndpoint> {
+    private final String mapName;
+    private final ConcurrentMap<String, DistributedMap<Object, Object>> maps;
+    private final AtomixClientMapConfiguration configuration;
+
+    /**
+     * Creates a producer for the given endpoint.
+     *
+     * @param endpoint the endpoint this producer belongs to; also the source of the configuration
+     * @param mapName the default map name, must not be {@code null}
+     */
+    protected AtomixClientMapProducer(AtomixClientMapEndpoint endpoint, String mapName) {
+        super(endpoint);
+
+        // Validate the map name before initialising the remaining state
+        this.mapName = ObjectHelper.notNull(mapName, "map name");
+        // Cache of resolved distributed maps, keyed by map name (see getMap)
+        this.maps = new ConcurrentHashMap<>();
+        this.configuration = endpoint.getAtomixConfiguration();
+    }
+
+    @Override
+    protected AtomixClientAction getAction(Message message) {
+        return message.getHeader(RESOURCE_ACTION, 
configuration::getDefaultAction, AtomixClientAction.class);
+    }
+
+    // *********************************
+    // Handlers
+    // *********************************
+
+    /**
+     * Handles the {@code PUT} action: stores the message body under the key found in the
+     * {@code RESOURCE_KEY} header.
+     * <p>
+     * The time-to-live is read from the {@code RESOURCE_TTL} header, falling back to the
+     * configured TTL; when no TTL is available the entry is stored without one.
+     *
+     * @param message the message providing the key header and the value (body)
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     * @throws Exception propagated from the mandatory header / mandatory body lookups
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.PUT)
+    boolean onPut(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class);
+        final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class);
+        final Object value = message.getMandatoryBody();
+
+        if (ttl == null) {
+            // No TTL configured: plain put
+            target.put(key, value).thenAccept(outcome -> processResult(message, callback, outcome));
+        } else {
+            target.put(key, value, ttl).thenAccept(outcome -> processResult(message, callback, outcome));
+        }
+
+        return false;
+    }
+
+    /**
+     * Handles the {@code PUT_IF_ABSENT} action: calls {@code putIfAbsent} on the map with the
+     * key from the {@code RESOURCE_KEY} header and the message body as value.
+     * <p>
+     * The time-to-live is read from the {@code RESOURCE_TTL} header, falling back to the
+     * configured TTL; when no TTL is available the variant without TTL is used.
+     *
+     * @param message the message providing the key header and the value (body)
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     * @throws Exception propagated from the mandatory header / mandatory body lookups
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.PUT_IF_ABSENT)
+    boolean onPutIfAbsent(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class);
+        final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class);
+        final Object value = message.getMandatoryBody();
+
+        if (ttl == null) {
+            // No TTL configured: plain putIfAbsent
+            target.putIfAbsent(key, value).thenAccept(outcome -> processResult(message, callback, outcome));
+        } else {
+            target.putIfAbsent(key, value, ttl).thenAccept(outcome -> processResult(message, callback, outcome));
+        }
+
+        return false;
+    }
+
+    /**
+     * Handles the {@code GET} action: looks up the value for the key found in the
+     * {@code RESOURCE_KEY} header.
+     * <p>
+     * When a {@code RESOURCE_DEFAULT_VALUE} header is present, {@code getOrDefault} is used
+     * instead of {@code get}. When a read consistency is available (header
+     * {@code RESOURCE_READ_CONSISTENCY} or the configured one) it is passed to the map call.
+     *
+     * @param message the message providing the key and the optional headers
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     * @throws Exception propagated from the mandatory header lookup
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.GET)
+    boolean onGet(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final Object key = ExchangeHelper.getMandatoryHeader(message, RESOURCE_KEY, Object.class);
+        final Object fallback = message.getHeader(RESOURCE_DEFAULT_VALUE);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+
+        final boolean withConsistency = consistency != null;
+        final boolean withFallback = fallback != null;
+
+        // Pick the map call matching the combination of optional arguments
+        if (withConsistency && withFallback) {
+            target.getOrDefault(key, fallback, consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        } else if (withConsistency) {
+            target.get(key, consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        } else if (withFallback) {
+            target.getOrDefault(key, fallback).thenAccept(outcome -> processResult(message, callback, outcome));
+        } else {
+            target.get(key).thenAccept(outcome -> processResult(message, callback, outcome));
+        }
+
+        return false;
+    }
+
+    /**
+     * Handles the {@code CLEAR} action by calling {@code clear()} on the map resolved for
+     * the message.
+     *
+     * @param message the message being processed
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.CLEAR)
+    boolean onClear(Message message, AsyncCallback callback) throws Exception {
+        getMap(message)
+            .clear()
+            .thenAccept(outcome -> processResult(message, callback, outcome));
+
+        return false;
+    }
+
+    /**
+     * Handles the {@code SIZE} action by calling {@code size} on the map, passing the read
+     * consistency (header {@code RESOURCE_READ_CONSISTENCY} or the configured one) when available.
+     *
+     * @param message the message being processed
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.SIZE)
+    boolean onSize(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency == null) {
+            target.size().thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.size(consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code IS_EMPTY} action by calling {@code isEmpty} on the map, passing the read
+     * consistency (header {@code RESOURCE_READ_CONSISTENCY} or the configured one) when available.
+     *
+     * @param message the message being processed
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.IS_EMPTY)
+    boolean onIsEmpty(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency == null) {
+            target.isEmpty().thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.isEmpty(consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code ENTRY_SET} action by calling {@code entrySet} on the map, passing the read
+     * consistency (header {@code RESOURCE_READ_CONSISTENCY} or the configured one) when available.
+     *
+     * @param message the message being processed
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.ENTRY_SET)
+    boolean onEntrySet(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency == null) {
+            target.entrySet().thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.entrySet(consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code VALUES} action by calling {@code values} on the map, passing the read
+     * consistency (header {@code RESOURCE_READ_CONSISTENCY} or the configured one) when available.
+     *
+     * @param message the message being processed
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.VALUES)
+    boolean onValues(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+
+        if (consistency == null) {
+            target.values().thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.values(consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code CONTAINS_KEY} action.
+     * <p>
+     * The key is read from the {@code RESOURCE_KEY} header and falls back to the message body;
+     * it must not be {@code null}. The read consistency (header
+     * {@code RESOURCE_READ_CONSISTENCY} or the configured one) is passed along when available.
+     *
+     * @param message the message providing the key (header or body)
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_KEY)
+    boolean onContainsKey(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
+
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+
+        if (consistency == null) {
+            target.containsKey(key).thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.containsKey(key, consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code CONTAINS_VALUE} action.
+     * <p>
+     * The value is read from the {@code RESOURCE_VALUE} header and falls back to the message
+     * body; it must not be {@code null}. The read consistency (header
+     * {@code RESOURCE_READ_CONSISTENCY} or the configured one) is passed along when available.
+     *
+     * @param message the message providing the value (header or body)
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.CONTAINS_VALUE)
+    boolean onContainsValue(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> target = getMap(message);
+        final ReadConsistency consistency = message.getHeader(RESOURCE_READ_CONSISTENCY, configuration::getReadConsistency, ReadConsistency.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
+
+        ObjectHelper.notNull(value, RESOURCE_VALUE);
+
+        if (consistency == null) {
+            target.containsValue(value).thenAccept(outcome -> processResult(message, callback, outcome));
+            return false;
+        }
+
+        target.containsValue(value, consistency).thenAccept(outcome -> processResult(message, callback, outcome));
+        return false;
+    }
+
+    /**
+     * Handles the {@code REMOVE} action.
+     * <p>
+     * The key is read from the {@code RESOURCE_KEY} header and the value from the
+     * {@code RESOURCE_VALUE} header; both fall back to the message body. When a value is
+     * available the conditional {@code remove(key, value)} is used, otherwise {@code remove(key)}.
+     * <p>
+     * NOTE(review): because the value also falls back to the body, a message with a non-null
+     * body and no {@code RESOURCE_VALUE} header takes the conditional branch. Confirm this is intended.
+     *
+     * @param message the message providing the key and the optional value
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.REMOVE)
+    boolean onRemove(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> map = getMap(message);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
+        final Object value = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
+
+        // The failure must name the key header; it previously reported RESOURCE_VALUE
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+
+        if (value != null) {
+            map.remove(key, value).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        } else {
+            map.remove(key).thenAccept(
+                result -> processResult(message, callback, result)
+            );
+        }
+
+        return false;
+    }
+
+    /**
+     * Handles the {@code REPLACE} action.
+     * <p>
+     * The key is read from the {@code RESOURCE_KEY} header and the new value from the
+     * {@code RESOURCE_VALUE} header; both fall back to the message body and must not be
+     * {@code null}. When a {@code RESOURCE_OLD_VALUE} header is present the compare-and-replace
+     * variant {@code replace(key, oldValue, newValue)} is used. The time-to-live
+     * ({@code RESOURCE_TTL} header or the configured one) is passed along when available.
+     *
+     * @param message the message providing the key, the new value and the optional old value / TTL
+     * @param callback the callback handed to {@code processResult} when the operation completes
+     * @return always {@code false} (presumably signalling asynchronous completion; confirm against the base producer)
+     */
+    @AsyncInvokeOnHeader(AtomixClientAction.REPLACE)
+    boolean onReplace(Message message, AsyncCallback callback) throws Exception {
+        final DistributedMap<Object, Object> map = getMap(message);
+        final Duration ttl = message.getHeader(RESOURCE_TTL, configuration::getTtl, Duration.class);
+        final Object key = message.getHeader(RESOURCE_KEY, message::getBody, Object.class);
+        final Object newValue = message.getHeader(RESOURCE_VALUE, message::getBody, Object.class);
+        final Object oldValue = message.getHeader(RESOURCE_OLD_VALUE, Object.class);
+
+        // The key check must name the key header; it previously reported RESOURCE_VALUE
+        ObjectHelper.notNull(key, RESOURCE_KEY);
+        ObjectHelper.notNull(newValue, RESOURCE_VALUE);
+
+        if (ttl != null) {
+            if (oldValue != null) {
+                map.replace(key, oldValue, newValue, ttl).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            } else {
+                map.replace(key, newValue, ttl).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            }
+        } else {
+            if (oldValue != null) {
+                map.replace(key, oldValue, newValue).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            } else {
+                map.replace(key, newValue).thenAccept(
+                    result -> processResult(message, callback, result)
+                );
+            }
+        }
+
+        return false;
+    }
+
+    // *********************************
+    // Helpers
+    // *********************************
+
+    private void processResult(Message message, AsyncCallback callback, Object 
result) {
+        if (result != null && !(result instanceof Void)) {
+            message.setHeader(RESOURCE_ACTION_HAS_RESULT, true);
+
+            String resultHeader = configuration.getResultHeader();
+            if (resultHeader != null) {
+                message.setHeader(resultHeader, result);
+            } else {
+                message.setBody(result);
+            }
+        } else {
+            message.setHeader(RESOURCE_ACTION_HAS_RESULT, false);
+        }
+
+        callback.done(false);
+    }
+
+    private DistributedMap<Object, Object> getMap(Message message) {
+        return maps.computeIfAbsent(
+            message.getHeader(RESOURCE_NAME, getAtomixEndpoint().getMapName(), 
String.class),
+            name -> {
+                return getAtomixEndpoint()
+                    .getAtomix()
+                    .getMap(
+                        name,
+                        
getAtomixEndpoint().getAtomixConfiguration().getConfig(),
+                        
getAtomixEndpoint().getAtomixConfiguration().getOptions())
+                    .join();
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
index 8a732e1..b05230f 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterConfiguration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.atomix.cluster;
 
 import io.atomix.AtomixReplica;
+import io.atomix.catalyst.transport.Transport;
 import io.atomix.copycat.server.storage.StorageLevel;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.atomix.AtomixConfiguration;
@@ -24,65 +25,70 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 
 @UriParams
-public class AtomixClusterConfiguration extends AtomixConfiguration implements 
Cloneable {
+public class AtomixClusterConfiguration extends 
AtomixConfiguration<AtomixReplica> implements Cloneable {
 
     @UriParam
-    private String storagePath;
-
-    @UriParam(defaultValue = "MEMORY")
-    private StorageLevel storageLevel = StorageLevel.MEMORY;
+    private Class<? extends Transport> clientTransport;
 
     @UriParam
-    private String replicaRef;
+    private Class<? extends Transport> serverTransport;
 
     @UriParam
-    private AtomixReplica replica;
+    private String storagePath;
+
+    @UriParam(defaultValue = "MEMORY")
+    private StorageLevel storageLevel = StorageLevel.MEMORY;
 
     public AtomixClusterConfiguration() {
     }
 
-    public String getStoragePath() {
-        return storagePath;
+    // ******************************************
+    // Properties
+    // ******************************************
+
+
+    public Class<? extends Transport> getClientTransport() {
+        return clientTransport;
     }
 
     /**
-     * Sets the log directory.
+     * The client transport
      */
-    public void setStoragePath(String storagePath) {
-        this.storagePath = storagePath;
+    public void setClientTransport(Class<? extends Transport> clientTransport) 
{
+        this.clientTransport = clientTransport;
     }
 
-    public StorageLevel getStorageLevel() {
-        return storageLevel;
+    public Class<? extends Transport> getServerTransport() {
+        return serverTransport;
     }
 
     /**
-     * Sets the log storage level.
+     * The server transport
      */
-    public void setStorageLevel(StorageLevel storageLevel) {
-        this.storageLevel = storageLevel;
+    public void setServerTransport(Class<? extends Transport> serverTransport) 
{
+        this.serverTransport = serverTransport;
     }
 
-    public String getReplicaRef() {
-        return replicaRef;
+    public String getStoragePath() {
+        return storagePath;
     }
 
     /**
-     * Set the reference of an instance of {@link AtomixReplica}.
+     * Sets the log directory.
      */
-    public void setReplicaRef(String clusterref) {
-        this.replicaRef = clusterref;
+    public void setStoragePath(String storagePath) {
+        this.storagePath = storagePath;
     }
 
-    public AtomixReplica getReplica() {
-        return replica;
+    public StorageLevel getStorageLevel() {
+        return storageLevel;
     }
 
     /**
-     * Set an instance of {@link AtomixReplica}.
+     * Sets the log storage level.
      */
-    public void setReplica(AtomixReplica replica) {
-        this.replica = replica;
+    public void setStorageLevel(StorageLevel storageLevel) {
+        this.storageLevel = storageLevel;
     }
 
     // ****************************************

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
index 777a4f6..0530495 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/cluster/AtomixClusterHelper.java
@@ -23,7 +23,6 @@ import io.atomix.AtomixReplica;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.copycat.server.storage.Storage;
 import org.apache.camel.CamelContext;
-import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ResourceHelper;
 
@@ -36,41 +35,47 @@ public final class AtomixClusterHelper {
     }
 
     public static AtomixReplica createReplica(CamelContext camelContext, 
Address address, AtomixClusterConfiguration configuration) throws Exception {
-        AtomixReplica atomix = configuration.getReplica();
+        AtomixReplica atomix = configuration.getAtomix();
 
         if (atomix == null) {
-            if (configuration.getReplicaRef() != null) {
-                atomix = CamelContextHelper.mandatoryLookup(camelContext, 
configuration.getReplicaRef(), AtomixReplica.class);
-            } else {
-                final AtomixReplica.Builder atomixBuilder;
+            final AtomixReplica.Builder atomixBuilder;
 
-                String uri = configuration.getConfigurationUri();
-                if (ObjectHelper.isNotEmpty(uri)) {
-                    uri = camelContext.resolvePropertyPlaceholders(uri);
-                    try (InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
-                        Properties properties = new Properties();
-                        properties.load(is);
+            String uri = configuration.getConfigurationUri();
+            if (ObjectHelper.isNotEmpty(uri)) {
+                uri = camelContext.resolvePropertyPlaceholders(uri);
+                try (InputStream is = 
ResourceHelper.resolveMandatoryResourceAsInputStream(camelContext, uri)) {
+                    Properties properties = new Properties();
+                    properties.load(is);
 
-                        atomixBuilder = AtomixReplica.builder(address, 
properties);
-                    }
-                } else {
-                    atomixBuilder = AtomixReplica.builder(address);
+                    atomixBuilder = AtomixReplica.builder(address, properties);
                 }
+            } else {
+                atomixBuilder = AtomixReplica.builder(address);
+            }
 
-                Storage.Builder storageBuilder = Storage.builder();
-                ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), 
storageBuilder::withStorageLevel);
-                ObjectHelper.ifNotEmpty(configuration.getStoragePath(), 
storageBuilder::withDirectory);
-
-                atomixBuilder.withStorage(storageBuilder.build());
+            Storage.Builder storageBuilder = Storage.builder();
+            ObjectHelper.ifNotEmpty(configuration.getStorageLevel(), 
storageBuilder::withStorageLevel);
+            ObjectHelper.ifNotEmpty(configuration.getStoragePath(), 
storageBuilder::withDirectory);
 
-                if (configuration.getTransport() != null) {
-                    atomixBuilder.withTransport(
-                        
camelContext.getInjector().newInstance(configuration.getTransport())
-                    );
-                }
+            atomixBuilder.withStorage(storageBuilder.build());
 
-                atomix = atomixBuilder.build();
+            if (configuration.getTransport() != null) {
+                atomixBuilder.withTransport(
+                    
camelContext.getInjector().newInstance(configuration.getTransport())
+                );
+            }
+            if (configuration.getClientTransport() != null) {
+                atomixBuilder.withClientTransport(
+                    
camelContext.getInjector().newInstance(configuration.getClientTransport())
+                );
             }
+            if (configuration.getServerTransport() != null) {
+                atomixBuilder.withServerTransport(
+                    
camelContext.getInjector().newInstance(configuration.getServerTransport())
+                );
+            }
+
+            atomix = atomixBuilder.build();
         }
 
         return atomix;

http://git-wip-us.apache.org/repos/asf/camel/blob/08d2b0fe/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
----------------------------------------------------------------------
diff --git 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
index 6a6a750..bf49eab 100644
--- 
a/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
+++ 
b/components/camel-atomix/src/main/java/org/apache/camel/component/atomix/ha/AtomixClusterService.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.atomix.ha;
 
 import java.util.List;
 
-import io.atomix.Atomix;
 import io.atomix.AtomixReplica;
 import io.atomix.catalyst.transport.Address;
 import io.atomix.catalyst.transport.Transport;
@@ -109,20 +108,12 @@ public final class AtomixClusterService extends 
AbstractCamelClusterService<Atom
         configuration.setTransport(transport);
     }
 
-    public String getReplicaRef() {
-        return configuration.getReplicaRef();
+    public AtomixReplica getAtomix() {
+        return configuration.getAtomix();
     }
 
-    public void setReplicaRef(String clusterref) {
-        configuration.setReplicaRef(clusterref);
-    }
-
-    public Atomix getReplica() {
-        return configuration.getReplica();
-    }
-
-    public void setReplica(AtomixReplica replica) {
-        configuration.setReplica(replica);
+    public void setAtomix(AtomixReplica atomix) {
+        configuration.setAtomix(atomix);
     }
 
     public String getConfigurationUri() {
@@ -139,23 +130,18 @@ public final class AtomixClusterService extends 
AbstractCamelClusterService<Atom
 
     @Override
     protected void doStart() throws Exception {
-        // Assume that if addresses are provided the cluster needs be 
bootstrapped.
-        if (ObjectHelper.isNotEmpty(configuration.getNodes())) {
-            LOGGER.debug("Bootstrap cluster on address {} for nodes: {}", 
address, configuration.getNodes());
-            getOrCreateAtomix().bootstrap(configuration.getNodes()).join();
-            LOGGER.debug("Bootstrap cluster done");
-        }
+        // instantiate a new atomix replica
+        getOrCreateReplica();
 
         super.doStart();
     }
 
     @Override
     protected AtomixClusterView createView(String namespace) throws Exception {
-        return new AtomixClusterView(this, namespace, getOrCreateAtomix());
+        return new AtomixClusterView(this, namespace, getOrCreateReplica());
     }
 
-
-    private AtomixReplica getOrCreateAtomix() throws Exception {
+    private AtomixReplica getOrCreateReplica() throws Exception {
         if (atomix == null) {
             // Validate parameters
             ObjectHelper.notNull(getCamelContext(), "Camel Context");

Reply via email to