Repository: camel Updated Branches: refs/heads/camel-ignite [created] a695c5d36
http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java new file mode 100644 index 0000000..fd66450 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteComputeTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.util.ObjectHelper; +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.events.EventType; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteComputeTest extends AbstractIgniteTest { + + private static final List<Ignite> ADDITIONAL_INSTANCES = Lists.newArrayList(); + private static final List<UUID> LISTENERS = Lists.newArrayList(); + + @Test + public void testExecuteWithWrongPayload() { + try { + template.requestBody("ignite:compute:abc?executionType=EXECUTE", TestIgniteComputeResources.TEST_CALLABLE, String.class); + } catch (Exception e) { + assert_().that(ObjectHelper.getException(RuntimeCamelException.class, e).getMessage()).startsWith("Ignite Compute endpoint with EXECUTE"); + return; + } + + fail(); + } + + @Test + @SuppressWarnings("unchecked") + public void testCall() { + TestIgniteComputeResources.COUNTER.set(0); + + // Single Callable. + String result = template.requestBody("ignite:compute:abc?executionType=CALL", TestIgniteComputeResources.TEST_CALLABLE, String.class); + + assert_().that(result).isEqualTo("hello"); + + // Collection of Callables. + Object[] callables = new Object[5]; + Arrays.fill(callables, TestIgniteComputeResources.TEST_CALLABLE); + Collection<String> colResult = template.requestBody("ignite:compute:abc?executionType=CALL", Lists.newArrayList(callables), Collection.class); + + assert_().that(colResult).containsExactly("hello", "hello", "hello", "hello", "hello").inOrder(); + + // Callables with a Reducer. + String reduced = template.requestBodyAndHeader("ignite:compute:abc?executionType=CALL", Lists.newArrayList(callables), IgniteConstants.IGNITE_COMPUTE_REDUCER, + TestIgniteComputeResources.STRING_JOIN_REDUCER, String.class); + + assert_().that(reduced).isEqualTo("hellohellohellohellohello"); + } + + @Test + public void testRun() { + TestIgniteComputeResources.COUNTER.set(0); + + // Single Runnable. + Object result = template.requestBody("ignite:compute:abc?executionType=RUN", TestIgniteComputeResources.TEST_RUNNABLE_COUNTER, Object.class); + assert_().that(result).isNull(); + assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(1); + + // Multiple Runnables. + Object[] runnables = new Object[5]; + Arrays.fill(runnables, TestIgniteComputeResources.TEST_RUNNABLE_COUNTER); + result = template.requestBody("ignite:compute:abc?executionType=RUN", Lists.newArrayList(runnables), Collection.class); + assert_().that(result).isNull(); + assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(6); + } + + @Test + @SuppressWarnings("unchecked") + public void testBroadcast() { + TestIgniteComputeResources.COUNTER.set(0); + + startAdditionalGridInstance(); + startAdditionalGridInstance(); + + ignite().events().enableLocal(EventType.EVT_JOB_FINISHED); + LISTENERS.add(ignite().events().remoteListen(null, TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_FINISHED)); + + // Single Runnable. + Object result = template.requestBody("ignite:compute:abc?executionType=BROADCAST", TestIgniteComputeResources.TEST_RUNNABLE, Object.class); + assert_().that(result).isNull(); + assert_().that(TestIgniteComputeResources.COUNTER.get()).isEqualTo(3); + + // Single Callable. + Collection<String> colResult = template.requestBody("ignite:compute:abc?executionType=BROADCAST", TestIgniteComputeResources.TEST_CALLABLE, Collection.class); + assert_().that(colResult).isNotNull(); + assert_().that(colResult).containsExactly("hello", "hello", "hello").inOrder(); + + // Single Closure. + colResult = template.requestBodyAndHeader("ignite:compute:abc?executionType=BROADCAST", TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS, "Camel", + Collection.class); + assert_().that(colResult).isNotNull(); + assert_().that(colResult).containsExactly("hello Camel", "hello Camel", "hello Camel").inOrder(); + } + + @Test + public void testExecute() { + TestIgniteComputeResources.COUNTER.set(0); + + startAdditionalGridInstance(); + startAdditionalGridInstance(); + + ignite().events().enableLocal(EventType.EVT_JOB_RESULTED); + LISTENERS.add(ignite().events().remoteListen(null, TestIgniteComputeResources.EVENT_COUNTER, EventType.EVT_JOB_RESULTED)); + + // ComputeTask instance. + String result = template.requestBodyAndHeader("ignite:compute:abc?executionType=EXECUTE", TestIgniteComputeResources.COMPUTE_TASK, IgniteConstants.IGNITE_COMPUTE_PARAMS, 10, String.class); + assert_().that(result).isNotNull(); + assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9"); + + // ComputeTask class. + result = template.requestBodyAndHeader("ignite:compute:abc?executionType=EXECUTE", TestIgniteComputeResources.COMPUTE_TASK.getClass(), IgniteConstants.IGNITE_COMPUTE_PARAMS, 10, String.class); + assert_().that(result).isNotNull(); + assert_().that(Splitter.on(",").splitToList(result)).containsAllOf("a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7", "a8", "a9"); + } + + @Test + @SuppressWarnings("unchecked") + public void testApply() { + TestIgniteComputeResources.COUNTER.set(0); + + // Closure with a single parameter. + String result = template.requestBodyAndHeader("ignite:compute:abc?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS, "Camel", String.class); + assert_().that(result).isEqualTo("hello Camel"); + + // Closure with a Collection of parameters. + Collection<String> colResult = template.requestBodyAndHeader("ignite:compute:abc?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE, IgniteConstants.IGNITE_COMPUTE_PARAMS, + Lists.newArrayList("Camel1", "Camel2", "Camel3"), Collection.class); + assert_().that(colResult).containsAllOf("hello Camel1", "hello Camel2", "hello Camel3"); + + // Closure with a Collection of parameters and a Reducer. + Map<String, Object> headers = ImmutableMap.<String, Object> of(IgniteConstants.IGNITE_COMPUTE_PARAMS, Lists.newArrayList("Camel1", "Camel2", "Camel3"), IgniteConstants.IGNITE_COMPUTE_REDUCER, + TestIgniteComputeResources.STRING_JOIN_REDUCER); + result = template.requestBodyAndHeaders("ignite:compute:abc?executionType=APPLY", TestIgniteComputeResources.TEST_CLOSURE, headers, String.class); + assert_().that(result).isEqualTo("hello Camel1hello Camel2hello Camel3"); + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + private void startAdditionalGridInstance() { + ADDITIONAL_INSTANCES.add(Ignition.start(buildComponent().getIgniteConfiguration())); + } + + @After + public void stopAdditionalIgniteInstances() { + for (Ignite ignite : ADDITIONAL_INSTANCES) { + ignite.close(); + } + ADDITIONAL_INSTANCES.clear(); + } + + @After + public void stopRemoteListeners() { + for (UUID uuid : LISTENERS) { + ignite().events().stopRemoteListen(uuid); + } + LISTENERS.clear(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java new file mode 100644 index 0000000..316eacd --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteEventsTest.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.cache.expiry.CreatedExpiryPolicy; +import javax.cache.expiry.Duration; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.camel.Exchange; +import org.apache.camel.Route; +import org.apache.camel.ServiceStatus; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.JndiRegistry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.Event; +import org.apache.ignite.events.EventType; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteEventsTest extends AbstractIgniteTest { + + @Test + public void testConsumeAllEvents() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("ignite:events:abc").to("mock:test1"); + } + }); + + getMockEndpoint("mock:test1").expectedMinimumMessageCount(9); + + IgniteCache<String, String> cache = ignite().getOrCreateCache("abc"); + + // Generate cache activity. + cache.put("abc", "123"); + cache.get("abc"); + cache.remove("abc"); + cache.withExpiryPolicy(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, 100)).create()).put("abc", "123"); + + Thread.sleep(150); + + cache.get("abc"); + + assertMockEndpointsSatisfied(); + + List<Integer> eventTypes = receivedEventTypes("mock:test1"); + + assert_().that(eventTypes).containsAllOf(EventType.EVT_CACHE_STARTED, EventType.EVT_CACHE_ENTRY_CREATED, EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_READ, + EventType.EVT_CACHE_OBJECT_REMOVED, EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_EXPIRED, EventType.EVT_CACHE_OBJECT_READ).inOrder(); + + } + + @Test + public void testConsumeFilteredEventsWithRef() throws Exception { + context.getRegistry(JndiRegistry.class).bind("filter", Sets.newHashSet(EventType.EVT_CACHE_OBJECT_PUT)); + + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("ignite:events:abc?events=#filter").to("mock:test2"); + } + }); + + getMockEndpoint("mock:test2").expectedMessageCount(2); + + IgniteCache<String, String> cache = ignite().getOrCreateCache("abc"); + + // Generate cache activity. + cache.put("abc", "123"); + cache.get("abc"); + cache.remove("abc"); + cache.get("abc"); + cache.put("abc", "123"); + + assertMockEndpointsSatisfied(); + + List<Integer> eventTypes = receivedEventTypes("mock:test2"); + + assert_().that(eventTypes).containsExactly(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_PUT).inOrder(); + } + + @Test + public void testConsumeFilteredEventsInline() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("ignite:events:abc?events=EVT_CACHE_OBJECT_PUT").to("mock:test3"); + } + }); + + getMockEndpoint("mock:test3").expectedMessageCount(2); + + IgniteCache<String, String> cache = ignite().getOrCreateCache("abc"); + + // Generate cache activity. + cache.put("abc", "123"); + cache.get("abc"); + cache.remove("abc"); + cache.get("abc"); + cache.put("abc", "123"); + + assertMockEndpointsSatisfied(); + + List<Integer> eventTypes = receivedEventTypes("mock:test3"); + + assert_().that(eventTypes).containsExactly(EventType.EVT_CACHE_OBJECT_PUT, EventType.EVT_CACHE_OBJECT_PUT).inOrder(); + + } + + private List<Integer> receivedEventTypes(String mockEndpoint) { + List<Integer> eventTypes = Lists.newArrayList(Lists.transform(getMockEndpoint(mockEndpoint).getExchanges(), new Function<Exchange, Integer>() { + @Override + public Integer apply(Exchange input) { + return input.getIn().getBody(Event.class).type(); + } + })); + return eventTypes; + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void stopAllRoutes() throws Exception { + for (Route route : context.getRoutes()) { + if (context.getRouteStatus(route.getId()) != ServiceStatus.Started) { + return; + } + context.stopRoute(route.getId()); + } + resetMocks(); + } + + @Override + protected IgniteComponent buildComponent() { + IgniteConfiguration config = new IgniteConfiguration(); + config.setIncludeEventTypes(EventType.EVTS_ALL_MINUS_METRIC_UPDATE); + return IgniteComponent.fromConfiguration(config); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java new file mode 100644 index 0000000..c310e41 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteIdGenTest.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import com.google.common.collect.ImmutableSet; + +import org.apache.camel.component.ignite.idgen.IgniteIdGenEndpoint; +import org.apache.camel.component.ignite.idgen.IgniteIdGenOperation; +import org.apache.ignite.IgniteAtomicSequence; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteIdGenTest extends AbstractIgniteTest { + + @Test + public void testOperations() { + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET", null, Long.class)).isEqualTo(0); + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET_AND_INCREMENT", null, Long.class)).isEqualTo(0); + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=INCREMENT_AND_GET", null, Long.class)).isEqualTo(2); + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=ADD_AND_GET", 5, Long.class)).isEqualTo(7); + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET_AND_ADD", 5, Long.class)).isEqualTo(7); + assert_().that(template.requestBody("ignite:idgen:abc?initialValue=0&operation=GET", 5, Long.class)).isEqualTo(12); + } + + @Test + public void testInitialValue() { + assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100", null, Long.class)).isEqualTo(100); + assert_().that(template.requestBody("ignite:idgen:abc?operation=GET_AND_INCREMENT&initialValue=100", null, Long.class)).isEqualTo(100); + assert_().that(template.requestBody("ignite:idgen:abc?operation=INCREMENT_AND_GET&initialValue=100", null, Long.class)).isEqualTo(102); + assert_().that(template.requestBody("ignite:idgen:abc?operation=ADD_AND_GET&initialValue=100", 5, Long.class)).isEqualTo(107); + assert_().that(template.requestBody("ignite:idgen:abc?operation=GET_AND_ADD&initialValue=100", 5, Long.class)).isEqualTo(107); + assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100", 5, Long.class)).isEqualTo(112); + } + + @Test + public void testDifferentOperation() { + assert_().that(template.requestBody("ignite:idgen:abc?operation=GET&initialValue=100", null, Long.class)).isEqualTo(100); + assert_().that(template.requestBodyAndHeader("ignite:idgen:abc?operation=GET_AND_INCREMENT&initialValue=100", null, IgniteConstants.IGNITE_IDGEN_OPERATION, + IgniteIdGenOperation.INCREMENT_AND_GET, Long.class)).isEqualTo(101); + } + + @Test + public void testBatchSize() { + IgniteIdGenEndpoint endpoint = context.getEndpoint("ignite:idgen:abc?operation=GET&initialValue=100&batchSize=100", IgniteIdGenEndpoint.class); + assert_().that(template.requestBody(endpoint, null, Long.class)).isEqualTo(100); + + // Cannot test much here with a single Ignite instance, let's just test that the parameter could be set. + assert_().that(endpoint.getBatchSize()); + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void deleteSets() { + for (String name : ImmutableSet.<String> of("abc")) { + IgniteAtomicSequence seq = ignite().atomicSequence(name, 0, false); + if (seq == null) { + continue; + } + seq.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java new file mode 100644 index 0000000..a1cee04 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteMessagingTest.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import java.io.Serializable; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ContiguousSet; +import com.google.common.collect.DiscreteDomain; +import com.google.common.collect.Lists; +import com.google.common.collect.Range; + +import org.apache.camel.Consumer; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; +import static com.jayway.awaitility.Awaitility.await; +import static com.jayway.awaitility.Awaitility.to; +import static org.hamcrest.Matchers.equalTo; + +public class IgniteMessagingTest extends AbstractIgniteTest implements Serializable { + + private static final long serialVersionUID = 3967738538216977749L; + + private static final String TOPIC1 = "TOPIC1"; + private static final String TOPIC2 = "TOPIC2"; + private UUID uuid; + + @Test + public void testProducerSendMessage() { + List<Object> messages = Lists.newArrayList(); + setupMessageListener(TOPIC1, messages); + + template.requestBody("ignite:messaging:TOPIC1", 1); + + await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), equalTo(1)); + assert_().that(messages.get(0)).isEqualTo(1); + } + + @Test + public void testProducerSendMessageTopicInHeader() throws Exception { + List<Object> messages1 = Lists.newArrayList(); + setupMessageListener(TOPIC1, messages1); + + List<Object> messages2 = Lists.newArrayList(); + setupMessageListener(TOPIC2, messages2); + + template.requestBodyAndHeader("ignite:messaging:TOPIC1", 1, IgniteConstants.IGNITE_MESSAGING_TOPIC, "TOPIC2"); + + Thread.sleep(1000); + assert_().that(messages1.size()).isEqualTo(0); + assert_().that(messages2.size()).isEqualTo(1); + } + + @Test + public void testProducerSendManyMessages() { + List<Object> messages = Lists.newArrayList(); + setupMessageListener(TOPIC1, messages); + + Set<Integer> request = ContiguousSet.create(Range.closedOpen(0, 100), DiscreteDomain.integers()); + template.requestBody("ignite:messaging:TOPIC1", request); + + await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), equalTo(100)); + assert_().that(messages).containsAllIn(request); + } + + @Test + public void testProducerSendManyMessagesOrdered() { + List<Object> messages = Lists.newArrayList(); + setupMessageListener(TOPIC1, messages); + + ContiguousSet<Integer> set = ContiguousSet.create(Range.closedOpen(0, 100), DiscreteDomain.integers()); + for (int i : set) { + template.requestBody("ignite:messaging:TOPIC1?sendMode=ORDERED&timeout=1000", i); + } + + await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), equalTo(100)); + assert_().that(messages).containsAllIn(set); + } + + @Test + public void testProducerSendCollectionAsObject() { + List<Object> messages = Lists.newArrayList(); + setupMessageListener(TOPIC1, messages); + + Set<Integer> request = ContiguousSet.create(Range.closedOpen(0, 100), DiscreteDomain.integers()); + template.requestBody("ignite:messaging:TOPIC1?treatCollectionsAsCacheObjects=true", request); + + await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), equalTo(1)); + assert_().that(messages.get(0)).isEqualTo(request); + } + + @Test + public void testConsumerManyMessages() throws Exception { + List<Object> messages = Lists.newArrayList(); + Consumer consumer = context.getEndpoint("ignite:messaging:TOPIC1").createConsumer(storeBodyInListProcessor(messages)); + consumer.start(); + + Set<Integer> messagesToSend = ContiguousSet.create(Range.closedOpen(0, 100), DiscreteDomain.integers()); + ignite().message().send(TOPIC1, messagesToSend); + + await().atMost(5, TimeUnit.SECONDS).untilCall(to(messages).size(), equalTo(100)); + + consumer.stop(); + } + + private void setupMessageListener(String topic, final List<Object> messages) { + uuid = ignite().message().remoteListen(topic, new IgniteBiPredicate<UUID, Object>() { + private static final long serialVersionUID = 1L; + + @Override + public boolean apply(UUID uuid, Object message) { + messages.add(message); + return true; + } + }); + } + + @After + public void stopMessageListener() { + if (uuid == null) { + return; + } + + ignite().message().stopRemoteListen(uuid); + uuid = null; + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + private Processor storeBodyInListProcessor(final List<Object> list) { + return new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + list.add(exchange.getIn().getBody()); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java new file mode 100644 index 0000000..77fb769 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteQueueTest.java @@ -0,0 +1,257 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + + +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.camel.component.ignite.queue.IgniteQueueEndpoint; +import org.apache.camel.component.ignite.queue.IgniteQueueOperation; +import org.apache.camel.impl.JndiRegistry; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + + +public class IgniteQueueTest extends AbstractIgniteTest { + + @Test + public void testOperations() { + boolean result = template.requestBody("ignite:queue:abc?operation=ADD", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).contains("hello")).isTrue(); + + result = template.requestBody("ignite:queue:abc?operation=CONTAINS", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).contains("hello")).isTrue(); + + result = template.requestBody("ignite:queue:abc?operation=REMOVE", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).contains("hello")).isFalse(); + + result = template.requestBody("ignite:queue:abc?operation=CONTAINS", "hello", boolean.class); + assert_().that(result).isFalse(); + } + + @Test + @SuppressWarnings("unchecked") + public void testOperations2() { + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:queue:abc?operation=ADD", "hello" + i); + } + + // SIZE + int size = template.requestBody("ignite:queue:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(100); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(100); + + List<String> toRetain = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + toRetain.add("hello" + i); + } + + // RETAIN_ALL + boolean retained = template.requestBodyAndHeader("ignite:queue:abc?operation=CLEAR", toRetain, IgniteConstants.IGNITE_QUEUE_OPERATION, IgniteQueueOperation.RETAIN_ALL, boolean.class); + assert_().that(retained).isTrue(); + + // SIZE + size = template.requestBody("ignite:queue:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(50); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(50); + + // ITERATOR + Iterator<String> iterator = template.requestBody("ignite:queue:abc?operation=ITERATOR", "hello", Iterator.class); + assert_().that(Iterators.toArray(iterator, String.class)).asList().containsExactlyElementsIn(toRetain).inOrder(); + + // ARRAY + String[] array = template.requestBody("ignite:queue:abc?operation=ARRAY", "hello", String[].class); + assert_().that(array).asList().containsExactlyElementsIn(toRetain).inOrder(); + + // CLEAR + Object result = template.requestBody("ignite:queue:abc?operation=CLEAR", "hello", String.class); + assert_().that(result).isEqualTo("hello"); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(0); + + // SIZE + size = template.requestBody("ignite:queue:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(0); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(0); + } + + @Test + public void testRetainSingle() { + // Fill data. + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:queue:abc?operation=ADD", "hello" + i); + } + + boolean retained = template.requestBody("ignite:queue:abc?operation=RETAIN_ALL", "hello10", boolean.class); + assert_().that(retained).isTrue(); + + // ARRAY + String[] array = template.requestBody("ignite:queue:abc?operation=ARRAY", "hello", String[].class); + assert_().that(array).asList().containsExactly("hello10"); + } + + @Test + public void testCollectionsAsCacheObject() { + // Fill data. + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:queue:abc?operation=ADD", "hello" + i); + } + + // Add the set. + Set<String> toAdd = Sets.newHashSet("hello101", "hello102", "hello103"); + template.requestBody("ignite:queue:abc?operation=ADD&treatCollectionsAsCacheObjects=true", toAdd); + + // Size must be 101, not 103. + int size = template.requestBody("ignite:queue:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(101); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(101); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).contains(toAdd)).isTrue(); + + // Check whether the Set contains the Set. + boolean contains = template.requestBody("ignite:queue:abc?operation=CONTAINS&treatCollectionsAsCacheObjects=true", toAdd, boolean.class); + assert_().that(contains).isTrue(); + + // Delete the Set. + template.requestBody("ignite:queue:abc?operation=REMOVE&treatCollectionsAsCacheObjects=true", toAdd); + + // Size must be 100 again. + size = template.requestBody("ignite:queue:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(100); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).size()).isEqualTo(100); + assert_().that(ignite().queue("abc", 0, new CollectionConfiguration()).contains(toAdd)).isFalse(); + + } + + @Test + public void testWithConfiguration() { + CollectionConfiguration configuration = new CollectionConfiguration(); + configuration.setCacheMode(CacheMode.LOCAL); + + context.getRegistry(JndiRegistry.class).bind("config", configuration); + + IgniteQueueEndpoint igniteEndpoint = context.getEndpoint("ignite:queue:abc?operation=ADD&configuration=#config", IgniteQueueEndpoint.class); + template.requestBody(igniteEndpoint, "hello"); + + assert_().that(ignite().queue("abc", 0, configuration).size()).isEqualTo(1); + assert_().that(igniteEndpoint.getConfiguration()).isEqualTo(configuration); + } + + @Test + public void testBoundedQueueAndOtherOperations() throws Exception { + List<String> list = Lists.newArrayList(); + + // Fill data. + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello" + i); + list.add("hello" + i); + } + + // NOTE: Unfortunately the behaviour of IgniteQueue doesn't adhere to the overridden ADD method. It should return an Exception. + assert_().that(template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello101", boolean.class)).isFalse(); + assert_().that(template.requestBody("ignite:queue:def?operation=OFFER&capacity=100", "hello101", boolean.class)).isFalse(); + + final CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + assert_().that(template.requestBody("ignite:queue:def?operation=PUT&capacity=100", "hello101", boolean.class)).isFalse(); + latch.countDown(); + } + }); + + t.start(); + + // Wait 2 seconds and check that the thread was blocked. + assert_().that(latch.await(2000, TimeUnit.MILLISECONDS)).isFalse(); + t.interrupt(); + + // PEEK and ELEMENT. + assert_().that(template.requestBody("ignite:queue:def?operation=PEEK&capacity=100", null, String.class)).isEqualTo("hello0"); + assert_().that(template.requestBody("ignite:queue:def?operation=ELEMENT&capacity=100", null, String.class)).isEqualTo("hello0"); + + // TAKE. + assert_().that(template.requestBody("ignite:queue:def?operation=TAKE&capacity=100", null, String.class)).isEqualTo("hello0"); + assert_().that(template.requestBody("ignite:queue:def?operation=SIZE&capacity=100", null, int.class)).isEqualTo(99); + + // Now drain. + assert_().that(template.requestBody("ignite:queue:def?operation=DRAIN&capacity=100", null, String[].class)).asList().hasSize(99); + assert_().that(template.requestBody("ignite:queue:def?operation=SIZE&capacity=100", null, int.class)).isEqualTo(0); + assert_().that(template.requestBody("ignite:queue:def?operation=POLL&capacity=100", null, String.class)).isNull(); + + // TAKE. + t = new Thread(new Runnable() { + @Override + public void run() { + assert_().that(template.requestBody("ignite:queue:def?operation=TAKE&capacity=100", null, String.class)).isEqualTo("hello102"); + latch.countDown(); + } + }); + + t.start(); + + // Element was returned. + assert_().that(template.requestBody("ignite:queue:def?operation=ADD&capacity=100", "hello102", boolean.class)).isTrue(); + assert_().that(latch.await(1000, TimeUnit.MILLISECONDS)).isTrue(); + + // POLL with a timeout. + assert_().that(Executors.newSingleThreadExecutor().submit(new Callable<Long>() { + @Override + public Long call() throws Exception { + Stopwatch sw = Stopwatch.createStarted(); + assert_().that(template.requestBody("ignite:queue:def?operation=POLL&timeoutMillis=1000&capacity=100", null, String.class)).isNull(); + return sw.elapsed(TimeUnit.MILLISECONDS); + } + }).get()).isAtLeast(1000L); + + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void deleteQueues() { + for (String queueName : ImmutableSet.<String> of("abc")) { + ignite().queue(queueName, 0, new CollectionConfiguration()).close(); + } + + // Bounded queues. + for (String queueName : ImmutableSet.<String> of("def")) { + ignite().queue(queueName, 100, new CollectionConfiguration()).close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java new file mode 100644 index 0000000..3c3c627 --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/IgniteSetTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import org.apache.camel.component.ignite.set.IgniteSetEndpoint; +import org.apache.camel.component.ignite.set.IgniteSetOperation; +import org.apache.camel.impl.JndiRegistry; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CollectionConfiguration; +import org.junit.After; +import org.junit.Test; + +import static com.google.common.truth.Truth.assert_; + +public class IgniteSetTest extends AbstractIgniteTest { + + @Test + public void testOperations() { + boolean result = template.requestBody("ignite:set:abc?operation=ADD", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().set("abc", new CollectionConfiguration()).contains("hello")).isTrue(); + + result = template.requestBody("ignite:set:abc?operation=CONTAINS", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().set("abc", new CollectionConfiguration()).contains("hello")).isTrue(); + + result = template.requestBody("ignite:set:abc?operation=REMOVE", "hello", boolean.class); + assert_().that(result).isTrue(); + assert_().that(ignite().set("abc", new CollectionConfiguration()).contains("hello")).isFalse(); + + result = template.requestBody("ignite:set:abc?operation=CONTAINS", "hello", boolean.class); + assert_().that(result).isFalse(); + } + + @Test + @SuppressWarnings("unchecked") + public void testOperations2() { + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:set:abc?operation=ADD", "hello" + i); + } + + // SIZE + int size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(100); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(100); + + List<String> toRetain = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + toRetain.add("hello" + i); + } + + // RETAIN_ALL + boolean retained = template.requestBodyAndHeader("ignite:set:abc?operation=CLEAR", toRetain, IgniteConstants.IGNITE_SETS_OPERATION, IgniteSetOperation.RETAIN_ALL, boolean.class); + assert_().that(retained).isTrue(); + + // SIZE + size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(50); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(50); + + // ITERATOR + Iterator<String> iterator = template.requestBody("ignite:set:abc?operation=ITERATOR", "hello", Iterator.class); + assert_().that(Iterators.toArray(iterator, String.class)).asList().containsExactlyElementsIn(toRetain); + + // ARRAY + String[] array = template.requestBody("ignite:set:abc?operation=ARRAY", "hello", String[].class); + assert_().that(array).asList().containsExactlyElementsIn(toRetain); + + // CLEAR + Object result = template.requestBody("ignite:set:abc?operation=CLEAR", "hello", String.class); + assert_().that(result).isEqualTo("hello"); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(0); + + // SIZE + size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(0); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(0); + } + + @Test + public void testRetainSingle() { + // Fill data. + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:set:abc?operation=ADD", "hello" + i); + } + + boolean retained = template.requestBody("ignite:set:abc?operation=RETAIN_ALL", "hello10", boolean.class); + assert_().that(retained).isTrue(); + + // ARRAY + String[] array = template.requestBody("ignite:set:abc?operation=ARRAY", "hello", String[].class); + assert_().that(array).asList().containsExactly("hello10"); + } + + @Test + public void testCollectionsAsCacheObject() { + // Fill data. + for (int i = 0; i < 100; i++) { + template.requestBody("ignite:set:abc?operation=ADD", "hello" + i); + } + + // Add the set. + Set<String> toAdd = Sets.newHashSet("hello101", "hello102", "hello103"); + template.requestBody("ignite:set:abc?operation=ADD&treatCollectionsAsCacheObjects=true", toAdd); + + // Size must be 101, not 103. + int size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(101); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(101); + assert_().that(ignite().set("abc", new CollectionConfiguration()).contains(toAdd)).isTrue(); + + // Check whether the Set contains the Set. + boolean contains = template.requestBody("ignite:set:abc?operation=CONTAINS&treatCollectionsAsCacheObjects=true", toAdd, boolean.class); + assert_().that(contains).isTrue(); + + // Delete the Set. + template.requestBody("ignite:set:abc?operation=REMOVE&treatCollectionsAsCacheObjects=true", toAdd); + + // Size must be 100 again. + size = template.requestBody("ignite:set:abc?operation=SIZE", "hello", int.class); + assert_().that(size).isEqualTo(100); + assert_().that(ignite().set("abc", new CollectionConfiguration()).size()).isEqualTo(100); + assert_().that(ignite().set("abc", new CollectionConfiguration()).contains(toAdd)).isFalse(); + + } + + @Test + public void testWithConfiguration() { + CollectionConfiguration configuration = new CollectionConfiguration(); + configuration.setCacheMode(CacheMode.LOCAL); + + context.getRegistry(JndiRegistry.class).bind("config", configuration); + + IgniteSetEndpoint igniteEndpoint = context.getEndpoint("ignite:set:abc?operation=ADD&configuration=#config", IgniteSetEndpoint.class); + template.requestBody(igniteEndpoint, "hello"); + + assert_().that(ignite().set("abc", configuration).size()).isEqualTo(1); + assert_().that(igniteEndpoint.getConfiguration()).isEqualTo(configuration); + } + + @Override + public boolean isCreateCamelContextPerClass() { + return true; + } + + @After + public void deleteSets() { + for (String setName : ImmutableSet.<String> of("abc")) { + ignite().set(setName, new CollectionConfiguration()).close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java new file mode 100644 index 0000000..7c31e16e --- /dev/null +++ b/components/camel-ignite/src/test/java/org/apache/camel/component/ignite/TestIgniteComputeResources.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.ignite; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.compute.ComputeJob; +import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.compute.ComputeTask; +import org.apache.ignite.compute.ComputeTaskSplitAdapter; +import org.apache.ignite.events.Event; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteReducer; +import org.apache.ignite.lang.IgniteRunnable; + +public final class TestIgniteComputeResources { + + public static final AtomicInteger COUNTER = new AtomicInteger(0); + + public static final IgniteRunnable TEST_RUNNABLE = new IgniteRunnable() { + private static final long serialVersionUID = -4961602602993218883L; + + @Override + public void run() { + System.out.println("Hello from a runnable"); + } + }; + + public static final IgniteRunnable TEST_RUNNABLE_COUNTER = new IgniteRunnable() { + private static final long serialVersionUID = 386219709871673366L; + + @Override + public void run() { + COUNTER.incrementAndGet(); + } + }; + + public static final IgnitePredicate<Event> EVENT_COUNTER = new IgnitePredicate<Event>() { + private static final long serialVersionUID = -4214894278107593791L; + + @Override + public boolean apply(Event event) { + COUNTER.incrementAndGet(); + return true; + } + }; + + public static final IgniteCallable<String> TEST_CALLABLE = new IgniteCallable<String>() { + private static final long serialVersionUID = 986972344531961815L; + + @Override + public String call() throws Exception { + return "hello"; + } + }; + + public static final IgniteClosure<String, String> TEST_CLOSURE = new IgniteClosure<String, String>() { + private static final long serialVersionUID = -3969758431961263815L; + + @Override + public String apply(String input) { + return "hello " + input; + } + }; + + public static final ComputeTask<Integer, String> COMPUTE_TASK = new ComputeTaskSplitAdapter<Integer, String>() { + private static final long serialVersionUID = 3040624379256407732L; + + @Override + public String reduce(List<ComputeJobResult> results) throws IgniteException { + StringBuilder answer = new StringBuilder(); + for (ComputeJobResult res : results) { + answer.append(res.getData()).append(","); + } + answer.deleteCharAt(answer.length() - 1); + return answer.toString(); + } + + @Override + protected Collection<? extends ComputeJob> split(int gridSize, final Integer arg) throws IgniteException { + Set<ComputeJob> answer = new HashSet<>(); + for (int i = 0; i < arg; i++) { + final int c = i; + answer.add(new ComputeJob() { + private static final long serialVersionUID = 3365213549618276779L; + + @Override + public Object execute() throws IgniteException { + return "a" + c; + } + + @Override + public void cancel() { + // nothing + } + }); + } + return answer; + } + }; + + public static final IgniteReducer<String, String> STRING_JOIN_REDUCER = new IgniteReducer<String, String>() { + private static final long serialVersionUID = 1L; + private List<String> list = Lists.newArrayList(); + + @Override + public boolean collect(String value) { + list.add(value); + return true; + } + + @Override + public String reduce() { + Collections.sort(list); + String answer = Joiner.on("").join(list); + list.clear(); + return answer; + } + }; + + private TestIgniteComputeResources() { + + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/camel-ignite/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-ignite/src/test/resources/log4j.properties b/components/camel-ignite/src/test/resources/log4j.properties new file mode 100644 index 0000000..cb64298 --- /dev/null +++ b/components/camel-ignite/src/test/resources/log4j.properties @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +log4j.rootLogger=INFO, file +# change the logging level of this category to increase verbosity of the MongoDB component +log4j.category.org.apache.camel.component.mongodb=INFO, file +log4j.additivity.org.apache.camel.component.mongodb=false + +# uncomment the following line to turn on Camel debugging +#log4j.logger.org.apache.camel=DEBUG + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n +#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n + + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +log4j.appender.file.file=target/camel-mongodb-test.log http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index fe40980..a68a15e 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -124,6 +124,7 @@ <module>camel-hl7</module> <module>camel-ibatis</module> <module>camel-ical</module> + <module>camel-ignite</module> <module>camel-infinispan</module> <module>camel-irc</module> <module>camel-jackson</module> http://git-wip-us.apache.org/repos/asf/camel/blob/a695c5d3/parent/pom.xml ---------------------------------------------------------------------- diff --git a/parent/pom.xml b/parent/pom.xml index f2e3db2..2839035 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -233,6 +233,7 @@ <ibatis-bundle-version>2.3.4.726_4</ibatis-bundle-version> <ibatis-version>2.3.4.726</ibatis-version> <ical4j-version>1.0.7</ical4j-version> + <ignite-version>1.5.0-b1</ignite-version> <infinispan-version>8.1.0.Final</infinispan-version> <irclib-bundle-version>1.10_5</irclib-bundle-version> <irclib-version>1.10</irclib-version>