CAMEL-9445 camel-ignite: New Apache Ignite component. Initial work.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a695c5d3
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a695c5d3
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a695c5d3

Branch: refs/heads/master
Commit: a695c5d3656dcbac1e17b3109c7a6c340e8df7f0
Parents: ff8e1f1
Author: Raul Kripalani <ra...@apache.org>
Authored: Wed Dec 23 13:26:42 2015 +0000
Committer: Raul Kripalani <ra...@apache.org>
Committed: Wed Dec 23 15:18:57 2015 +0000

----------------------------------------------------------------------
 components/camel-ignite/pom.xml                 |  92 ++++++
 .../ignite/AbstractIgniteEndpoint.java          |  99 ++++++
 .../ignite/ClusterGroupExpression.java          |  30 ++
 .../ignite/ClusterGroupExpressions.java         |  85 +++++
 .../camel/component/ignite/IgniteComponent.java | 240 ++++++++++++++
 .../camel/component/ignite/IgniteConstants.java |  59 ++++
 .../camel/component/ignite/IgniteHelper.java    |  38 +++
 .../IgniteCacheContinuousQueryConsumer.java     | 155 +++++++++
 .../ignite/cache/IgniteCacheEndpoint.java       | 311 +++++++++++++++++++
 .../ignite/cache/IgniteCacheOperation.java      |  26 ++
 .../ignite/cache/IgniteCacheProducer.java       | 208 +++++++++++++
 .../ignite/compute/IgniteComputeEndpoint.java   | 155 +++++++++
 .../compute/IgniteComputeExecutionType.java     |  26 ++
 .../ignite/compute/IgniteComputeProducer.java   | 275 ++++++++++++++++
 .../ignite/events/IgniteEventsConsumer.java     |  92 ++++++
 .../ignite/events/IgniteEventsEndpoint.java     | 146 +++++++++
 .../ignite/idgen/IgniteIdGenEndpoint.java       | 116 +++++++
 .../ignite/idgen/IgniteIdGenOperation.java      |  26 ++
 .../ignite/idgen/IgniteIdGenProducer.java       |  83 +++++
 .../messaging/IgniteMessagingConsumer.java      |  78 +++++
 .../messaging/IgniteMessagingEndpoint.java      | 123 ++++++++
 .../messaging/IgniteMessagingProducer.java      |  69 ++++
 .../messaging/IgniteMessagingSendMode.java      |  23 ++
 .../ignite/queue/IgniteQueueEndpoint.java       | 175 +++++++++++
 .../ignite/queue/IgniteQueueOperation.java      |  26 ++
 .../ignite/queue/IgniteQueueProducer.java       | 170 ++++++++++
 .../component/ignite/set/IgniteSetEndpoint.java | 131 ++++++++
 .../ignite/set/IgniteSetOperation.java          |  26 ++
 .../component/ignite/set/IgniteSetProducer.java | 120 +++++++
 .../src/main/resources/META-INF/LICENSE.txt     | 203 ++++++++++++
 .../src/main/resources/META-INF/NOTICE.txt      |  11 +
 .../services/org/apache/camel/component/ignite  |  18 ++
 .../component/ignite/AbstractIgniteTest.java    |  63 ++++
 .../ignite/IgniteCacheContinuousQueryTest.java  | 281 +++++++++++++++++
 .../camel/component/ignite/IgniteCacheTest.java | 237 ++++++++++++++
 .../component/ignite/IgniteComputeTest.java     | 192 ++++++++++++
 .../component/ignite/IgniteEventsTest.java      | 168 ++++++++++
 .../camel/component/ignite/IgniteIdGenTest.java |  83 +++++
 .../component/ignite/IgniteMessagingTest.java   | 164 ++++++++++
 .../camel/component/ignite/IgniteQueueTest.java | 257 +++++++++++++++
 .../camel/component/ignite/IgniteSetTest.java   | 176 +++++++++++
 .../ignite/TestIgniteComputeResources.java      | 149 +++++++++
 .../src/test/resources/log4j.properties         |  37 +++
 components/pom.xml                              |   1 +
 parent/pom.xml                                  |   1 +
 45 files changed, 5244 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-ignite/pom.xml b/components/camel-ignite/pom.xml
new file mode 100644
index 0000000..53af3f7
--- /dev/null
+++ b/components/camel-ignite/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components</artifactId>
+    <version>2.17-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>camel-ignite</artifactId>
+  <packaging>bundle</packaging>
+  <name>Camel :: Ignite</name>
+  <description>Camel Ignite component</description>
+
+  <properties>
+    
<camel.osgi.export.pkg>org.apache.camel.component.ignite.*</camel.osgi.export.pkg>
+    
<camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=ignite</camel.osgi.export.service>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core</artifactId>
+    </dependency>
+
+    <!-- Apache Ignite dependency -->
+    <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+      <version>${ignite-version}</version>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-test-spring</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.truth</groupId>
+      <artifactId>truth</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.jayway.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
new file mode 100644
index 0000000..09791ca
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.ignite.Ignite;
+
+/**
+ * Base class for all Ignite endpoints. 
+ */
+@UriEndpoint(scheme = "ignite:...", title = "Ignite endpoints", syntax = 
"ignite:...", label = "nosql,cache,compute", producerOnly = true)
+public abstract class AbstractIgniteEndpoint extends DefaultEndpoint {
+
+    protected IgniteComponent component;
+
+    @UriParam(defaultValue = "true")
+    private boolean propagateIncomingBodyIfNoReturnValue = true;
+
+    @UriParam
+    private boolean treatCollectionsAsCacheObjects;
+
+    public AbstractIgniteEndpoint(String endpointUri, Component component) {
+        super(endpointUri, component);
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return false;
+    }
+
+    protected IgniteComponent igniteComponent() {
+        if (component == null) {
+            component = (IgniteComponent) getComponent();
+        }
+        return component;
+    }
+
+    protected Ignite ignite() {
+        return igniteComponent().getIgnite();
+    }
+
+    /**
+     * Gets whether to propagate the incoming body if the return type of the 
underlying 
+     * Ignite operation is void.
+     * 
+     * @return
+     */
+    public boolean isPropagateIncomingBodyIfNoReturnValue() {
+        return propagateIncomingBodyIfNoReturnValue;
+    }
+
+    /**
+     * Sets whether to propagate the incoming body if the return type of the 
underlying 
+     * Ignite operation is void.
+     * 
+     * @param propagateIncomingBodyIfNoReturnValue
+     */
+    public void setPropagateIncomingBodyIfNoReturnValue(boolean 
propagateIncomingBodyIfNoReturnValue) {
+        this.propagateIncomingBodyIfNoReturnValue = 
propagateIncomingBodyIfNoReturnValue;
+    }
+
+    /**
+     * Gets whether to treat Collections as cache objects or as Collections of 
items to 
+     * insert/update/compute, etc.
+     * 
+     * @return
+     */
+    public boolean isTreatCollectionsAsCacheObjects() {
+        return treatCollectionsAsCacheObjects;
+    }
+
+    /**
+     * Sets whether to treat Collections as cache objects or as Collections of 
items to 
+     * insert/update/compute, etc.
+     * 
+     * @param treatCollectionsAsCacheObjects
+     */
+    public void setTreatCollectionsAsCacheObjects(boolean 
treatCollectionsAsCacheObjects) {
+        this.treatCollectionsAsCacheObjects = treatCollectionsAsCacheObjects;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java
new file mode 100644
index 0000000..b9286a1c
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterGroup;
+
+/**
+ * Implement this interface to create an expression that returns a {@link 
ClusterGroup} from the
+ * {@link Ignite} instance that was passed in.
+ */
+public interface ClusterGroupExpression {
+
+    ClusterGroup getClusterGroup(Ignite ignite);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java
new file mode 100644
index 0000000..5ec660c
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterGroup;
+
+/**
+ * Convenient set of commonly used {@link ClusterGroupExpression}s.
+ */
+public final class ClusterGroupExpressions {
+    
+    public static final ClusterGroupExpression FOR_CLIENTS = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forClients();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_DAEMONS = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forDaemons();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_LOCAL = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forLocal();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_OLDEST = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forOldest();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_YOUNGEST = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forYoungest();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_RANDOM = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forRandom();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_REMOTES = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forRemotes();
+        }
+    };
+
+    public static final ClusterGroupExpression FOR_SERVERS = new 
ClusterGroupExpression() {
+        @Override
+        public ClusterGroup getClusterGroup(Ignite ignite) {
+            return ignite.cluster().forServers();
+        }
+    };
+
+    private ClusterGroupExpressions() { }
+    
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java
new file mode 100644
index 0000000..587d9be
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Map;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.component.ignite.cache.IgniteCacheEndpoint;
+import org.apache.camel.component.ignite.compute.IgniteComputeEndpoint;
+import org.apache.camel.component.ignite.events.IgniteEventsEndpoint;
+import org.apache.camel.component.ignite.idgen.IgniteIdGenEndpoint;
+import org.apache.camel.component.ignite.messaging.IgniteMessagingEndpoint;
+import org.apache.camel.component.ignite.queue.IgniteQueueEndpoint;
+import org.apache.camel.component.ignite.set.IgniteSetEndpoint;
+import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * The Ignite Component integrates Apache Camel with Apache Ignite, providing 
endpoints for the following functions:
+ * <ul>
+ * <li>Cache operations.</li>
+ * <li>Cluster computation.</li>
+ * <li>Messaging.</li>
+ * <li>Eventing.</li>
+ * <li>Id Generation.</li>
+ * <li>Set operations.</li>
+ * <li>Queue operations.</li>
+ * </ul>
+ */
+public class IgniteComponent extends UriEndpointComponent {
+
+    /**
+     * Modes of managing the underlying {@link Ignite} instance. 
+     */
+    public enum IgniteLifecycleMode {
+        USER_MANAGED, COMPONENT_MANAGED
+    }
+
+    /** Ignite configuration. */
+    private IgniteConfiguration igniteConfiguration;
+
+    /** Resource from where to load configuration. */
+    private Object configurationResource;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
+    /** How the Ignite lifecycle is managed. */
+    private IgniteLifecycleMode lifecycleMode = 
IgniteLifecycleMode.COMPONENT_MANAGED;
+
+    public IgniteComponent() {
+        super(AbstractIgniteEndpoint.class);
+    }
+
+    public static IgniteComponent fromIgnite(Ignite ignite) {
+        IgniteComponent answer = new IgniteComponent();
+        answer.setIgnite(ignite);
+        return answer;
+    }
+
+    public static IgniteComponent fromConfiguration(IgniteConfiguration 
configuration) {
+        IgniteComponent answer = new IgniteComponent();
+        answer.setIgniteConfiguration(configuration);
+        return answer;
+    }
+
+    public static IgniteComponent fromInputStream(InputStream inputStream) {
+        IgniteComponent answer = new IgniteComponent();
+        answer.setConfigurationResource(inputStream);
+        return answer;
+    }
+
+    public static IgniteComponent fromUrl(URL url) {
+        IgniteComponent answer = new IgniteComponent();
+        answer.setConfigurationResource(url);
+        return answer;
+    }
+
+    public static IgniteComponent fromLocation(String location) {
+        IgniteComponent answer = new IgniteComponent();
+        answer.setConfigurationResource(location);
+        return answer;
+    }
+
+    @Override
+    protected Endpoint createEndpoint(String uri, String remaining, 
Map<String, Object> parameters) throws Exception {
+        ObjectHelper.notNull(getCamelContext(), "Camel Context");
+
+        AbstractIgniteEndpoint answer = null;
+        URI remainingUri = new URI(URISupport.normalizeUri(remaining));
+        String scheme = remainingUri.getScheme();
+
+        switch (scheme) {
+        case "cache":
+            answer = new IgniteCacheEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+        case "compute":
+            answer = new IgniteComputeEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+        case "messaging":
+            answer = new IgniteMessagingEndpoint(uri, remainingUri, 
parameters, this);
+            break;
+        case "events":
+            answer = new IgniteEventsEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+        case "set":
+            answer = new IgniteSetEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+        case "idgen":
+            answer = new IgniteIdGenEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+        case "queue":
+            answer = new IgniteQueueEndpoint(uri, remainingUri, parameters, 
this);
+            break;
+            
+        default:
+            throw new MalformedURLException("An invalid Ignite endpoint URI 
was provided. Please check that "
+                    + "it starts with:" + " 
ignite:[cache/compute/messaging/...]:...");
+        }
+
+        setProperties(answer, parameters);
+
+        return answer;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (lifecycleMode == IgniteLifecycleMode.USER_MANAGED) {
+            return;
+        }
+
+        // Try to load the configuration from the resource.
+        if (configurationResource != null) {
+            if (configurationResource instanceof URL) {
+                ignite = Ignition.start((URL) configurationResource);
+            } else if (configurationResource instanceof InputStream) {
+                ignite = Ignition.start((InputStream) configurationResource);
+            } else if (configurationResource instanceof String) {
+                ignite = Ignition.start((String) configurationResource);
+            } else {
+                throw new IllegalStateException("An unsupported configuration 
resource was provided to the Ignite component. " + "Supported types are: URL, 
InputStream, String.");
+            }
+        } else if (igniteConfiguration != null) {
+            ignite = Ignition.start(igniteConfiguration);
+        } else {
+            throw new IllegalStateException("No configuration resource or 
IgniteConfiguration was provided to the Ignite component.");
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if (lifecycleMode == IgniteLifecycleMode.USER_MANAGED) {
+            return;
+        }
+
+        ignite.close();
+    }
+
+    /**
+     * Returns the {@link Ignite} instance.
+     * 
+     * @return
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
+     * Sets the {@link Ignite} instance.
+     * 
+     * @param ignite
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
+     * Gets the resource from where to load the configuration. It can be a: 
{@link URI}, {@link String} (URI) 
+     * or an {@link InputStream}.
+     * 
+     * @return
+     */
+    public Object getConfigurationResource() {
+        return configurationResource;
+    }
+
+    /**
+     * Sets the resource from where to load the configuration. It can be a: 
{@link URI}, {@link String} (URI) 
+     * or an {@link InputStream}.
+     * 
+     * @param configurationResource
+     */
+    public void setConfigurationResource(Object configurationResource) {
+        this.configurationResource = configurationResource;
+    }
+
+    /**
+     * Gets the {@link IgniteConfiguration} if the user set it explicitly.
+     * @return
+     */
+    public IgniteConfiguration getIgniteConfiguration() {
+        return igniteConfiguration;
+    }
+
+    /**
+     * Allows the user to set a programmatic {@link IgniteConfiguration}.
+     * 
+     * @param igniteConfiguration
+     */
+    public void setIgniteConfiguration(IgniteConfiguration 
igniteConfiguration) {
+        this.igniteConfiguration = igniteConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
new file mode 100644
index 0000000..e9b9256
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+/**
+ * Ignite Component constants.
+ */
+public final class IgniteConstants {
+
+    // Ignite Cache.
+    public static final String IGNITE_CACHE_KEY = "CamelIgniteCacheKey";
+    public static final String IGNITE_CACHE_QUERY = "CamelIgniteCacheQuery";
+    public static final String IGNITE_CACHE_OPERATION = 
"CamelIgniteCacheOperation";
+    public static final String IGNITE_CACHE_PEEK_MODE = 
"CamelIgniteCachePeekMode";
+    public static final String IGNITE_CACHE_EVENT_TYPE = 
"CamelIgniteCacheEventType";
+    public static final String IGNITE_CACHE_NAME = "CamelIgniteCacheName";
+    public static final String IGNITE_CACHE_OLD_VALUE = 
"CamelIgniteCacheOldValue";
+
+    // Ignite Messaging.
+    public static final String IGNITE_MESSAGING_TOPIC = 
"CamelIgniteMessagingTopic";
+    public static final String IGNITE_MESSAGING_UUID = 
"CamelIgniteMessagingUUID";
+
+    // Ignite Compute.
+    public static final String IGNITE_COMPUTE_EXECUTION_TYPE = 
"CamelIgniteComputeExecutionType";
+    public static final String IGNITE_COMPUTE_PARAMS = 
"CamelIgniteComputeParameters";
+    public static final String IGNITE_COMPUTE_REDUCER = 
"CamelIgniteComputeReducer";
+    public static final String IGNITE_COMPUTE_AFFINITY_CACHE_NAME = 
"CamelIgniteComputeAffinityCacheName";
+    public static final String IGNITE_COMPUTE_AFFINITY_KEY = 
"CamelIgniteComputeAffinityKey";
+
+    // Ignite Sets.
+    public static final String IGNITE_SETS_OPERATION = 
"CamelIgniteSetsOperation";
+    public static final String IGNITE_SETS_OPERATION_RESULT = 
"CamelIgniteSetsOperationResult";
+
+    // Ignite ID Gen.
+    public static final String IGNITE_IDGEN_OPERATION = 
"CamelIgniteIdGenOperation";
+
+    // Ignite Queues.
+    public static final String IGNITE_QUEUE_OPERATION = 
"CamelIgniteQueueOperation";
+    public static final String IGNITE_QUEUE_MAX_ELEMENTS = 
"CamelIgniteQueueMaxElements";
+    public static final String IGNITE_QUEUE_TRANSFERRED_COUNT = 
"CamelIgniteQueueTransferredCount";
+    public static final String IGNITE_QUEUE_TIMEOUT_MILLIS = 
"CamelIgniteQueueTimeoutMillis";
+    
+    private IgniteConstants() { }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java
new file mode 100644
index 0000000..05099f4
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite;
+
+import org.apache.camel.Message;
+
+/**
+ * Helper methods for the Ignite component.
+ */
+public final class IgniteHelper {
+
+    private IgniteHelper() {
+        
+    }
+
+    public static void maybePropagateIncomingBody(AbstractIgniteEndpoint 
endpoint, Message in, Message out) {
+        if (!endpoint.isPropagateIncomingBodyIfNoReturnValue()) {
+            return;
+        }
+
+        out.setBody(in.getBody());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
new file mode 100644
index 0000000..e4c7302
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.cache;
+
+import javax.cache.Cache.Entry;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.component.ignite.IgniteConstants;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A consumer that generates {@link Exchange}s for items received from a 
continuous query.
+ */
+public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IgniteCacheContinuousQueryConsumer.class);
+
+    private IgniteCacheEndpoint endpoint;
+
+    private IgniteCache<Object, Object> cache;
+
+    private QueryCursor<Entry<Object, Object>> cursor;
+
+    public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint endpoint, 
Processor processor, IgniteCache<Object, Object> cache) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.cache = cache;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        launchContinuousQuery();
+
+        maybeFireExistingQueryResults();
+    }
+
+    private void maybeFireExistingQueryResults() {
+        if (!endpoint.isFireExistingQueryResults()) {
+            LOG.info(String.format("Skipping existing cache results for cache 
name = %s.", endpoint.getCacheName()));
+            return;
+        }
+
+        LOG.info(String.format("Processing existing cache results for cache 
name = %s.", endpoint.getCacheName()));
+
+        for (Entry<Object, Object> entry : cursor) {
+            Exchange exchange = createExchange(entry.getValue());
+            exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, 
entry.getKey());
+            getAsyncProcessor().process(createExchange(entry), new 
AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    // do nothing
+                }
+            });
+        }
+    }
+
+    private void launchContinuousQuery() {
+        ContinuousQuery<Object, Object> continuousQuery = new 
ContinuousQuery<>();
+
+        if (endpoint.getQuery() != null) {
+            continuousQuery.setInitialQuery(endpoint.getQuery());
+        }
+
+        if (endpoint.getRemoteFilter() != null) {
+            continuousQuery.setRemoteFilter(endpoint.getRemoteFilter());
+        }
+
+        continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
+            @Override
+            public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? 
extends Object>> events) throws CacheEntryListenerException {
+                if (!endpoint.isOneExchangePerUpdate()) {
+                    fireGroupedExchange(events);
+                    return;
+                }
+
+                for (CacheEntryEvent<? extends Object, ? extends Object> entry 
: events) {
+                    fireSingleExchange(entry);
+                }
+            }
+        });
+
+        continuousQuery.setAutoUnsubscribe(endpoint.isAutoUnsubscribe());
+        continuousQuery.setPageSize(endpoint.getPageSize());
+        continuousQuery.setTimeInterval(endpoint.getTimeInterval());
+
+        cursor = cache.query(continuousQuery);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        cursor.close();
+    }
+
+    private void fireSingleExchange(CacheEntryEvent<? extends Object, ? 
extends Object> entry) {
+        Exchange exchange = createExchange(entry.getValue());
+        exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE, 
entry.getEventType());
+        exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE, 
entry.getOldValue());
+        exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, 
entry.getKey());
+        getAsyncProcessor().process(exchange, new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                // do nothing
+            }
+        });
+    }
+
+    private void fireGroupedExchange(Iterable<CacheEntryEvent<? extends 
Object, ? extends Object>> events) {
+        Exchange exchange = createExchange(events);
+        getAsyncProcessor().process(exchange, new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                // do nothing
+            }
+        });
+    }
+
+    private Exchange createExchange(Object payload) {
+        Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+        Message in = exchange.getIn();
+        in.setBody(payload);
+        in.setHeader(IgniteConstants.IGNITE_CACHE_NAME, 
endpoint.getCacheName());
+        return exchange;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
new file mode 100644
index 0000000..2b22409
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.cache;
+
+import java.net.URI;
+import java.util.Map;
+
+import javax.cache.Cache.Entry;
+
+import org.apache.camel.CamelException;
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.ignite.AbstractIgniteEndpoint;
+import org.apache.camel.component.ignite.IgniteComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.Query;
+
+/**
+ * Ignite Cache endpoint.
+ */
+@UriEndpoint(scheme = "ignite:cache", title = "Ignite Cache", syntax = 
"ignite:cache:[cacheName]", label = "nosql,cache,compute", 
+    consumerClass = IgniteCacheContinuousQueryConsumer.class)
+public class IgniteCacheEndpoint extends AbstractIgniteEndpoint {
+
+    @UriParam
+    @Metadata(required = "true")
+    private String cacheName;
+
+    @UriParam
+    private IgniteCacheOperation operation;
+
+    @UriParam
+    private boolean failIfInexistentCache;
+
+    @UriParam
+    private CachePeekMode cachePeekMode = CachePeekMode.ALL;
+
+    @UriParam
+    private Query<Entry<Object, Object>> query;
+
+    @UriParam
+    private CacheEntryEventSerializableFilter<Object, Object> remoteFilter;
+
+    @UriParam
+    private boolean oneExchangePerUpdate = true;
+
+    @UriParam
+    private boolean fireExistingQueryResults;
+
+    @UriParam
+    private boolean autoUnsubscribe = ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE;
+
+    @UriParam
+    private int pageSize = ContinuousQuery.DFLT_PAGE_SIZE;
+
+    @UriParam
+    private long timeInterval = ContinuousQuery.DFLT_TIME_INTERVAL;
+
+    public IgniteCacheEndpoint(String endpointUri, URI remainingUri, 
Map<String, Object> parameters, IgniteComponent igniteComponent) {
+        super(endpointUri, igniteComponent);
+        cacheName = remainingUri.getHost();
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new IgniteCacheProducer(this, obtainCache());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new IgniteCacheContinuousQueryConsumer(this, processor, 
obtainCache());
+    }
+
+    private IgniteCache<Object, Object> obtainCache() throws CamelException {
+        IgniteCache<Object, Object> cache = ignite().cache(cacheName);
+        if (cache == null) {
+            if (failIfInexistentCache) {
+                throw new CamelException(String.format("Ignite cache %s 
doesn't exist, and failIfInexistentCache is true", cacheName));
+            }
+            cache = ignite().createCache(cacheName);
+        }
+
+        return cache;
+    }
+
+    /**
+     * Gets the cache name.
+     * 
+     * @return
+     */
+    public String getCacheName() {
+        return cacheName;
+    }
+
+    /**
+     * Sets the cache name.
+     * 
+     * @param cacheName
+     */
+    public void setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+    }
+
+    /**
+     * Gets the cache operation to invoke.
+     * 
+     * @return
+     */
+    public IgniteCacheOperation getOperation() {
+        return operation;
+    }
+
+    /**
+     * Sets the cache operation to invoke.
+     * 
+     * @param operation
+     */
+    public void setOperation(IgniteCacheOperation operation) {
+        this.operation = operation;
+    }
+
+    /**
+     * Gets whether to fail the initialization if the cache doesn't exist.
+     * 
+     * @return
+     */
+    public boolean isFailIfInexistentCache() {
+        return failIfInexistentCache;
+    }
+
+    /**
+     * Sets whether to fail the initialization if the cache doesn't exist.
+     * 
+     * @param failIfInexistentCache
+     */
+    public void setFailIfInexistentCache(boolean failIfInexistentCache) {
+        this.failIfInexistentCache = failIfInexistentCache;
+    }
+
+    /**
+     * Gets the {@link CachePeekMode}, only needed for operations that require 
it ({@link IgniteCacheOperation#SIZE}).
+     * 
+     * @return
+     */
+    public CachePeekMode getCachePeekMode() {
+        return cachePeekMode;
+    }
+
+    /**
+     * Sets the {@link CachePeekMode}, only needed for operations that require 
it ({@link IgniteCacheOperation#SIZE}).
+     * 
+     * @param cachePeekMode
+     */
+    public void setCachePeekMode(CachePeekMode cachePeekMode) {
+        this.cachePeekMode = cachePeekMode;
+    }
+
+    /**
+     * Gets the query to execute, only needed for operations that require it, 
+     * and for the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public Query<Entry<Object, Object>> getQuery() {
+        return query;
+    }
+
+    /**
+     * Sets the query to execute, only needed for operations that require it,
+     * and for the Continuous Query Consumer.
+     * 
+     * @param query
+     */
+    public void setQuery(Query<Entry<Object, Object>> query) {
+        this.query = query;
+    }
+
+    /**
+     * Gets the remote filter, only used by the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public CacheEntryEventSerializableFilter<Object, Object> getRemoteFilter() 
{
+        return remoteFilter;
+    }
+
+    /**
+     * Sets the remote filter, only used by the Continuous Query Consumer.
+     * 
+     * @param remoteFilter
+     */
+    public void setRemoteFilter(CacheEntryEventSerializableFilter<Object, 
Object> remoteFilter) {
+        this.remoteFilter = remoteFilter;
+    }
+
+    /**
+     * Gets whether to pack each update in an individual Exchange, even if 
multiple updates are
+     * received in one batch. Only used by the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public boolean isOneExchangePerUpdate() {
+        return oneExchangePerUpdate;
+    }
+
+    /**
+     * Sets whether to pack each update in an individual Exchange, even if 
multiple updates are
+     * received in one batch. Only used by the Continuous Query Consumer.
+     * 
+     * @param oneExchangePerUpdate
+     */
+    public void setOneExchangePerUpdate(boolean oneExchangePerUpdate) {
+        this.oneExchangePerUpdate = oneExchangePerUpdate;
+    }
+
+    /**
+     * Gets whether auto unsubscribe is enabled in the Continuous Query 
Consumer.
+     * 
+     * @return
+     */
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * Sets whether auto unsubscribe is enabled in the Continuous Query 
Consumer.
+     * 
+     * @param autoUnsubscribe
+     */
+    public void setAutoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * Gets the page size. Only used by the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public int getPageSize() {
+        return pageSize;
+    }
+
+    /**
+     * Sets the page size. Only used by the Continuous Query Consumer.
+     * 
+     * @param pageSize
+     */
+    public void setPageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * Gets whether to process existing results that match the query. Used on 
initialization of 
+     * the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public boolean isFireExistingQueryResults() {
+        return fireExistingQueryResults;
+    }
+
+    /**
+     * Sets whether to process existing results that match the query. Used on 
initialization of 
+     * the Continuous Query Consumer.
+     * 
+     * @param fireExistingQueryResults
+     */
+    public void setFireExistingQueryResults(boolean fireExistingQueryResults) {
+        this.fireExistingQueryResults = fireExistingQueryResults;
+    }
+
+    /**
+     * Gets the time interval for the Continuous Query Consumer.
+     * 
+     * @return
+     */
+    public long getTimeInterval() {
+        return timeInterval;
+    }
+
+    /**
+     * Sets the time interval for the Continuous Query Consumer.
+     * 
+     * @param timeInterval
+     */
+    public void setTimeInterval(long timeInterval) {
+        this.timeInterval = timeInterval;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
new file mode 100644
index 0000000..636df73
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.cache;
+
+/**
+ * Enumeration of Ignite Cache operations.
+ */
+public enum IgniteCacheOperation {
+
+    GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
new file mode 100644
index 0000000..7ad3f86
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.cache;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.ignite.IgniteConstants;
+import org.apache.camel.component.ignite.IgniteHelper;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.MessageHelper;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+
+/**
+ * Ignite Cache producer.
+ */
+public class IgniteCacheProducer extends DefaultAsyncProducer {
+
+    private IgniteCache<Object, Object> cache;
+    private IgniteCacheEndpoint endpoint;
+
+    public IgniteCacheProducer(IgniteCacheEndpoint endpoint, 
IgniteCache<Object, Object> igniteCache) {
+        super(endpoint);
+        this.endpoint = endpoint;
+        this.cache = igniteCache;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Message in = exchange.getIn();
+        Message out = exchange.getOut();
+        MessageHelper.copyHeaders(exchange.getIn(), out, true);
+
+        switch (cacheOperationFor(exchange)) {
+
+        case GET:
+            doGet(in, out);
+            break;
+
+        case PUT:
+            doPut(in, out);
+            break;
+
+        case QUERY:
+            doQuery(in, out, exchange);
+            break;
+
+        case REMOVE:
+            doRemove(in, out);
+            break;
+
+        case CLEAR:
+            doClear(in, out);
+            break;
+
+        case SIZE:
+            doSize(in, out);
+            break;
+
+        case REBALANCE:
+            doRebalance(in, out);
+            break;
+
+        default:
+            break;
+        }
+
+        return true;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doGet(Message in, Message out) {
+        Object cacheKey = cacheKey(in);
+
+        if (cacheKey instanceof Set && 
!endpoint.isTreatCollectionsAsCacheObjects()) {
+            out.setBody(cache.getAll((Set<Object>) cacheKey));
+        } else {
+            out.setBody(cache.get(cacheKey));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doPut(Message in, Message out) {
+        Map<Object, Object> map = in.getBody(Map.class);
+
+        if (map != null) {
+            cache.putAll(map);
+            return;
+        }
+
+        Object cacheKey = in.getHeader(IgniteConstants.IGNITE_CACHE_KEY);
+
+        if (cacheKey == null) {
+            throw new RuntimeCamelException("Cache PUT operation requires the 
cache key in the CamelIgniteCacheKey header, " + "or a payload of type Map.");
+        }
+
+        cache.put(cacheKey, in.getBody());
+
+        IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doQuery(Message in, Message out, Exchange exchange) {
+        Query<Object> query = in.getHeader(IgniteConstants.IGNITE_CACHE_QUERY, 
Query.class);
+
+        if (query == null) {
+            try {
+                query = in.getMandatoryBody(Query.class);
+            } catch (InvalidPayloadException e) {
+                exchange.setException(e);
+                return;
+            }
+        }
+
+        final QueryCursor<Object> cursor = cache.query(query);
+
+        out.setBody(cursor.iterator());
+
+        exchange.addOnCompletion(new Synchronization() {
+            @Override
+            public void onFailure(Exchange exchange) {
+                cursor.close();
+            }
+
+            @Override
+            public void onComplete(Exchange exchange) {
+                cursor.close();
+            }
+        });
+
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doRemove(Message in, Message out) {
+        Object cacheKey = cacheKey(in);
+
+        if (cacheKey instanceof Set && 
!endpoint.isTreatCollectionsAsCacheObjects()) {
+            cache.removeAll((Set<Object>) cacheKey);
+        } else {
+            cache.remove(cacheKey);
+        }
+
+        IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
+    }
+
+    private void doClear(Message in, Message out) {
+        cache.removeAll();
+
+        IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
+    }
+
+    private void doRebalance(Message in, Message out) {
+        cache.rebalance().get();
+
+        IgniteHelper.maybePropagateIncomingBody(endpoint, in, out);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doSize(Message in, Message out) {
+        Object peekMode = in.getHeader(IgniteConstants.IGNITE_CACHE_PEEK_MODE, 
endpoint.getCachePeekMode());
+
+        Integer result = null;
+        if (peekMode instanceof Collection) {
+            result = cache.size(((Collection<Object>) peekMode).toArray(new 
CachePeekMode[0]));
+        } else if (peekMode instanceof CachePeekMode) {
+            result = cache.size((CachePeekMode) peekMode);
+        }
+
+        out.setBody(result);
+    }
+
+    private Object cacheKey(Message msg) {
+        Object cacheKey = msg.getHeader(IgniteConstants.IGNITE_CACHE_KEY);
+        if (cacheKey == null) {
+            cacheKey = msg.getBody();
+        }
+        return cacheKey;
+    }
+
+    private IgniteCacheOperation cacheOperationFor(Exchange exchange) {
+        return 
exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_OPERATION, 
endpoint.getOperation(), IgniteCacheOperation.class);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
new file mode 100644
index 0000000..1dc0663
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.compute;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.ignite.AbstractIgniteEndpoint;
+import org.apache.camel.component.ignite.ClusterGroupExpression;
+import org.apache.camel.component.ignite.IgniteComponent;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+
+/**
+ * Ignite Compute endpoint.
+ */
+@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = 
"ignite:compute:endpointId", label = "nosql,cache,compute", producerOnly = true)
+public class IgniteComputeEndpoint extends AbstractIgniteEndpoint {
+
+    @UriParam
+    private ClusterGroupExpression clusterGroupExpression;
+
+    @UriParam
+    private IgniteComputeExecutionType executionType;
+
+    @UriParam
+    private String taskName;
+
+    @UriParam
+    private String computeName;
+
+    @UriParam
+    private Long timeoutMillis;
+
+    public IgniteComputeEndpoint(String uri, URI remainingUri, Map<String, 
Object> parameters, IgniteComponent igniteComponent) throws 
ClassNotFoundException {
+        super(uri, igniteComponent);
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new IgniteComputeProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        throw new UnsupportedOperationException("The Ignite Compute endpoint 
does not support consumers.");
+    }
+
+    public IgniteCompute createIgniteCompute() {
+        Ignite ignite = ignite();
+        IgniteCompute compute = clusterGroupExpression == null ? 
ignite.compute() : 
ignite.compute(clusterGroupExpression.getClusterGroup(ignite));
+
+        if (computeName != null) {
+            compute = compute.withName(computeName);
+        }
+
+        if (timeoutMillis != null) {
+            compute = compute.withTimeout(timeoutMillis);
+        }
+
+        return compute;
+    }
+
+    /**
+     * Gets the execution type of this producer.
+     * 
+     * @return
+     */
+    public IgniteComputeExecutionType getExecutionType() {
+        return executionType;
+    }
+
+    /**
+     * Sets the execution type of this producer.
+     * 
+     * @param executionType
+     */
+    public void setExecutionType(IgniteComputeExecutionType executionType) {
+        this.executionType = executionType;
+    }
+
+    /**
+     * Gets the task name, only applicable if using the {@link 
IgniteComputeExecutionType#EXECUTE} execution type.
+     * 
+     * @return
+     */
+    public String getTaskName() {
+        return taskName;
+    }
+
+    /**
+     * Sets the task name, only applicable if using the {@link 
IgniteComputeExecutionType#EXECUTE} execution type.
+     * 
+     * @param taskName
+     */
+    public void setTaskName(String taskName) {
+        this.taskName = taskName;
+    }
+
+    /**
+     * Gets the name of the compute job, which will be set via {@link 
IgniteCompute#withName(String)}. 
+     * 
+     * @return
+     */
+    public String getComputeName() {
+        return computeName;
+    }
+
+    /**
+     * Sets the name of the compute job, which will be set via {@link 
IgniteCompute#withName(String)}.
+     * 
+     * @param computeName
+     */
+    public void setComputeName(String computeName) {
+        this.computeName = computeName;
+    }
+
+    /**
+     * Gets the timeout interval for triggered jobs, in milliseconds, which 
will be set via {@link IgniteCompute#withTimeout(long)}.
+     * 
+     * @return
+     */
+    public Long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    /**
+     * Sets the timeout interval for triggered jobs, in milliseconds, which 
will be set via {@link IgniteCompute#withTimeout(long)}.
+     * 
+     * @param timeoutMillis
+     */
+    public void setTimeoutMillis(Long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java
new file mode 100644
index 0000000..f537a95
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.compute;
+
+/**
+ * Enumeration of execution type for the Ignite Compute producer.
+ */
+public enum IgniteComputeExecutionType {
+
+    CALL, BROADCAST, APPLY, EXECUTE, RUN, AFFINITY_CALL, AFFINITY_RUN
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
new file mode 100644
index 0000000..d550a74
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.compute;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.component.ignite.IgniteConstants;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.util.MessageHelper;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/**
+ * Ignite Compute producer.
+ */
+public class IgniteComputeProducer extends DefaultAsyncProducer {
+
+    private IgniteComputeEndpoint endpoint;
+
+    public IgniteComputeProducer(IgniteComputeEndpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        IgniteCompute compute = endpoint.createIgniteCompute().withAsync();
+
+        try {
+            switch (executionTypeFor(exchange)) {
+            
+            case CALL:
+                doCall(exchange, callback, compute);
+                break;
+                
+            case BROADCAST:
+                doBroadcast(exchange, callback, compute);
+                break;
+                
+            case EXECUTE:
+                doExecute(exchange, callback, compute);
+                break;
+                
+            case RUN:
+                doRun(exchange, callback, compute);
+                break;
+                
+            case APPLY:
+                doApply(exchange, callback, compute);
+                break;
+                
+            case AFFINITY_CALL:
+                doAffinityCall(exchange, callback, compute);
+                break;
+                
+            case AFFINITY_RUN:
+                doAffinityRun(exchange, callback, compute);
+                break;
+                
+            default:
+                exchange.setException(new 
UnsupportedOperationException("Operation not supported by Ignite Compute 
producer."));
+                return true;
+            }
+
+            compute.future().listen(IgniteInCamelClosure.create(exchange, 
callback));
+
+        } catch (Exception e) {
+            exchange.setException(e);
+            return true;
+        }
+
+        return false;
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void doCall(final Exchange exchange, final AsyncCallback callback, 
IgniteCompute compute) throws Exception {
+        Object job = exchange.getIn().getBody();
+        IgniteReducer<Object, Object> reducer = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, 
IgniteReducer.class);
+
+        if (Collection.class.isAssignableFrom(job.getClass())) {
+            Collection<?> col = (Collection<?>) job;
+            TypeConverter tc = exchange.getContext().getTypeConverter();
+            Collection<IgniteCallable<?>> callables = new 
ArrayList<>(col.size());
+            for (Object o : col) {
+                callables.add(tc.mandatoryConvertTo(IgniteCallable.class, o));
+            }
+            if (reducer != null) {
+                compute.call((Collection) callables, reducer);
+            } else {
+                compute.call((Collection) callables);
+            }
+        } else if (IgniteCallable.class.isAssignableFrom(job.getClass())) {
+            compute.call((IgniteCallable<Object>) job);
+        } else {
+            throw new RuntimeCamelException(String.format(
+                    "Ignite Compute endpoint with CALL executionType is only " 
+ "supported for IgniteCallable payloads, or collections of them. The payload 
type was: %s.", job.getClass().getName()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doBroadcast(final Exchange exchange, final AsyncCallback 
callback, IgniteCompute compute) throws Exception {
+        Object job = exchange.getIn().getBody();
+
+        if (IgniteCallable.class.isAssignableFrom(job.getClass())) {
+            compute.broadcast((IgniteCallable<?>) job);
+        } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) {
+            compute.broadcast((IgniteRunnable) job);
+        } else if (IgniteClosure.class.isAssignableFrom(job.getClass())) {
+            compute.broadcast((IgniteClosure<Object, Object>) job, 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS));
+        } else {
+            throw new RuntimeCamelException(
+                    String.format("Ignite Compute endpoint with BROADCAST 
executionType is only " + "supported for IgniteCallable, IgniteRunnable or 
IgniteClosure payloads. The payload type was: %s.",
+                            job.getClass().getName()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doExecute(final Exchange exchange, final AsyncCallback 
callback, IgniteCompute compute) throws Exception {
+        Object job = exchange.getIn().getBody();
+        Object params = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS);
+
+        if (job instanceof Class && 
ComputeTask.class.isAssignableFrom((Class<?>) job)) {
+            Class<? extends ComputeTask<Object, Object>> task = (Class<? 
extends ComputeTask<Object, Object>>) job;
+            compute.execute(task, params);
+        } else if (ComputeTask.class.isAssignableFrom(job.getClass())) {
+            compute.execute((ComputeTask<Object, Object>) job, params);
+        } else if (endpoint.getTaskName() != null) {
+            if (exchange.getIn().getBody() != null) {
+                params = exchange.getIn().getBody();
+            }
+            compute.execute(endpoint.getTaskName(), params);
+        } else {
+            throw new RuntimeCamelException(String.format("Ignite Compute 
endpoint with EXECUTE executionType is only "
+                    + "supported for ComputeTask payloads, Class<ComputeTask> 
or any payload in conjunction with the " + "task name option. The payload type 
was: %s.", job.getClass().getName()));
+        }
+    }
+
+    private void doRun(final Exchange exchange, final AsyncCallback callback, 
IgniteCompute compute) throws Exception {
+        Object job = exchange.getIn().getBody();
+
+        if (Collection.class.isAssignableFrom(job.getClass())) {
+            Collection<?> col = (Collection<?>) job;
+            TypeConverter tc = exchange.getContext().getTypeConverter();
+            Collection<IgniteRunnable> runnables = new ArrayList<>(col.size());
+            for (Object o : col) {
+                runnables.add(tc.mandatoryConvertTo(IgniteRunnable.class, o));
+            }
+            compute.run(runnables);
+        } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) {
+            compute.run((IgniteRunnable) job);
+        } else {
+            throw new RuntimeCamelException(String.format(
+                    "Ignite Compute endpoint with RUN executionType is only " 
+ "supported for IgniteRunnable payloads, or collections of them. The payload 
type was: %s.", job.getClass().getName()));
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private  <T, R1, R2> void doApply(final Exchange exchange, final 
AsyncCallback callback, IgniteCompute compute) throws Exception {
+        IgniteClosure<T, R1> job = 
exchange.getIn().getBody(IgniteClosure.class);
+        T params = (T) 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS);
+
+        if (job == null || params == null) {
+            throw new RuntimeCamelException(
+                    String.format("Ignite Compute endpoint with APPLY 
executionType is only " + "supported for IgniteClosure payloads with 
parameters. The payload type was: %s.",
+                            exchange.getIn().getBody().getClass().getName()));
+        }
+
+        IgniteReducer<R1, R2> reducer = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, 
IgniteReducer.class);
+
+        if (Collection.class.isAssignableFrom(params.getClass())) {
+            Collection<T> colParams = (Collection<T>) params;
+            if (reducer == null) {
+                compute.apply(job, colParams);
+            } else {
+                compute.apply(job, colParams, reducer);
+            }
+        } else {
+            compute.apply(job, params);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doAffinityCall(final Exchange exchange, final AsyncCallback 
callback, IgniteCompute compute) throws Exception {
+        IgniteCallable<Object> job = 
exchange.getIn().getBody(IgniteCallable.class);
+        String affinityCache = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, 
String.class);
+        Object affinityKey = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, 
Object.class);
+
+        if (job == null || affinityCache == null || affinityKey == null) {
+            throw new RuntimeCamelException(String.format(
+                    "Ignite Compute endpoint with AFFINITY_CALL executionType 
is only " + "supported for IgniteCallable payloads, along with an affinity 
cache and key. The payload type was: %s.",
+                    exchange.getIn().getBody().getClass().getName()));
+        }
+
+        compute.affinityCall(affinityCache, affinityKey, job);
+    }
+
+    private void doAffinityRun(final Exchange exchange, final AsyncCallback 
callback, IgniteCompute compute) throws Exception {
+        IgniteRunnable job = exchange.getIn().getBody(IgniteRunnable.class);
+        String affinityCache = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, 
String.class);
+        Object affinityKey = 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, 
Object.class);
+
+        if (job == null || affinityCache == null || affinityKey == null) {
+            throw new RuntimeCamelException(String.format(
+                    "Ignite Compute endpoint with AFFINITY_RUN executionType 
is only " + "supported for IgniteRunnable payloads, along with an affinity 
cache and key. The payload type was: %s.",
+                    exchange.getIn().getBody().getClass().getName()));
+        }
+
+        compute.affinityRun(affinityCache, affinityKey, job);
+    }
+
+    private IgniteComputeExecutionType executionTypeFor(Exchange exchange) {
+        return 
exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_EXECUTION_TYPE, 
endpoint.getExecutionType(), IgniteComputeExecutionType.class);
+    }
+
+    private static class IgniteInCamelClosure implements 
IgniteInClosure<IgniteFuture<Object>> {
+        private static final long serialVersionUID = 7486030906412223384L;
+
+        private Exchange exchange;
+        private AsyncCallback callback;
+
+        private static IgniteInCamelClosure create(Exchange exchange, 
AsyncCallback callback) {
+            IgniteInCamelClosure answer = new IgniteInCamelClosure();
+            answer.exchange = exchange;
+            answer.callback = callback;
+            return answer;
+        }
+
+        @Override
+        public void apply(IgniteFuture<Object> future) {
+            Message in = exchange.getIn();
+            Message out = exchange.getOut();
+            MessageHelper.copyHeaders(in, out, true);
+
+            Object result = null;
+
+            try {
+                result = future.get();
+            } catch (Exception e) {
+                exchange.setException(e);
+                callback.done(false);
+                return;
+            }
+
+            exchange.getOut().setBody(result);
+            callback.done(false);
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
new file mode 100644
index 0000000..5d63611
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.events;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ignite Events consumer.
+ */
+public class IgniteEventsConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IgniteEventsConsumer.class);
+
+    private IgniteEventsEndpoint endpoint;
+    private IgniteEvents events;
+    private int[] eventTypes = new int[0];
+
+    private IgnitePredicate<Event> predicate = new IgnitePredicate<Event>() {
+        private static final long serialVersionUID = 6738594728074592726L;
+
+        @Override
+        public boolean apply(Event event) {
+            Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOnly);
+            Message in = exchange.getIn();
+            in.setBody(event);
+            try {
+                getAsyncProcessor().process(exchange, new AsyncCallback() {
+                    @Override
+                    public void done(boolean doneSync) {
+                        // do nothing
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error(String.format("Exception while processing Ignite 
Event: %s.", event), e);
+            }
+            return true;
+        }
+    };
+
+    public IgniteEventsConsumer(IgniteEventsEndpoint endpoint, Processor 
processor, IgniteEvents events) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.events = events;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (endpoint.getEvents() != null && endpoint.getEvents().size() > 0) {
+            eventTypes = new int[endpoint.getEvents().size()];
+            int counter = 0;
+            for (Integer i : endpoint.getEvents()) {
+                eventTypes[counter++] = i;
+            }
+        }
+
+        events.localListen(predicate, eventTypes);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        events.stopLocalListen(predicate, eventTypes);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
new file mode 100644
index 0000000..6237ad8
--- /dev/null
+++ 
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ignite.events;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.ignite.AbstractIgniteEndpoint;
+import org.apache.camel.component.ignite.ClusterGroupExpression;
+import org.apache.camel.component.ignite.IgniteComponent;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.events.EventType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ignite Events endpoint. Only supports consumers.
+ */
+@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = 
"ignite:events:endpointId", label = "nosql,cache,compute,messaging,data", 
+    consumerOnly = true, consumerClass = IgniteEventsConsumer.class)
+public class IgniteEventsEndpoint extends AbstractIgniteEndpoint {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IgniteEventsEndpoint.class);
+
+    @UriParam
+    private Set<Integer> events;
+
+    @UriParam
+    private ClusterGroupExpression clusterGroupExpression;
+
+    public IgniteEventsEndpoint(String uri, URI remainingUri, Map<String, 
Object> parameters, IgniteComponent igniteComponent) {
+        super(uri, igniteComponent);
+
+        // Initialize subscribed event types with ALL.
+        events = new HashSet<>();
+        for (Integer eventType : EventType.EVTS_ALL) {
+            events.add(eventType);
+        }
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("The Ignite Events endpoint 
does not support producers.");
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        // Initialize the Consumer.
+        IgniteEvents events = createIgniteEvents();
+        IgniteEventsConsumer consumer = new IgniteEventsConsumer(this, 
processor, events);
+        configureConsumer(consumer);
+
+        LOG.info("Created Ignite Events consumer for event types: {}.", 
events);
+
+        return consumer;
+    }
+
+    private IgniteEvents createIgniteEvents() {
+        Ignite ignite = ignite();
+        IgniteEvents events;
+        if (clusterGroupExpression == null) {
+            LOG.info("Ignite Events endpoint for event types {} using no 
Cluster Group.", this.events);
+            events = ignite.events();
+        } else {
+            ClusterGroup group = 
clusterGroupExpression.getClusterGroup(ignite);
+            LOG.info("Ignite Events endpoint for event types {} using Cluster 
Group: {}.", this.events, group);
+            events = ignite.events(group);
+        }
+        return events;
+    }
+
+    /**
+     * Gets the event types to subscribe to.
+     * 
+     * @return
+     */
+    public Set<Integer> getEvents() {
+        return events;
+    }
+
+    /**
+     * Sets the event types to subscribe to as a {@link Set}.
+     * 
+     * @param events
+     */
+    public void setEvents(Set<Integer> events) {
+        this.events = events;
+    }
+
+    /**
+     * Sets the event types to subscribe to as a comma-separated string of 
event constants as defined in {@link EventType}.
+     * <p>
+     * For example: 
EVT_CACHE_ENTRY_CREATED,EVT_CACHE_OBJECT_REMOVED,EVT_IGFS_DIR_CREATED.
+     * 
+     * @param events
+     */
+    public void setEvents(String events) {
+        this.events = new HashSet<>();
+        Set<String> requestedEvents = new 
HashSet<>(Arrays.asList(events.toUpperCase().split(",")));
+        Field[] fields = EventType.class.getDeclaredFields();
+        for (Field field : fields) {
+            if (!requestedEvents.contains(field.getName())) {
+                continue;
+            }
+            try {
+                this.events.add(field.getInt(null));
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Problem while resolving 
event type. See stacktrace.", e);
+            }
+        }
+    }
+
+    public ClusterGroupExpression getClusterGroupExpression() {
+        return clusterGroupExpression;
+    }
+
+    public void setClusterGroupExpression(ClusterGroupExpression 
clusterGroupExpression) {
+        this.clusterGroupExpression = clusterGroupExpression;
+    }
+
+}

Reply via email to