http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java new file mode 100644 index 0000000..820f47a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/dataload/GridDataLoaderProcessorSelfTest.java @@ -0,0 +1,883 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.dataload; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * + */ +public class GridDataLoaderProcessorSelfTest extends GridCommonAbstractTest { + /** */ + private static ConcurrentHashMap<Object, Object> storeMap; + + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private GridCacheMode mode = PARTITIONED; + + /** */ + private boolean nearEnabled = true; + + /** */ + private boolean useCache; + + /** */ + private boolean useGrpLock; + + /** */ + private TestStore store; + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + super.afterTest(); + + useCache = false; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setIncludeProperties(); + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + if (useCache) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cc.setEvictionPolicy(new GridCacheFifoEvictionPolicy(10000)); + + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + + if (store != null) { + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + } + + cfg.setCacheConfiguration(cc); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPartitioned() throws Exception { + mode = PARTITIONED; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testColocated() throws Exception { + mode = PARTITIONED; + nearEnabled = false; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedGroupLock() throws Exception { + mode = PARTITIONED; + useGrpLock = true; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicated() throws Exception { + mode = REPLICATED; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedGroupLock() throws Exception { + mode = REPLICATED; + useGrpLock = true; + + checkDataLoader(); + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + mode = LOCAL; + + try { + checkDataLoader(); + + assert false; + } + catch (IgniteCheckedException e) { + // Cannot load local cache configured remotely. + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("ErrorNotRethrown") + private void checkDataLoader() throws Exception { + try { + Ignite g1 = startGrid(1); + + useCache = true; + + Ignite g2 = startGrid(2); + Ignite g3 = startGrid(3); + + final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null); + + ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : + GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); + + final AtomicInteger idxGen = new AtomicInteger(); + final int cnt = 400; + final int threads = 10; + + final CountDownLatch l1 = new CountDownLatch(threads); + + IgniteFuture<?> f1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + int idx = idxGen.getAndIncrement(); + + futs.add(ldr.addData(idx, idx)); + } + + l1.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l1.await(); + + // This will wait until data loader finishes loading. + stopGrid(getTestGridName(1), false); + + f1.get(); + + int s2 = g2.cache(null).primaryKeySet().size(); + int s3 = g3.cache(null).primaryKeySet().size(); + int total = threads * cnt; + + assertEquals(total, s2 + s3); + + final IgniteDataLoader<Integer, Integer> rmvLdr = g2.dataLoader(null); + + rmvLdr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : + GridDataLoadCacheUpdaters.<Integer, Integer>batchedSorted()); + + final CountDownLatch l2 = new CountDownLatch(threads); + + IgniteFuture<?> f2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(cnt); + + for (int i = 0; i < cnt; i++) { + final int key = idxGen.decrementAndGet(); + + futs.add(rmvLdr.removeData(key)); + } + + l2.countDown(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, threads); + + l2.await(); + + rmvLdr.close(false); + + f2.get(); + + s2 = g2.cache(null).primaryKeySet().size(); + s3 = g3.cache(null).primaryKeySet().size(); + + assert s2 == 0 && s3 == 0 : "Incorrect entries count [s2=" + s2 + ", s3=" + s3 + ']'; + } + finally { + stopAllGrids(); + } + } + + /** + * Test primitive arrays can be passed into data loader. + * + * @throws Exception If failed. + */ + public void testPrimitiveArrays() throws Exception { + try { + useCache = true; + mode = PARTITIONED; + + Ignite g1 = startGrid(1); + startGrid(2); // Reproduced only for several nodes in topology (if marshalling is used). + + List<Object> arrays = Arrays.<Object>asList( + new byte[] {1}, new boolean[] {true, false}, new char[] {2, 3}, new short[] {3, 4}, + new int[] {4, 5}, new long[] {5, 6}, new float[] {6, 7}, new double[] {7, 8}); + + IgniteDataLoader<Object, Object> dataLdr = g1.dataLoader(null); + + for (int i = 0, size = arrays.size(); i < 1000; i++) { + Object arr = arrays.get(i % size); + + dataLdr.addData(i, arr); + dataLdr.addData(i, fixedClosure(arr)); + } + + dataLdr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreaded() throws Exception { + mode = REPLICATED; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testReplicatedMultiThreadedGroupLock() throws Exception { + mode = REPLICATED; + useGrpLock = true; + + checkLoaderMultithreaded(1, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreaded() throws Exception { + mode = PARTITIONED; + + checkLoaderMultithreaded(1, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedMultiThreadedGroupLock() throws Exception { + mode = PARTITIONED; + useGrpLock = true; + + checkLoaderMultithreaded(1, 3); + } + + /** + * Tests loader in multithreaded environment with various count of grids started. + * + * @param nodesCntNoCache How many nodes should be started without cache. + * @param nodesCntCache How many nodes should be started with cache. + * @throws Exception If failed. + */ + protected void checkLoaderMultithreaded(int nodesCntNoCache, int nodesCntCache) + throws Exception { + try { + // Start all required nodes. + int idx = 1; + + for (int i = 0; i < nodesCntNoCache; i++) + startGrid(idx++); + + useCache = true; + + for (int i = 0; i < nodesCntCache; i++) + startGrid(idx++); + + Ignite g1 = grid(1); + + // Get and configure loader. + final IgniteDataLoader<Integer, Integer> ldr = g1.dataLoader(null); + + ldr.updater(useGrpLock ? GridDataLoadCacheUpdaters.<Integer, Integer>groupLocked() : + GridDataLoadCacheUpdaters.<Integer, Integer>individual()); + ldr.perNodeBufferSize(2); + + // Define count of puts. + final AtomicInteger idxGen = new AtomicInteger(); + + final AtomicBoolean done = new AtomicBoolean(); + + try { + final int totalPutCnt = 50000; + + IgniteFuture<?> fut1 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + Collection<IgniteFuture<?>> futs = new ArrayList<>(); + + while (!done.get()) { + int idx = idxGen.getAndIncrement(); + + if (idx >= totalPutCnt) { + info(">>> Stopping producer thread since maximum count of puts reached."); + + break; + } + + futs.add(ldr.addData(idx, idx)); + } + + ldr.flush(); + + for (IgniteFuture<?> fut : futs) + fut.get(); + + return null; + } + }, 5, "producer"); + + IgniteFuture<?> fut2 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!done.get()) { + ldr.flush(); + + U.sleep(100); + } + + return null; + } + }, 1, "flusher"); + + // Define index of node being restarted. + final int restartNodeIdx = nodesCntCache + nodesCntNoCache + 1; + + IgniteFuture<?> fut3 = multithreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + try { + for (int i = 0; i < 5; i++) { + Ignite g = startGrid(restartNodeIdx); + + UUID id = g.cluster().localNode().id(); + + info(">>>>>>> Started node: " + id); + + U.sleep(1000); + + stopGrid(getTestGridName(restartNodeIdx), true); + + info(">>>>>>> Stopped node: " + id); + } + } + finally { + done.set(true); + + info("Start stop thread finished."); + } + + return null; + } + }, 1, "start-stop-thread"); + + fut1.get(); + fut2.get(); + fut3.get(); + } + finally { + ldr.close(false); + } + + info("Cache size on second grid: " + grid(nodesCntNoCache + 1).cache(null).primaryKeySet().size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testLoaderApi() throws Exception { + useCache = true; + + try { + Ignite g1 = startGrid(1); + + IgniteDataLoader<Object, Object> ldr = g1.dataLoader(null); + + ldr.close(false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + + try { + // Create another loader. + ldr = g1.dataLoader("UNKNOWN_CACHE"); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + ldr.close(true); + + assert ldr.future().isDone(); + + ldr.future().get(); + + // Create another loader. + ldr = g1.dataLoader(null); + + // Cancel with future. + ldr.future().cancel(); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + try { + ldr.future().get(); + + assert false; + } + catch (IgniteFutureCancelledException e) { + info("Caught expected exception: " + e); + } + + // Create another loader. + ldr = g1.dataLoader(null); + + // This will close loader. + stopGrid(getTestGridName(1), false); + + try { + ldr.addData(0, 0); + + assert false; + } + catch (IllegalStateException e) { + info("Caught expected exception: " + e); + } + + assert ldr.future().isDone(); + + ldr.future().get(); + } + finally { + stopAllGrids(); + } + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Callable. + */ + private static Callable<Integer> callable(@Nullable final Integer i) { + return new Callable<Integer>() { + @Override public Integer call() throws Exception { + return i; + } + }; + } + + /** + * Wraps integer to closure returning it. + * + * @param i Value to wrap. + * @return Closure. + */ + private static IgniteClosure<Integer, Integer> closure(@Nullable final Integer i) { + return new IgniteClosure<Integer, Integer>() { + @Override public Integer apply(Integer e) { + return e == null ? i : e + i; + } + }; + } + + /** + * Wraps object to closure returning it. + * + * @param obj Value to wrap. + * @return Closure. + */ + private static <T> IgniteClosure<T, T> fixedClosure(@Nullable final T obj) { + return new IgniteClosure<T, T>() { + @Override public T apply(T e) { + assert e == null || obj == null || e.getClass() == obj.getClass() : + "Expects the same types [e=" + e + ", obj=" + obj + ']'; + + return obj; + } + }; + } + + /** + * Wraps integer to closure expecting it and returning {@code null}. + * + * @param exp Expected closure value. + * @return Remove expected cache value closure. + */ + private static <T> IgniteClosure<T, T> removeClosure(@Nullable final T exp) { + return new IgniteClosure<T, T>() { + @Override public T apply(T act) { + if (exp == null ? act == null : exp.equals(act)) + return null; + + throw new AssertionError("Unexpected value [exp=" + exp + ", act=" + act + ']'); + } + }; + } + + /** + * @throws Exception If failed. + */ + public void testFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final GridCache<Integer, Integer> c = g.cache(null); + + final IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.isEmpty()); + + multithreaded(new Callable<Void>() { + @Override + public Void call() throws Exception { + ldr.flush(); + + assertEquals(9, c.size()); + + return null; + } + }, 5, "flush-checker"); + + ldr.addData(100, 100); + + ldr.flush(); + + assertEquals(10, c.size()); + + ldr.addData(200, 200); + + ldr.close(false); + + ldr.future().get(); + + assertEquals(11, c.size()); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testTryFlush() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + GridCache<Integer, Integer> c = g.cache(null); + + IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.isEmpty()); + + ldr.tryFlush(); + + Thread.sleep(100); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testFlushTimeout() throws Exception { + mode = LOCAL; + + useCache = true; + + try { + Ignite g = startGrid(); + + final CountDownLatch latch = new CountDownLatch(9); + + g.events().localListen(new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + latch.countDown(); + + return true; + } + }, EVT_CACHE_OBJECT_PUT); + + GridCache<Integer, Integer> c = g.cache(null); + + assertTrue(c.isEmpty()); + + IgniteDataLoader<Integer, Integer> ldr = g.dataLoader(null); + + ldr.perNodeBufferSize(10); + ldr.autoFlushFrequency(3000); + + for (int i = 0; i < 9; i++) + ldr.addData(i, i); + + assertTrue(c.isEmpty()); + + assertFalse(latch.await(1000, MILLISECONDS)); + + assertTrue(c.isEmpty()); + + assertTrue(latch.await(3000, MILLISECONDS)); + + assertEquals(9, c.size()); + + ldr.close(false); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testUpdateStore() throws Exception { + storeMap = new ConcurrentHashMap<>(); + + try { + store = new TestStore(); + + useCache = true; + + Ignite ignite = startGrid(1); + + startGrid(2); + startGrid(3); + + for (int i = 0; i < 1000; i++) + storeMap.put(i, i); + + try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + assertFalse(ldr.skipStore()); + + for (int i = 0; i < 1000; i++) + ldr.removeData(i); + + for (int i = 1000; i < 2000; i++) + ldr.addData(i, i); + } + + for (int i = 0; i < 1000; i++) + assertNull(storeMap.get(i)); + + for (int i = 1000; i < 2000; i++) + assertEquals(i, storeMap.get(i)); + + try (IgniteDataLoader<Object, Object> ldr = ignite.dataLoader(null)) { + ldr.skipStore(true); + + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + + for (int i = 1000; i < 2000; i++) + ldr.removeData(i); + } + + IgniteCache<Object, Object> cache = ignite.jcache(null); + + for (int i = 0; i < 1000; i++) { + assertNull(storeMap.get(i)); + + assertEquals(i, cache.get(i)); + } + + for (int i = 1000; i < 2000; i++) { + assertEquals(i, storeMap.get(i)); + + assertNull(cache.localPeek(i)); + } + } + finally { + storeMap = null; + } + } + + /** + * + */ + private static class TestObject { + /** Value. */ + private final int val; + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + TestObject obj = (TestObject)o; + + return val == obj.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + private class TestStore extends CacheStoreAdapter<Object, Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return storeMap.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + storeMap.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java index 548f8e1..b09c1cf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorAbstractSelfTest.java @@ -24,7 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.managed.*; import org.apache.ignite.resources.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java index 80e9a24..642d954 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/GridFutureAdapterSelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.util.future; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.closure.*; +import org.apache.ignite.internal.processors.closure.*; import org.apache.ignite.internal.util.io.*; import org.apache.ignite.internal.util.typedef.*; import org.gridgain.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java index d37f57f..eea4930 100644 --- a/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinitySelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cache.affinity.fair.*; import org.apache.ignite.cluster.*; import org.apache.ignite.events.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.gridgain.testframework.*; import org.gridgain.testframework.junits.common.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java deleted file mode 100644 index ee156fa..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorAbstractSelfTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheMode.*; - -/** - * Tests for {@link GridAffinityProcessor}. - */ -@GridCommonTest(group = "Affinity Processor") -public abstract class GridAffinityProcessorAbstractSelfTest extends GridCommonAbstractTest { - /** Number of grids started for tests. Should not be less than 2. */ - private static final int NODES_CNT = 3; - - /** Cache name. */ - private static final String CACHE_NAME = "cache"; - - /** IP finder. */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Flag to start grid with cache. */ - private boolean withCache; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - - if (withCache) { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName(CACHE_NAME); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(1); - cacheCfg.setAffinity(affinityFunction()); - - cfg.setCacheConfiguration(cacheCfg); - } - - return cfg; - } - - /** - * Creates affinity function for test. - * - * @return Affinity function. - */ - protected abstract GridCacheAffinityFunction affinityFunction(); - - /** {@inheritDoc} */ - @SuppressWarnings({"ConstantConditions"}) - @Override protected void beforeTestsStarted() throws Exception { - assert NODES_CNT >= 1; - - withCache = false; - - for (int i = 0; i < NODES_CNT; i++) - startGrid(i); - - withCache = true; - - for (int i = NODES_CNT; i < 2 * NODES_CNT; i++) - startGrid(i); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * Test affinity functions caching and clean up. - * - * @throws Exception In case of any exception. - */ - @SuppressWarnings("AssertEqualsBetweenInconvertibleTypes") - public void testAffinityProcessor() throws Exception { - Random rnd = new Random(); - - final GridKernal grid1 = (GridKernal)grid(rnd.nextInt(NODES_CNT)); // With cache. - GridKernal grid2 = (GridKernal)grid(NODES_CNT + rnd.nextInt(NODES_CNT)); // Without cache. - - assertEquals(NODES_CNT * 2, grid1.nodes().size()); - assertEquals(NODES_CNT * 2, grid2.nodes().size()); - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - grid1.cache(CACHE_NAME); - - return null; - } - }, IllegalArgumentException.class, null); - - GridCache<Integer, Integer> cache = grid2.cache(CACHE_NAME); - - assertNotNull(cache); - - GridAffinityProcessor affPrc1 = grid1.context().affinity(); - GridAffinityProcessor affPrc2 = grid2.context().affinity(); - - // Create keys collection. - Collection<Integer> keys = new ArrayList<>(1000); - - for (int i = 0; i < 1000; i++) - keys.add(i); - - // - // Validate affinity functions collection updated on first call. - // - - Map<ClusterNode, Collection<Integer>> node1Map = affPrc1.mapKeysToNodes(CACHE_NAME, keys); - Map<ClusterNode, Collection<Integer>> node2Map = affPrc2.mapKeysToNodes(CACHE_NAME, keys); - Map<ClusterNode, Collection<Integer>> cacheMap = cache.affinity().mapKeysToNodes(keys); - - assertEquals(cacheMap.size(), node1Map.size()); - assertEquals(cacheMap.size(), node2Map.size()); - - for (Map.Entry<ClusterNode, Collection<Integer>> entry : cacheMap.entrySet()) { - ClusterNode node = entry.getKey(); - - Collection<Integer> mappedKeys = entry.getValue(); - - Collection<Integer> mapped1 = node1Map.get(node); - Collection<Integer> mapped2 = node2Map.get(node); - - assertTrue(mappedKeys.containsAll(mapped1) && mapped1.containsAll(mappedKeys)); - assertTrue(mappedKeys.containsAll(mapped2) && mapped2.containsAll(mappedKeys)); - } - } - - /** - * Test performance of affinity processor. - * - * @throws Exception In case of any exception. - */ - public void testPerformance() throws Exception { - GridKernal grid = (GridKernal)grid(0); - GridAffinityProcessor aff = grid.context().affinity(); - - int keysSize = 1000000; - - Collection<Integer> keys = new ArrayList<>(keysSize); - - for (int i = 0; i < keysSize; i++) - keys.add(i); - - long start = System.currentTimeMillis(); - - int iterations = 10000000; - - for (int i = 0; i < iterations; i++) - aff.mapKeyToNode(keys); - - long diff = System.currentTimeMillis() - start; - - info(">>> Map " + keysSize + " keys to " + grid.nodes().size() + " nodes " + iterations + " times in " + diff + "ms."); - - assertTrue(diff < 25000); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java deleted file mode 100644 index c44fc82..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorConsistentHashSelfTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.consistenthash.*; - -/** - * Tests consistent hash affinity function. - */ -public class GridAffinityProcessorConsistentHashSelfTest extends GridAffinityProcessorAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected GridCacheAffinityFunction affinityFunction() { - return new GridCacheConsistentHashAffinityFunction(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java deleted file mode 100644 index 625d27c..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessorRendezvousSelfTest.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.affinity; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.rendezvous.*; - -/** - * Tests affinity processor with rendezvous affinity function. - */ -public class GridAffinityProcessorRendezvousSelfTest extends GridAffinityProcessorAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected GridCacheAffinityFunction affinityFunction() { - return new GridCacheRendezvousAffinityFunction(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java index cd78fb4..5d79125 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheAffinityApiSelfTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.util.typedef.*; import java.util.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index 4240649..8122226 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -24,7 +24,7 @@ import org.apache.ignite.cache.datastructures.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.kernal.processors.affinity.*; +import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 4de364c..a3505be 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -31,7 +31,7 @@ import org.apache.ignite.marshaller.optimized.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.grid.kernal.processors.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.gridgain.testframework.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java deleted file mode 100644 index a30d7aa..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorRemoteTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.internal.util.typedef.*; -import org.gridgain.testframework.junits.common.*; -import java.util.*; - -/** - * Tests execution of anonymous closures on remote nodes. - */ -@GridCommonTest(group = "Closure Processor") -public class GridClosureProcessorRemoteTest extends GridCommonAbstractTest { - /** - * - */ - public GridClosureProcessorRemoteTest() { - super(true); // Start grid. - } - - /** {@inheritDoc} */ - @Override public String getTestGridName() { - return null; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setDiscoverySpi(new TcpDiscoverySpi()); - - return cfg; - } - - /** - * @throws Exception Thrown in case of failure. - */ - public void testAnonymousBroadcast() throws Exception { - Ignite g = grid(); - - assert g.cluster().nodes().size() >= 2; - - g.compute().run(new CA() { - @Override public void apply() { - System.out.println("BROADCASTING...."); - } - }); - - Thread.sleep(2000); - } - - /** - * @throws Exception Thrown in case of failure. - */ - public void testAnonymousUnicast() throws Exception { - Ignite g = grid(); - - assert g.cluster().nodes().size() >= 2; - - ClusterNode rmt = F.first(g.cluster().forRemotes().nodes()); - - compute(g.cluster().forNode(rmt)).run(new CA() { - @Override public void apply() { - System.out.println("UNICASTING...."); - } - }); - - Thread.sleep(2000); - } - - /** - * - * @throws Exception Thrown in case of failure. - */ - public void testAnonymousUnicastRequest() throws Exception { - Ignite g = grid(); - - assert g.cluster().nodes().size() >= 2; - - ClusterNode rmt = F.first(g.cluster().forRemotes().nodes()); - final ClusterNode loc = g.cluster().localNode(); - - compute(g.cluster().forNode(rmt)).run(new CA() { - @Override public void apply() { - message(grid().forNode(loc)).localListen(new IgniteBiPredicate<UUID, String>() { - @Override public boolean apply(UUID uuid, String s) { - System.out.println("Received test message [nodeId: " + uuid + ", s=" + s + ']'); - - return false; - } - }, null); - } - }); - - message(g.cluster().forNode(rmt)).send(null, "TESTING..."); - - Thread.sleep(2000); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java deleted file mode 100644 index 8f7ebc7..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/GridClosureProcessorSelfTest.java +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.closure; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.gridgain.testframework.*; -import org.gridgain.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Tests for {@link GridClosureProcessor}. - */ -@GridCommonTest(group = "Closure Processor") -public class GridClosureProcessorSelfTest extends GridCommonAbstractTest { - /** Number of grids started for tests. Should not be less than 2. */ - private static final int NODES_CNT = 2; - - /** Job sleep duration in order to initiate timeout exception. */ - private static final long JOB_SLEEP = 200; - - /** Timeout used in timed tests. */ - private static final long JOB_TIMEOUT = 100; - - /** IP finder. */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setCacheConfiguration(); - - return cfg; - } - - /** {@inheritDoc} */ - @SuppressWarnings({"ConstantConditions"}) - @Override protected void beforeTestsStarted() throws Exception { - assert NODES_CNT >= 2; - - startGrids(NODES_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - execCntr.set(0); - } - - /** Execution counter for runnable and callable jobs. */ - private static AtomicInteger execCntr = new AtomicInteger(0); - - /** - * Test runnable job. - */ - private static class TestRunnable implements IgniteRunnable { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - @IgniteLoggerResource - private IgniteLogger log; - - /** @{inheritDoc} */ - @Override public void run() { - log.info("Runnable job executed on node: " + ignite.cluster().localNode().id()); - - assert ignite != null; - - execCntr.incrementAndGet(); - } - } - - /** - * Base class for test callables. - */ - private abstract static class AbstractTestCallable implements IgniteCallable<Integer> { - /** */ - @IgniteInstanceResource - protected Ignite ignite; - - /** */ - @IgniteLoggerResource - protected IgniteLogger log; - } - - /** - * Test callable job. - */ - private static class TestCallable extends AbstractTestCallable { - /** {@inheritDoc} */ - @Override public Integer call() { - log.info("Callable job executed on node: " + ignite.cluster().localNode().id()); - - assert ignite != null; - - return execCntr.incrementAndGet(); - } - } - - /** - * Test callable job which throws class not found exception. - */ - private static class TestCallableError extends AbstractTestCallable implements Externalizable { - /** - * - */ - public TestCallableError() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public Integer call() { - log.info("Callable job executed on node: " + ignite.cluster().localNode().id()); - - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - throw new ClassNotFoundException(); - } - } - - /** - * Test callable job which sleeps for some time. Is used in timeout tests. - */ - private static class TestCallableTimeout extends AbstractTestCallable { - /** {@inheritDoc} */ - @Override public Integer call() throws Exception { - Thread.sleep(JOB_SLEEP); - - return null; - } - } - - /** - * @param idx Node index. - * @param job Runnable job. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<?> runAsync(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert job != null; - - execCntr.set(0); - - IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); - - comp = comp.enableAsync(); - - comp.run(job); - - return comp.future(); - } - - /** - * @param idx Node index. - * @param job Runnable job. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<?> broadcast(int idx, Runnable job, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert job != null; - - execCntr.set(0); - - ClusterGroup prj = grid(idx); - - if (p != null) - prj = prj.forPredicate(p); - - IgniteCompute comp = compute(prj).enableAsync(); - - comp.broadcast(job); - - return comp.future(); - } - - /** - * @param idx Node index. - * @param jobs Runnable jobs. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<?> runAsync(int idx, Collection<TestRunnable> jobs, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert !F.isEmpty(jobs); - - execCntr.set(0); - - IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); - - comp = comp.enableAsync(); - - comp.run(jobs); - - return comp.future(); - } - - /** - * @param idx Node index. - * @param job Callable job. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<Integer> callAsync(int idx, Callable<Integer> job, @Nullable IgnitePredicate<ClusterNode> p) - throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert job != null; - - execCntr.set(0); - - IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); - - comp = comp.enableAsync(); - - comp.call(job); - - return comp.future(); - } - - /** - * @param idx Node index. - * @param job Callable job. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<Collection<Integer>> broadcast(int idx, Callable<Integer> job, - @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert job != null; - - execCntr.set(0); - - IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); - - comp = comp.enableAsync(); - - comp.broadcast(job); - - return comp.future(); - } - - /** - * @param idx Node index. - * @param jobs Callable job. - * @param p Optional node predicate. - * @return Future object. - * @throws IgniteCheckedException If failed. - */ - private IgniteFuture<Collection<Integer>> callAsync(int idx, Collection<TestCallable> jobs, - @Nullable IgnitePredicate<ClusterNode> p) throws IgniteCheckedException { - assert idx >= 0 && idx < NODES_CNT; - assert !F.isEmpty(jobs); - - execCntr.set(0); - - IgniteCompute comp = p != null ? compute(grid(idx).forPredicate(p)) : grid(idx).compute(); - - comp = comp.enableAsync(); - - comp.call(jobs); - - return comp.future(); - } - - /** - * @param idx Node index. - * @return Predicate. - */ - private IgnitePredicate<ClusterNode> singleNodePredicate(final int idx) { - assert idx >= 0 && idx < NODES_CNT; - - return new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { return grid(idx).localNode().id().equals(e.id()); } - }; - } - - /** - * @throws Exception If failed. - */ - public void testRunAsyncSingle() throws Exception { - Runnable job = new TestRunnable(); - - IgniteFuture<?> fut = broadcast(0, job, null); - - assert fut.get() == null; - - assertEquals(NODES_CNT, execCntr.getAndSet(0)); - - fut = broadcast(0, job, singleNodePredicate(0)); - - assert fut.get() == null; - - assertEquals(1, execCntr.get()); - - fut = runAsync(0, job, null); - - assert fut.get() == null : "Execution result must be null."; - - assert execCntr.get() == 1 : - "Execution counter must be equal to 1, actual: " + execCntr.get(); - } - - /** - * @throws Exception If failed. - */ - public void testRunAsyncMultiple() throws Exception { - Collection<TestRunnable> jobs = F.asList(new TestRunnable(), new TestRunnable()); - - IgniteFuture<?> fut = runAsync(0, jobs, null); - - assert fut.get() == null : "Execution result must be null."; - - assert execCntr.get() == jobs.size() : - "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get(); - } - - /** - * @throws Exception If failed. - */ - public void testCallAsyncSingle() throws Exception { - Callable<Integer> job = new TestCallable(); - - IgniteFuture<Collection<Integer>> fut1 = broadcast(0, job, null); - - assert fut1.get() != null; - - assertEquals(NODES_CNT, execCntr.getAndSet(0)); - - fut1 = broadcast(0, job, singleNodePredicate(0)); - - // We left one node so we can get definite result. - assertEquals(Integer.valueOf(1), F.first(fut1.get())); - - assertEquals(1, execCntr.get()); - - IgniteFuture<Integer> fut2 = callAsync(0, job, null); - - assert fut2.get() == 1 : - "Execution result must be equal to 1, actual: " + fut2.get(); - - assert execCntr.get() == 1 : - "Execution counter must be equal to 1, actual: " + execCntr.get(); - } - - /** - * @throws Exception If failed. - */ - public void testCallAsyncErrorNoFailover() throws Exception { - IgniteCompute comp = compute(grid(0).forPredicate(F.notEqualTo(grid(0).localNode()))).enableAsync(); - - comp.withNoFailover().call(new TestCallableError()); - - IgniteFuture<Integer> fut = comp.future(); - - try { - fut.get(); - - assert false : "Exception should have been thrown."; - } - catch (IgniteCheckedException e) { - info("Caught expected exception: " + e); - } - } - - /** - * @throws Exception If failed. - */ - public void testWithName() throws Exception { - grid(0).compute().withName("TestTaskName").call(new TestCallable()); - } - - /** - * @throws Exception If failed. - */ - public void testWithTimeout() throws Exception { - Collection<TestCallableTimeout> jobs = F.asList(new TestCallableTimeout()); - - boolean timedOut = false; - - try { - // Ensure that we will get timeout exception. - grid(0).compute().withTimeout(JOB_TIMEOUT).call(jobs); - } - catch (ComputeTaskTimeoutException ignore) { - timedOut = true; - } - - assert timedOut : "Task has not timed out."; - - timedOut = false; - - try { - // Previous task invocation cleared the timeout. - grid(0).compute().call(jobs); - } - catch (ComputeTaskTimeoutException ignore) { - timedOut = true; - } - - assert !timedOut : "Subsequently called task has timed out."; - } - - /** - * @throws Exception If failed. - */ - public void testCallAsyncMultiple() throws Exception { - Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable()); - - IgniteFuture<Collection<Integer>> fut = callAsync(0, jobs, null); - - Collection<Integer> results = fut.get(); - - assert !results.isEmpty() : "Collection of results is empty."; - - assert results.size() == jobs.size() : - "Collection of results must be of size: " + jobs.size() + "."; - - for (int i = 1; i <= jobs.size(); i++) - assert results.contains(i) : "Collection of results does not contain value: " + i; - } - - /** - * @throws Exception If failed. - */ - public void testReduceAsync() throws Exception { - Collection<TestCallable> jobs = F.asList(new TestCallable(), new TestCallable()); - - IgniteCompute comp = grid(0).compute().enableAsync(); - - comp.call(jobs, F.sumIntReducer()); - - IgniteFuture<Integer> fut = comp.future(); - - // Sum of arithmetic progression. - int exp = (1 + jobs.size()) * jobs.size() / 2; - - assert fut.get() == exp : - "Execution result must be equal to " + exp + ", actual: " + fut.get(); - - assert execCntr.get() == jobs.size() : - "Execution counter must be equal to " + jobs.size() + ", actual: " + execCntr.get(); - - execCntr.set(0); - } - - /** - * @throws Exception If failed. - */ - public void testReducerError() throws Exception { - final Ignite g = grid(0); - - final Collection<Callable<Integer>> jobs = new ArrayList<>(); - - for (int i = 0; i < g.cluster().nodes().size(); i++) { - jobs.add(new IgniteCallable<Integer>() { - @Override public Integer call() throws Exception { - throw new RuntimeException("Test exception."); - } - }); - } - - GridTestUtils.assertThrows(log, new Callable<Void>() { - @Override public Void call() throws Exception { - g.compute().call(jobs, new IgniteReducer<Integer, Object>() { - @Override public boolean collect(@Nullable Integer e) { - fail("Expects failed jobs never call 'collect' method."); - - return true; - } - - @Override public Object reduce() { - return null; - } - }); - - return null; - } - }, IgniteCheckedException.class, null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b458bd09/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html deleted file mode 100644 index 1f85ff2..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/closure/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains internal tests or test related classes and interfaces. -</body> -</html>
