CAMEL-9445 camel-ignite: New Apache Ignite component. Initial work.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a695c5d3 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a695c5d3 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a695c5d3 Branch: refs/heads/camel-ignite Commit: a695c5d3656dcbac1e17b3109c7a6c340e8df7f0 Parents: ff8e1f1 Author: Raul Kripalani <ra...@apache.org> Authored: Wed Dec 23 13:26:42 2015 +0000 Committer: Raul Kripalani <ra...@apache.org> Committed: Wed Dec 23 15:18:57 2015 +0000 ---------------------------------------------------------------------- components/camel-ignite/pom.xml | 92 ++++++ .../ignite/AbstractIgniteEndpoint.java | 99 ++++++ .../ignite/ClusterGroupExpression.java | 30 ++ .../ignite/ClusterGroupExpressions.java | 85 +++++ .../camel/component/ignite/IgniteComponent.java | 240 ++++++++++++++ .../camel/component/ignite/IgniteConstants.java | 59 ++++ .../camel/component/ignite/IgniteHelper.java | 38 +++ .../IgniteCacheContinuousQueryConsumer.java | 155 +++++++++ .../ignite/cache/IgniteCacheEndpoint.java | 311 +++++++++++++++++++ .../ignite/cache/IgniteCacheOperation.java | 26 ++ .../ignite/cache/IgniteCacheProducer.java | 208 +++++++++++++ .../ignite/compute/IgniteComputeEndpoint.java | 155 +++++++++ .../compute/IgniteComputeExecutionType.java | 26 ++ .../ignite/compute/IgniteComputeProducer.java | 275 ++++++++++++++++ .../ignite/events/IgniteEventsConsumer.java | 92 ++++++ .../ignite/events/IgniteEventsEndpoint.java | 146 +++++++++ .../ignite/idgen/IgniteIdGenEndpoint.java | 116 +++++++ .../ignite/idgen/IgniteIdGenOperation.java | 26 ++ .../ignite/idgen/IgniteIdGenProducer.java | 83 +++++ .../messaging/IgniteMessagingConsumer.java | 78 +++++ .../messaging/IgniteMessagingEndpoint.java | 123 ++++++++ .../messaging/IgniteMessagingProducer.java | 69 ++++ .../messaging/IgniteMessagingSendMode.java | 23 ++ .../ignite/queue/IgniteQueueEndpoint.java | 175 +++++++++++ .../ignite/queue/IgniteQueueOperation.java | 26 ++ 
.../ignite/queue/IgniteQueueProducer.java | 170 ++++++++++ .../component/ignite/set/IgniteSetEndpoint.java | 131 ++++++++ .../ignite/set/IgniteSetOperation.java | 26 ++ .../component/ignite/set/IgniteSetProducer.java | 120 +++++++ .../src/main/resources/META-INF/LICENSE.txt | 203 ++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../services/org/apache/camel/component/ignite | 18 ++ .../component/ignite/AbstractIgniteTest.java | 63 ++++ .../ignite/IgniteCacheContinuousQueryTest.java | 281 +++++++++++++++++ .../camel/component/ignite/IgniteCacheTest.java | 237 ++++++++++++++ .../component/ignite/IgniteComputeTest.java | 192 ++++++++++++ .../component/ignite/IgniteEventsTest.java | 168 ++++++++++ .../camel/component/ignite/IgniteIdGenTest.java | 83 +++++ .../component/ignite/IgniteMessagingTest.java | 164 ++++++++++ .../camel/component/ignite/IgniteQueueTest.java | 257 +++++++++++++++ .../camel/component/ignite/IgniteSetTest.java | 176 +++++++++++ .../ignite/TestIgniteComputeResources.java | 149 +++++++++ .../src/test/resources/log4j.properties | 37 +++ components/pom.xml | 1 + parent/pom.xml | 1 + 45 files changed, 5244 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-ignite/pom.xml b/components/camel-ignite/pom.xml new file mode 100644 index 0000000..53af3f7 --- /dev/null +++ b/components/camel-ignite/pom.xml @@ -0,0 +1,92 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>2.17-SNAPSHOT</version> + </parent> + + <artifactId>camel-ignite</artifactId> + <packaging>bundle</packaging> + <name>Camel :: Ignite</name> + <description>Camel Ignite component</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.ignite.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=ignite</camel.osgi.export.service> + </properties> + + <dependencies> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <!-- Apache Ignite dependency --> + <dependency> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite-version}</version> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + 
<artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.truth</groupId> + <artifactId>truth</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.jayway.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java new file mode 100644 index 0000000..09791ca --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/AbstractIgniteEndpoint.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import org.apache.camel.Component; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.ignite.Ignite; + +/** + * Base class for all Ignite endpoints. + */ +@UriEndpoint(scheme = "ignite:...", title = "Ignite endpoints", syntax = "ignite:...", label = "nosql,cache,compute", producerOnly = true) +public abstract class AbstractIgniteEndpoint extends DefaultEndpoint { + + protected IgniteComponent component; + + @UriParam(defaultValue = "true") + private boolean propagateIncomingBodyIfNoReturnValue = true; + + @UriParam + private boolean treatCollectionsAsCacheObjects; + + public AbstractIgniteEndpoint(String endpointUri, Component component) { + super(endpointUri, component); + } + + @Override + public boolean isSingleton() { + return false; + } + + protected IgniteComponent igniteComponent() { + if (component == null) { + component = (IgniteComponent) getComponent(); + } + return component; + } + + protected Ignite ignite() { + return igniteComponent().getIgnite(); + } + + /** + * Gets whether to propagate the incoming body if the return type of the underlying + * Ignite operation is void. + * + * @return + */ + public boolean isPropagateIncomingBodyIfNoReturnValue() { + return propagateIncomingBodyIfNoReturnValue; + } + + /** + * Sets whether to propagate the incoming body if the return type of the underlying + * Ignite operation is void. + * + * @param propagateIncomingBodyIfNoReturnValue + */ + public void setPropagateIncomingBodyIfNoReturnValue(boolean propagateIncomingBodyIfNoReturnValue) { + this.propagateIncomingBodyIfNoReturnValue = propagateIncomingBodyIfNoReturnValue; + } + + /** + * Gets whether to treat Collections as cache objects or as Collections of items to + * insert/update/compute, etc. 
+ * + * @return + */ + public boolean isTreatCollectionsAsCacheObjects() { + return treatCollectionsAsCacheObjects; + } + + /** + * Sets whether to treat Collections as cache objects or as Collections of items to + * insert/update/compute, etc. + * + * @param treatCollectionsAsCacheObjects + */ + public void setTreatCollectionsAsCacheObjects(boolean treatCollectionsAsCacheObjects) { + this.treatCollectionsAsCacheObjects = treatCollectionsAsCacheObjects; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java new file mode 100644 index 0000000..b9286a1c --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpression.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterGroup; + +/** + * Implement this interface to create an expression that returns a {@link ClusterGroup} from the + * {@link Ignite} instance that was passed in. + */ +public interface ClusterGroupExpression { + + ClusterGroup getClusterGroup(Ignite ignite); + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java new file mode 100644 index 0000000..5ec660c --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/ClusterGroupExpressions.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterGroup; + +/** + * Convenient set of commonly used {@link ClusterGroupExpression}s. + */ +public final class ClusterGroupExpressions { + + public static final ClusterGroupExpression FOR_CLIENTS = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forClients(); + } + }; + + public static final ClusterGroupExpression FOR_DAEMONS = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forDaemons(); + } + }; + + public static final ClusterGroupExpression FOR_LOCAL = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forLocal(); + } + }; + + public static final ClusterGroupExpression FOR_OLDEST = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forOldest(); + } + }; + + public static final ClusterGroupExpression FOR_YOUNGEST = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forYoungest(); + } + }; + + public static final ClusterGroupExpression FOR_RANDOM = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forRandom(); + } + }; + + public static final ClusterGroupExpression FOR_REMOTES = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forRemotes(); + } + }; + + public static final ClusterGroupExpression FOR_SERVERS = new ClusterGroupExpression() { + @Override + public ClusterGroup getClusterGroup(Ignite ignite) { + return ignite.cluster().forServers(); + } + }; + + private ClusterGroupExpressions() { } + +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java new file mode 100644 index 0000000..587d9be --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteComponent.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite; + +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URL; +import java.util.Map; + +import org.apache.camel.Endpoint; +import org.apache.camel.component.ignite.cache.IgniteCacheEndpoint; +import org.apache.camel.component.ignite.compute.IgniteComputeEndpoint; +import org.apache.camel.component.ignite.events.IgniteEventsEndpoint; +import org.apache.camel.component.ignite.idgen.IgniteIdGenEndpoint; +import org.apache.camel.component.ignite.messaging.IgniteMessagingEndpoint; +import org.apache.camel.component.ignite.queue.IgniteQueueEndpoint; +import org.apache.camel.component.ignite.set.IgniteSetEndpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.URISupport; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * The Ignite Component integrates Apache Camel with Apache Ignite, providing endpoints for the following functions: + * <ul> + * <li>Cache operations.</li> + * <li>Cluster computation.</li> + * <li>Messaging.</li> + * <li>Eventing.</li> + * <li>Id Generation.</li> + * <li>Set operations.</li> + * <li>Queue operations.</li> + * </ul> + */ +public class IgniteComponent extends UriEndpointComponent { + + /** + * Modes of managing the underlying {@link Ignite} instance. + */ + public enum IgniteLifecycleMode { + USER_MANAGED, COMPONENT_MANAGED + } + + /** Ignite configuration. */ + private IgniteConfiguration igniteConfiguration; + + /** Resource from where to load configuration. */ + private Object configurationResource; + + /** Ignite instance. */ + private Ignite ignite; + + /** How the Ignite lifecycle is managed. 
*/ + private IgniteLifecycleMode lifecycleMode = IgniteLifecycleMode.COMPONENT_MANAGED; + + public IgniteComponent() { + super(AbstractIgniteEndpoint.class); + } + + /** + * Creates a component around an {@link Ignite} instance that is owned by the caller. + * The component is switched to {@link IgniteLifecycleMode#USER_MANAGED} so that it neither + * tries to start another instance from a configuration nor closes the supplied one. + */ + public static IgniteComponent fromIgnite(Ignite ignite) { + IgniteComponent answer = new IgniteComponent(); + answer.setIgnite(ignite); + answer.lifecycleMode = IgniteLifecycleMode.USER_MANAGED; + return answer; + } + + public static IgniteComponent fromConfiguration(IgniteConfiguration configuration) { + IgniteComponent answer = new IgniteComponent(); + answer.setIgniteConfiguration(configuration); + return answer; + } + + public static IgniteComponent fromInputStream(InputStream inputStream) { + IgniteComponent answer = new IgniteComponent(); + answer.setConfigurationResource(inputStream); + return answer; + } + + public static IgniteComponent fromUrl(URL url) { + IgniteComponent answer = new IgniteComponent(); + answer.setConfigurationResource(url); + return answer; + } + + public static IgniteComponent fromLocation(String location) { + IgniteComponent answer = new IgniteComponent(); + answer.setConfigurationResource(location); + return answer; + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + ObjectHelper.notNull(getCamelContext(), "Camel Context"); + + AbstractIgniteEndpoint answer = null; + URI remainingUri = new URI(URISupport.normalizeUri(remaining)); + String scheme = remainingUri.getScheme(); + + // A URI without a scheme would make the String switch below fail with a NullPointerException. + if (scheme == null) { + throw new MalformedURLException("An invalid Ignite endpoint URI was provided. Please check that it starts with: ignite:[cache/compute/messaging/...]:..."); + } + + switch (scheme) { + case "cache": + answer = new IgniteCacheEndpoint(uri, remainingUri, parameters, this); + break; + case "compute": + answer = new IgniteComputeEndpoint(uri, remainingUri, parameters, this); + break; + case "messaging": + answer = new IgniteMessagingEndpoint(uri, remainingUri, parameters, this); + break; + case "events": + answer = new IgniteEventsEndpoint(uri, remainingUri, parameters, this); + break; + case "set": + answer = new IgniteSetEndpoint(uri, remainingUri, parameters, this); + break; + case "idgen": + answer = new IgniteIdGenEndpoint(uri, 
remainingUri, parameters, this); + break; + case "queue": + answer = new IgniteQueueEndpoint(uri, remainingUri, parameters, this); + break; + + default: + throw new MalformedURLException("An invalid Ignite endpoint URI was provided. Please check that " + "it starts with:" + " ignite:[cache/compute/messaging/...]:..."); + } + + setProperties(answer, parameters); + + return answer; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (lifecycleMode == IgniteLifecycleMode.USER_MANAGED) { + return; + } + + // Try to load the configuration from the resource. + if (configurationResource != null) { + if (configurationResource instanceof URL) { + ignite = Ignition.start((URL) configurationResource); + } else if (configurationResource instanceof InputStream) { + ignite = Ignition.start((InputStream) configurationResource); + } else if (configurationResource instanceof String) { + ignite = Ignition.start((String) configurationResource); + } else { + throw new IllegalStateException("An unsupported configuration resource was provided to the Ignite component. " + "Supported types are: URL, InputStream, String."); + } + } else if (igniteConfiguration != null) { + ignite = Ignition.start(igniteConfiguration); + } else { + throw new IllegalStateException("No configuration resource or IgniteConfiguration was provided to the Ignite component."); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + if (lifecycleMode == IgniteLifecycleMode.USER_MANAGED) { + return; + } + + // No instance exists if startup failed or never happened, so there is nothing to close. + if (ignite != null) { + ignite.close(); + } + } + + /** + * Returns the {@link Ignite} instance. + * + * @return + */ + public Ignite getIgnite() { + return ignite; + } + + /** + * Sets the {@link Ignite} instance. + * + * @param ignite + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** + * Gets the resource from where to load the configuration. It can be a: {@link URL}, {@link String} (path or URL) + * or an {@link InputStream}. 
+ * + * @return + */ + public Object getConfigurationResource() { + return configurationResource; + } + + /** + * Sets the resource from where to load the configuration. It can be a: {@link URL}, {@link String} (path or URL) + * or an {@link InputStream}. + * + * @param configurationResource + */ + public void setConfigurationResource(Object configurationResource) { + this.configurationResource = configurationResource; + } + + /** + * Gets the {@link IgniteConfiguration} if the user set it explicitly. + * @return + */ + public IgniteConfiguration getIgniteConfiguration() { + return igniteConfiguration; + } + + /** + * Allows the user to set a programmatic {@link IgniteConfiguration}. + * + * @param igniteConfiguration + */ + public void setIgniteConfiguration(IgniteConfiguration igniteConfiguration) { + this.igniteConfiguration = igniteConfiguration; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java new file mode 100644 index 0000000..e9b9256 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteConstants.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +/** + * Ignite Component constants. + */ +public final class IgniteConstants { + + // Ignite Cache. + public static final String IGNITE_CACHE_KEY = "CamelIgniteCacheKey"; + public static final String IGNITE_CACHE_QUERY = "CamelIgniteCacheQuery"; + public static final String IGNITE_CACHE_OPERATION = "CamelIgniteCacheOperation"; + public static final String IGNITE_CACHE_PEEK_MODE = "CamelIgniteCachePeekMode"; + public static final String IGNITE_CACHE_EVENT_TYPE = "CamelIgniteCacheEventType"; + public static final String IGNITE_CACHE_NAME = "CamelIgniteCacheName"; + public static final String IGNITE_CACHE_OLD_VALUE = "CamelIgniteCacheOldValue"; + + // Ignite Messaging. + public static final String IGNITE_MESSAGING_TOPIC = "CamelIgniteMessagingTopic"; + public static final String IGNITE_MESSAGING_UUID = "CamelIgniteMessagingUUID"; + + // Ignite Compute. + public static final String IGNITE_COMPUTE_EXECUTION_TYPE = "CamelIgniteComputeExecutionType"; + public static final String IGNITE_COMPUTE_PARAMS = "CamelIgniteComputeParameters"; + public static final String IGNITE_COMPUTE_REDUCER = "CamelIgniteComputeReducer"; + public static final String IGNITE_COMPUTE_AFFINITY_CACHE_NAME = "CamelIgniteComputeAffinityCacheName"; + public static final String IGNITE_COMPUTE_AFFINITY_KEY = "CamelIgniteComputeAffinityKey"; + + // Ignite Sets. 
+ public static final String IGNITE_SETS_OPERATION = "CamelIgniteSetsOperation"; + public static final String IGNITE_SETS_OPERATION_RESULT = "CamelIgniteSetsOperationResult"; + + // Ignite ID Gen. + public static final String IGNITE_IDGEN_OPERATION = "CamelIgniteIdGenOperation"; + + // Ignite Queues. + public static final String IGNITE_QUEUE_OPERATION = "CamelIgniteQueueOperation"; + public static final String IGNITE_QUEUE_MAX_ELEMENTS = "CamelIgniteQueueMaxElements"; + public static final String IGNITE_QUEUE_TRANSFERRED_COUNT = "CamelIgniteQueueTransferredCount"; + public static final String IGNITE_QUEUE_TIMEOUT_MILLIS = "CamelIgniteQueueTimeoutMillis"; + + private IgniteConstants() { } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java new file mode 100644 index 0000000..05099f4 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/IgniteHelper.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import org.apache.camel.Message; + +/** + * Helper methods for the Ignite component. + */ +public final class IgniteHelper { + + private IgniteHelper() { + + } + + public static void maybePropagateIncomingBody(AbstractIgniteEndpoint endpoint, Message in, Message out) { + if (!endpoint.isPropagateIncomingBodyIfNoReturnValue()) { + return; + } + + out.setBody(in.getBody()); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java new file mode 100644 index 0000000..e4c7302 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.cache; + +import javax.cache.Cache.Entry; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A consumer that generates {@link Exchange}s for items received from a continuous query. + */ +public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteCacheContinuousQueryConsumer.class); + + private IgniteCacheEndpoint endpoint; + + private IgniteCache<Object, Object> cache; + + private QueryCursor<Entry<Object, Object>> cursor; + + public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint endpoint, Processor processor, IgniteCache<Object, Object> cache) { + super(endpoint, processor); + this.endpoint = endpoint; + this.cache = cache; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + launchContinuousQuery(); + + maybeFireExistingQueryResults(); + } + + private void maybeFireExistingQueryResults() { + if (!endpoint.isFireExistingQueryResults()) { + LOG.info(String.format("Skipping existing cache results for cache name = %s.", endpoint.getCacheName())); + return; + } + + LOG.info(String.format("Processing existing cache results for cache name = %s.", endpoint.getCacheName())); + + for (Entry<Object, 
Object> entry : cursor) { + Exchange exchange = createExchange(entry.getValue()); + exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, entry.getKey()); + getAsyncProcessor().process(createExchange(entry), new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // do nothing + } + }); + } + } + + private void launchContinuousQuery() { + ContinuousQuery<Object, Object> continuousQuery = new ContinuousQuery<>(); + + if (endpoint.getQuery() != null) { + continuousQuery.setInitialQuery(endpoint.getQuery()); + } + + if (endpoint.getRemoteFilter() != null) { + continuousQuery.setRemoteFilter(endpoint.getRemoteFilter()); + } + + continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override + public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) throws CacheEntryListenerException { + if (!endpoint.isOneExchangePerUpdate()) { + fireGroupedExchange(events); + return; + } + + for (CacheEntryEvent<? extends Object, ? extends Object> entry : events) { + fireSingleExchange(entry); + } + } + }); + + continuousQuery.setAutoUnsubscribe(endpoint.isAutoUnsubscribe()); + continuousQuery.setPageSize(endpoint.getPageSize()); + continuousQuery.setTimeInterval(endpoint.getTimeInterval()); + + cursor = cache.query(continuousQuery); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + cursor.close(); + } + + private void fireSingleExchange(CacheEntryEvent<? extends Object, ? 
extends Object> entry) { + Exchange exchange = createExchange(entry.getValue()); + exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE, entry.getEventType()); + exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE, entry.getOldValue()); + exchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, entry.getKey()); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // do nothing + } + }); + } + + private void fireGroupedExchange(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> events) { + Exchange exchange = createExchange(events); + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // do nothing + } + }); + } + + private Exchange createExchange(Object payload) { + Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); + Message in = exchange.getIn(); + in.setBody(payload); + in.setHeader(IgniteConstants.IGNITE_CACHE_NAME, endpoint.getCacheName()); + return exchange; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java new file mode 100644 index 0000000..2b22409 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheEndpoint.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.cache; + +import java.net.URI; +import java.util.Map; + +import javax.cache.Cache.Entry; + +import org.apache.camel.CamelException; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.Query; + +/** + * Ignite Cache endpoint. 
+ */ +@UriEndpoint(scheme = "ignite:cache", title = "Ignite Cache", syntax = "ignite:cache:[cacheName]", label = "nosql,cache,compute", + consumerClass = IgniteCacheContinuousQueryConsumer.class) +public class IgniteCacheEndpoint extends AbstractIgniteEndpoint { + + @UriParam + @Metadata(required = "true") + private String cacheName; + + @UriParam + private IgniteCacheOperation operation; + + @UriParam + private boolean failIfInexistentCache; + + @UriParam + private CachePeekMode cachePeekMode = CachePeekMode.ALL; + + @UriParam + private Query<Entry<Object, Object>> query; + + @UriParam + private CacheEntryEventSerializableFilter<Object, Object> remoteFilter; + + @UriParam + private boolean oneExchangePerUpdate = true; + + @UriParam + private boolean fireExistingQueryResults; + + @UriParam + private boolean autoUnsubscribe = ContinuousQuery.DFLT_AUTO_UNSUBSCRIBE; + + @UriParam + private int pageSize = ContinuousQuery.DFLT_PAGE_SIZE; + + @UriParam + private long timeInterval = ContinuousQuery.DFLT_TIME_INTERVAL; + + public IgniteCacheEndpoint(String endpointUri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) { + super(endpointUri, igniteComponent); + cacheName = remainingUri.getHost(); + } + + @Override + public Producer createProducer() throws Exception { + return new IgniteCacheProducer(this, obtainCache()); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + return new IgniteCacheContinuousQueryConsumer(this, processor, obtainCache()); + } + + private IgniteCache<Object, Object> obtainCache() throws CamelException { + IgniteCache<Object, Object> cache = ignite().cache(cacheName); + if (cache == null) { + if (failIfInexistentCache) { + throw new CamelException(String.format("Ignite cache %s doesn't exist, and failIfInexistentCache is true", cacheName)); + } + cache = ignite().createCache(cacheName); + } + + return cache; + } + + /** + * Gets the cache name. 
+ * + * @return + */ + public String getCacheName() { + return cacheName; + } + + /** + * Sets the cache name. + * + * @param cacheName + */ + public void setCacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * Gets the cache operation to invoke. + * + * @return + */ + public IgniteCacheOperation getOperation() { + return operation; + } + + /** + * Sets the cache operation to invoke. + * + * @param operation + */ + public void setOperation(IgniteCacheOperation operation) { + this.operation = operation; + } + + /** + * Gets whether to fail the initialization if the cache doesn't exist. + * + * @return + */ + public boolean isFailIfInexistentCache() { + return failIfInexistentCache; + } + + /** + * Sets whether to fail the initialization if the cache doesn't exist. + * + * @param failIfInexistentCache + */ + public void setFailIfInexistentCache(boolean failIfInexistentCache) { + this.failIfInexistentCache = failIfInexistentCache; + } + + /** + * Gets the {@link CachePeekMode}, only needed for operations that require it ({@link IgniteCacheOperation#SIZE}). + * + * @return + */ + public CachePeekMode getCachePeekMode() { + return cachePeekMode; + } + + /** + * Sets the {@link CachePeekMode}, only needed for operations that require it ({@link IgniteCacheOperation#SIZE}). + * + * @param cachePeekMode + */ + public void setCachePeekMode(CachePeekMode cachePeekMode) { + this.cachePeekMode = cachePeekMode; + } + + /** + * Gets the query to execute, only needed for operations that require it, + * and for the Continuous Query Consumer. + * + * @return + */ + public Query<Entry<Object, Object>> getQuery() { + return query; + } + + /** + * Sets the query to execute, only needed for operations that require it, + * and for the Continuous Query Consumer. + * + * @param query + */ + public void setQuery(Query<Entry<Object, Object>> query) { + this.query = query; + } + + /** + * Gets the remote filter, only used by the Continuous Query Consumer. 
+ * + * @return + */ + public CacheEntryEventSerializableFilter<Object, Object> getRemoteFilter() { + return remoteFilter; + } + + /** + * Sets the remote filter, only used by the Continuous Query Consumer. + * + * @param remoteFilter + */ + public void setRemoteFilter(CacheEntryEventSerializableFilter<Object, Object> remoteFilter) { + this.remoteFilter = remoteFilter; + } + + /** + * Gets whether to pack each update in an individual Exchange, even if multiple updates are + * received in one batch. Only used by the Continuous Query Consumer. + * + * @return + */ + public boolean isOneExchangePerUpdate() { + return oneExchangePerUpdate; + } + + /** + * Sets whether to pack each update in an individual Exchange, even if multiple updates are + * received in one batch. Only used by the Continuous Query Consumer. + * + * @param oneExchangePerUpdate + */ + public void setOneExchangePerUpdate(boolean oneExchangePerUpdate) { + this.oneExchangePerUpdate = oneExchangePerUpdate; + } + + /** + * Gets whether auto unsubscribe is enabled in the Continuous Query Consumer. + * + * @return + */ + public boolean isAutoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * Sets whether auto unsubscribe is enabled in the Continuous Query Consumer. + * + * @param autoUnsubscribe + */ + public void setAutoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * Gets the page size. Only used by the Continuous Query Consumer. + * + * @return + */ + public int getPageSize() { + return pageSize; + } + + /** + * Sets the page size. Only used by the Continuous Query Consumer. + * + * @param pageSize + */ + public void setPageSize(int pageSize) { + this.pageSize = pageSize; + } + + /** + * Gets whether to process existing results that match the query. Used on initialization of + * the Continuous Query Consumer. 
+ * + * @return + */ + public boolean isFireExistingQueryResults() { + return fireExistingQueryResults; + } + + /** + * Sets whether to process existing results that match the query. Used on initialization of + * the Continuous Query Consumer. + * + * @param fireExistingQueryResults + */ + public void setFireExistingQueryResults(boolean fireExistingQueryResults) { + this.fireExistingQueryResults = fireExistingQueryResults; + } + + /** + * Gets the time interval for the Continuous Query Consumer. + * + * @return + */ + public long getTimeInterval() { + return timeInterval; + } + + /** + * Sets the time interval for the Continuous Query Consumer. + * + * @param timeInterval + */ + public void setTimeInterval(long timeInterval) { + this.timeInterval = timeInterval; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java new file mode 100644 index 0000000..636df73 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheOperation.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.cache; + +/** + * Enumeration of Ignite Cache operations. + */ +public enum IgniteCacheOperation { + + GET, PUT, REMOVE, SIZE, REBALANCE, QUERY, CLEAR + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java new file mode 100644 index 0000000..7ad3f86 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/cache/IgniteCacheProducer.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.cache; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.Message; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.component.ignite.IgniteHelper; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; + +/** + * Ignite Cache producer. + */ +public class IgniteCacheProducer extends DefaultAsyncProducer { + + private IgniteCache<Object, Object> cache; + private IgniteCacheEndpoint endpoint; + + public IgniteCacheProducer(IgniteCacheEndpoint endpoint, IgniteCache<Object, Object> igniteCache) { + super(endpoint); + this.endpoint = endpoint; + this.cache = igniteCache; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(exchange.getIn(), out, true); + + switch (cacheOperationFor(exchange)) { + + case GET: + doGet(in, out); + break; + + case PUT: + doPut(in, out); + break; + + case QUERY: + doQuery(in, out, exchange); + break; + + case REMOVE: + doRemove(in, out); + break; + + case CLEAR: + doClear(in, out); + break; + + case SIZE: + doSize(in, out); + break; + + case REBALANCE: + doRebalance(in, out); + break; + + default: + break; + } + + return true; + } + + @SuppressWarnings("unchecked") + private void doGet(Message in, Message out) { + Object cacheKey = 
cacheKey(in); + + if (cacheKey instanceof Set && !endpoint.isTreatCollectionsAsCacheObjects()) { + out.setBody(cache.getAll((Set<Object>) cacheKey)); + } else { + out.setBody(cache.get(cacheKey)); + } + } + + @SuppressWarnings("unchecked") + private void doPut(Message in, Message out) { + Map<Object, Object> map = in.getBody(Map.class); + + if (map != null) { + cache.putAll(map); + return; + } + + Object cacheKey = in.getHeader(IgniteConstants.IGNITE_CACHE_KEY); + + if (cacheKey == null) { + throw new RuntimeCamelException("Cache PUT operation requires the cache key in the CamelIgniteCacheKey header, " + "or a payload of type Map."); + } + + cache.put(cacheKey, in.getBody()); + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + } + + @SuppressWarnings("unchecked") + private void doQuery(Message in, Message out, Exchange exchange) { + Query<Object> query = in.getHeader(IgniteConstants.IGNITE_CACHE_QUERY, Query.class); + + if (query == null) { + try { + query = in.getMandatoryBody(Query.class); + } catch (InvalidPayloadException e) { + exchange.setException(e); + return; + } + } + + final QueryCursor<Object> cursor = cache.query(query); + + out.setBody(cursor.iterator()); + + exchange.addOnCompletion(new Synchronization() { + @Override + public void onFailure(Exchange exchange) { + cursor.close(); + } + + @Override + public void onComplete(Exchange exchange) { + cursor.close(); + } + }); + + } + + @SuppressWarnings("unchecked") + private void doRemove(Message in, Message out) { + Object cacheKey = cacheKey(in); + + if (cacheKey instanceof Set && !endpoint.isTreatCollectionsAsCacheObjects()) { + cache.removeAll((Set<Object>) cacheKey); + } else { + cache.remove(cacheKey); + } + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + } + + private void doClear(Message in, Message out) { + cache.removeAll(); + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + } + + private void doRebalance(Message in, Message out) { + 
cache.rebalance().get(); + + IgniteHelper.maybePropagateIncomingBody(endpoint, in, out); + } + + @SuppressWarnings("unchecked") + private void doSize(Message in, Message out) { + Object peekMode = in.getHeader(IgniteConstants.IGNITE_CACHE_PEEK_MODE, endpoint.getCachePeekMode()); + + Integer result = null; + if (peekMode instanceof Collection) { + result = cache.size(((Collection<Object>) peekMode).toArray(new CachePeekMode[0])); + } else if (peekMode instanceof CachePeekMode) { + result = cache.size((CachePeekMode) peekMode); + } + + out.setBody(result); + } + + private Object cacheKey(Message msg) { + Object cacheKey = msg.getHeader(IgniteConstants.IGNITE_CACHE_KEY); + if (cacheKey == null) { + cacheKey = msg.getBody(); + } + return cacheKey; + } + + private IgniteCacheOperation cacheOperationFor(Exchange exchange) { + return exchange.getIn().getHeader(IgniteConstants.IGNITE_CACHE_OPERATION, endpoint.getOperation(), IgniteCacheOperation.class); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java new file mode 100644 index 0000000..1dc0663 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeEndpoint.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.compute; + +import java.net.URI; +import java.util.Map; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.ClusterGroupExpression; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCompute; + +/** + * Ignite Compute endpoint. 
+ */ +@UriEndpoint(scheme = "ignite:compute", title = "Ignite Compute", syntax = "ignite:compute:endpointId", label = "nosql,cache,compute", producerOnly = true) +public class IgniteComputeEndpoint extends AbstractIgniteEndpoint { + + @UriParam + private ClusterGroupExpression clusterGroupExpression; + + @UriParam + private IgniteComputeExecutionType executionType; + + @UriParam + private String taskName; + + @UriParam + private String computeName; + + @UriParam + private Long timeoutMillis; + + public IgniteComputeEndpoint(String uri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) throws ClassNotFoundException { + super(uri, igniteComponent); + } + + @Override + public Producer createProducer() throws Exception { + return new IgniteComputeProducer(this); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("The Ignite Compute endpoint does not support consumers."); + } + + public IgniteCompute createIgniteCompute() { + Ignite ignite = ignite(); + IgniteCompute compute = clusterGroupExpression == null ? ignite.compute() : ignite.compute(clusterGroupExpression.getClusterGroup(ignite)); + + if (computeName != null) { + compute = compute.withName(computeName); + } + + if (timeoutMillis != null) { + compute = compute.withTimeout(timeoutMillis); + } + + return compute; + } + + /** + * Gets the execution type of this producer. + * + * @return + */ + public IgniteComputeExecutionType getExecutionType() { + return executionType; + } + + /** + * Sets the execution type of this producer. + * + * @param executionType + */ + public void setExecutionType(IgniteComputeExecutionType executionType) { + this.executionType = executionType; + } + + /** + * Gets the task name, only applicable if using the {@link IgniteComputeExecutionType#EXECUTE} execution type. 
+ * + * @return + */ + public String getTaskName() { + return taskName; + } + + /** + * Sets the task name, only applicable if using the {@link IgniteComputeExecutionType#EXECUTE} execution type. + * + * @param taskName + */ + public void setTaskName(String taskName) { + this.taskName = taskName; + } + + /** + * Gets the name of the compute job, which will be set via {@link IgniteCompute#withName(String)}. + * + * @return + */ + public String getComputeName() { + return computeName; + } + + /** + * Sets the name of the compute job, which will be set via {@link IgniteCompute#withName(String)}. + * + * @param computeName + */ + public void setComputeName(String computeName) { + this.computeName = computeName; + } + + /** + * Gets the timeout interval for triggered jobs, in milliseconds, which will be set via {@link IgniteCompute#withTimeout(long)}. + * + * @return + */ + public Long getTimeoutMillis() { + return timeoutMillis; + } + + /** + * Sets the timeout interval for triggered jobs, in milliseconds, which will be set via {@link IgniteCompute#withTimeout(long)}. 
+ * + * @param timeoutMillis + */ + public void setTimeoutMillis(Long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java new file mode 100644 index 0000000..f537a95 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeExecutionType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite.compute; + +/** + * Enumeration of execution type for the Ignite Compute producer. 
+ */ +public enum IgniteComputeExecutionType { + + CALL, BROADCAST, APPLY, EXECUTE, RUN, AFFINITY_CALL, AFFINITY_RUN + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java new file mode 100644 index 0000000..d550a74 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/compute/IgniteComputeProducer.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.compute; + +import java.util.ArrayList; +import java.util.Collection; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.camel.component.ignite.IgniteConstants; +import org.apache.camel.impl.DefaultAsyncProducer; +import org.apache.camel.util.MessageHelper; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteReducer; +import org.apache.ignite.lang.IgniteRunnable; + +/** + * Ignite Compute producer. + */ +public class IgniteComputeProducer extends DefaultAsyncProducer { + + private IgniteComputeEndpoint endpoint; + + public IgniteComputeProducer(IgniteComputeEndpoint endpoint) { + super(endpoint); + this.endpoint = endpoint; + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + IgniteCompute compute = endpoint.createIgniteCompute().withAsync(); + + try { + switch (executionTypeFor(exchange)) { + + case CALL: + doCall(exchange, callback, compute); + break; + + case BROADCAST: + doBroadcast(exchange, callback, compute); + break; + + case EXECUTE: + doExecute(exchange, callback, compute); + break; + + case RUN: + doRun(exchange, callback, compute); + break; + + case APPLY: + doApply(exchange, callback, compute); + break; + + case AFFINITY_CALL: + doAffinityCall(exchange, callback, compute); + break; + + case AFFINITY_RUN: + doAffinityRun(exchange, callback, compute); + break; + + default: + exchange.setException(new UnsupportedOperationException("Operation not supported by Ignite Compute producer.")); + return true; + } + + 
compute.future().listen(IgniteInCamelClosure.create(exchange, callback)); + + } catch (Exception e) { + exchange.setException(e); + return true; + } + + return false; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private void doCall(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + Object job = exchange.getIn().getBody(); + IgniteReducer<Object, Object> reducer = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, IgniteReducer.class); + + if (Collection.class.isAssignableFrom(job.getClass())) { + Collection<?> col = (Collection<?>) job; + TypeConverter tc = exchange.getContext().getTypeConverter(); + Collection<IgniteCallable<?>> callables = new ArrayList<>(col.size()); + for (Object o : col) { + callables.add(tc.mandatoryConvertTo(IgniteCallable.class, o)); + } + if (reducer != null) { + compute.call((Collection) callables, reducer); + } else { + compute.call((Collection) callables); + } + } else if (IgniteCallable.class.isAssignableFrom(job.getClass())) { + compute.call((IgniteCallable<Object>) job); + } else { + throw new RuntimeCamelException(String.format( + "Ignite Compute endpoint with CALL executionType is only " + "supported for IgniteCallable payloads, or collections of them. 
The payload type was: %s.", job.getClass().getName())); + } + } + + @SuppressWarnings("unchecked") + private void doBroadcast(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + Object job = exchange.getIn().getBody(); + + if (IgniteCallable.class.isAssignableFrom(job.getClass())) { + compute.broadcast((IgniteCallable<?>) job); + } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) { + compute.broadcast((IgniteRunnable) job); + } else if (IgniteClosure.class.isAssignableFrom(job.getClass())) { + compute.broadcast((IgniteClosure<Object, Object>) job, exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS)); + } else { + throw new RuntimeCamelException( + String.format("Ignite Compute endpoint with BROADCAST executionType is only " + "supported for IgniteCallable, IgniteRunnable or IgniteClosure payloads. The payload type was: %s.", + job.getClass().getName())); + } + } + + @SuppressWarnings("unchecked") + private void doExecute(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + Object job = exchange.getIn().getBody(); + Object params = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS); + + if (job instanceof Class && ComputeTask.class.isAssignableFrom((Class<?>) job)) { + Class<? extends ComputeTask<Object, Object>> task = (Class<? 
extends ComputeTask<Object, Object>>) job; + compute.execute(task, params); + } else if (ComputeTask.class.isAssignableFrom(job.getClass())) { + compute.execute((ComputeTask<Object, Object>) job, params); + } else if (endpoint.getTaskName() != null) { + if (exchange.getIn().getBody() != null) { + params = exchange.getIn().getBody(); + } + compute.execute(endpoint.getTaskName(), params); + } else { + throw new RuntimeCamelException(String.format("Ignite Compute endpoint with EXECUTE executionType is only " + + "supported for ComputeTask payloads, Class<ComputeTask> or any payload in conjunction with the " + "task name option. The payload type was: %s.", job.getClass().getName())); + } + } + + private void doRun(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + Object job = exchange.getIn().getBody(); + + if (Collection.class.isAssignableFrom(job.getClass())) { + Collection<?> col = (Collection<?>) job; + TypeConverter tc = exchange.getContext().getTypeConverter(); + Collection<IgniteRunnable> runnables = new ArrayList<>(col.size()); + for (Object o : col) { + runnables.add(tc.mandatoryConvertTo(IgniteRunnable.class, o)); + } + compute.run(runnables); + } else if (IgniteRunnable.class.isAssignableFrom(job.getClass())) { + compute.run((IgniteRunnable) job); + } else { + throw new RuntimeCamelException(String.format( + "Ignite Compute endpoint with RUN executionType is only " + "supported for IgniteRunnable payloads, or collections of them. 
The payload type was: %s.", job.getClass().getName())); + } + } + + @SuppressWarnings("unchecked") + private <T, R1, R2> void doApply(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + IgniteClosure<T, R1> job = exchange.getIn().getBody(IgniteClosure.class); + T params = (T) exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_PARAMS); + + if (job == null || params == null) { + throw new RuntimeCamelException( + String.format("Ignite Compute endpoint with APPLY executionType is only " + "supported for IgniteClosure payloads with parameters. The payload type was: %s.", + exchange.getIn().getBody().getClass().getName())); + } + + IgniteReducer<R1, R2> reducer = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_REDUCER, IgniteReducer.class); + + if (Collection.class.isAssignableFrom(params.getClass())) { + Collection<T> colParams = (Collection<T>) params; + if (reducer == null) { + compute.apply(job, colParams); + } else { + compute.apply(job, colParams, reducer); + } + } else { + compute.apply(job, params); + } + } + + @SuppressWarnings("unchecked") + private void doAffinityCall(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + IgniteCallable<Object> job = exchange.getIn().getBody(IgniteCallable.class); + String affinityCache = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, String.class); + Object affinityKey = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, Object.class); + + if (job == null || affinityCache == null || affinityKey == null) { + throw new RuntimeCamelException(String.format( + "Ignite Compute endpoint with AFFINITY_CALL executionType is only " + "supported for IgniteCallable payloads, along with an affinity cache and key. 
The payload type was: %s.", + exchange.getIn().getBody().getClass().getName())); + } + + compute.affinityCall(affinityCache, affinityKey, job); + } + + private void doAffinityRun(final Exchange exchange, final AsyncCallback callback, IgniteCompute compute) throws Exception { + IgniteRunnable job = exchange.getIn().getBody(IgniteRunnable.class); + String affinityCache = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_CACHE_NAME, String.class); + Object affinityKey = exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_AFFINITY_KEY, Object.class); + + if (job == null || affinityCache == null || affinityKey == null) { + throw new RuntimeCamelException(String.format( + "Ignite Compute endpoint with AFFINITY_RUN executionType is only " + "supported for IgniteRunnable payloads, along with an affinity cache and key. The payload type was: %s.", + exchange.getIn().getBody().getClass().getName())); + } + + compute.affinityRun(affinityCache, affinityKey, job); + } + + private IgniteComputeExecutionType executionTypeFor(Exchange exchange) { + return exchange.getIn().getHeader(IgniteConstants.IGNITE_COMPUTE_EXECUTION_TYPE, endpoint.getExecutionType(), IgniteComputeExecutionType.class); + } + + private static class IgniteInCamelClosure implements IgniteInClosure<IgniteFuture<Object>> { + private static final long serialVersionUID = 7486030906412223384L; + + private Exchange exchange; + private AsyncCallback callback; + + private static IgniteInCamelClosure create(Exchange exchange, AsyncCallback callback) { + IgniteInCamelClosure answer = new IgniteInCamelClosure(); + answer.exchange = exchange; + answer.callback = callback; + return answer; + } + + @Override + public void apply(IgniteFuture<Object> future) { + Message in = exchange.getIn(); + Message out = exchange.getOut(); + MessageHelper.copyHeaders(in, out, true); + + Object result = null; + + try { + result = future.get(); + } catch (Exception e) { + exchange.setException(e); + callback.done(false); 
+ return; + } + + exchange.getOut().setBody(result); + callback.done(false); + } + }; + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java new file mode 100644 index 0000000..5d63611 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.events; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.Processor; +import org.apache.camel.impl.DefaultConsumer; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.events.Event; +import org.apache.ignite.lang.IgnitePredicate; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ignite Events consumer. + */ +public class IgniteEventsConsumer extends DefaultConsumer { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteEventsConsumer.class); + + private IgniteEventsEndpoint endpoint; + private IgniteEvents events; + private int[] eventTypes = new int[0]; + + private IgnitePredicate<Event> predicate = new IgnitePredicate<Event>() { + private static final long serialVersionUID = 6738594728074592726L; + + @Override + public boolean apply(Event event) { + Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); + Message in = exchange.getIn(); + in.setBody(event); + try { + getAsyncProcessor().process(exchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + // do nothing + } + }); + } catch (Exception e) { + LOG.error(String.format("Exception while processing Ignite Event: %s.", event), e); + } + return true; + } + }; + + public IgniteEventsConsumer(IgniteEventsEndpoint endpoint, Processor processor, IgniteEvents events) { + super(endpoint, processor); + this.endpoint = endpoint; + this.events = events; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (endpoint.getEvents() != null && endpoint.getEvents().size() > 0) { + eventTypes = new int[endpoint.getEvents().size()]; + int counter = 0; + for (Integer i : endpoint.getEvents()) { + eventTypes[counter++] = i; + } + } + + events.localListen(predicate, eventTypes); + } + + @Override + protected void doStop() throws 
Exception { + super.doStop(); + + events.stopLocalListen(predicate, eventTypes); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java new file mode 100644 index 0000000..6237ad8 --- /dev/null +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsEndpoint.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.ignite.events; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.ignite.AbstractIgniteEndpoint; +import org.apache.camel.component.ignite.ClusterGroupExpression; +import org.apache.camel.component.ignite.IgniteComponent; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteEvents; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.events.EventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Ignite Events endpoint. Only supports consumers. + */ +@UriEndpoint(scheme = "ignite:events", title = "Ignite Events", syntax = "ignite:events:endpointId", label = "nosql,cache,compute,messaging,data", + consumerOnly = true, consumerClass = IgniteEventsConsumer.class) +public class IgniteEventsEndpoint extends AbstractIgniteEndpoint { + + private static final Logger LOG = LoggerFactory.getLogger(IgniteEventsEndpoint.class); + + @UriParam + private Set<Integer> events; + + @UriParam + private ClusterGroupExpression clusterGroupExpression; + + public IgniteEventsEndpoint(String uri, URI remainingUri, Map<String, Object> parameters, IgniteComponent igniteComponent) { + super(uri, igniteComponent); + + // Initialize subscribed event types with ALL. + events = new HashSet<>(); + for (Integer eventType : EventType.EVTS_ALL) { + events.add(eventType); + } + } + + @Override + public Producer createProducer() throws Exception { + throw new UnsupportedOperationException("The Ignite Events endpoint does not support producers."); + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + // Initialize the Consumer. 
+ IgniteEvents events = createIgniteEvents(); + IgniteEventsConsumer consumer = new IgniteEventsConsumer(this, processor, events); + configureConsumer(consumer); + + LOG.info("Created Ignite Events consumer for event types: {}.", events); + + return consumer; + } + + private IgniteEvents createIgniteEvents() { + Ignite ignite = ignite(); + IgniteEvents events; + if (clusterGroupExpression == null) { + LOG.info("Ignite Events endpoint for event types {} using no Cluster Group.", this.events); + events = ignite.events(); + } else { + ClusterGroup group = clusterGroupExpression.getClusterGroup(ignite); + LOG.info("Ignite Events endpoint for event types {} using Cluster Group: {}.", this.events, group); + events = ignite.events(group); + } + return events; + } + + /** + * Gets the event types to subscribe to. + * + * @return + */ + public Set<Integer> getEvents() { + return events; + } + + /** + * Sets the event types to subscribe to as a {@link Set}. + * + * @param events + */ + public void setEvents(Set<Integer> events) { + this.events = events; + } + + /** + * Sets the event types to subscribe to as a comma-separated string of event constants as defined in {@link EventType}. + * <p> + * For example: EVT_CACHE_ENTRY_CREATED,EVT_CACHE_OBJECT_REMOVED,EVT_IGFS_DIR_CREATED. + * + * @param events + */ + public void setEvents(String events) { + this.events = new HashSet<>(); + Set<String> requestedEvents = new HashSet<>(Arrays.asList(events.toUpperCase().split(","))); + Field[] fields = EventType.class.getDeclaredFields(); + for (Field field : fields) { + if (!requestedEvents.contains(field.getName())) { + continue; + } + try { + this.events.add(field.getInt(null)); + } catch (Exception e) { + throw new IllegalArgumentException("Problem while resolving event type. 
See stacktrace.", e); + } + } + } + + public ClusterGroupExpression getClusterGroupExpression() { + return clusterGroupExpression; + } + + public void setClusterGroupExpression(ClusterGroupExpression clusterGroupExpression) { + this.clusterGroupExpression = clusterGroupExpression; + } + +}