http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredSelfTest.java deleted file mode 100644 index 3a1ccaa..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapTieredSelfTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; - -/** - * - */ -public class GridCacheOffHeapTieredSelfTest extends GridCacheOffHeapTieredAbstractSelfTest { - /** {@inheritDoc} */ - @SuppressWarnings("RedundantMethodOverride") - @Override protected GridCacheAtomicityMode atomicityMode() { - return TRANSACTIONAL; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java deleted file mode 100644 index 54a24bc..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.testframework.junits.common.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Check for specific support issue. - */ -public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest { - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(false); - - CacheConfiguration ccfg = new CacheConfiguration(); - - ccfg.setCacheMode(GridCacheMode.PARTITIONED); - ccfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); - ccfg.setAtomicityMode(GridCacheAtomicityMode.TRANSACTIONAL); - ccfg.setOffHeapMaxMemory(0); - ccfg.setMemoryMode(GridCacheMemoryMode.OFFHEAP_TIERED); - - cfg.setCacheConfiguration(ccfg); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testUpdateInPessimisticTxOnRemoteNode() throws Exception { - try { - Ignite ignite = startGrids(2); - - GridCache<Object, Object> rmtCache = ignite.cache(null); - - int key = 0; - - while (!rmtCache.affinity().isPrimary(grid(1).localNode(), key)) - key++; - - GridCache<Object, Object> locCache = grid(1).cache(null); - - try (IgniteTx tx = locCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - locCache.putxIfAbsent(key, 0); - - tx.commit(); - } - - try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - assertEquals(0, rmtCache.get(key)); - - rmtCache.putx(key, 1); - - tx.commit(); - } - - try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - assertEquals(1, rmtCache.get(key)); - - rmtCache.putx(key, 2); - - tx.commit(); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReadEvictedPartition() throws Exception { - try { - Ignite grid = startGrid(0); - - GridCache<Object, Object> cache = grid.cache(null); - - for (int i = 0; i < 30; i++) - cache.put(i, 0); - - startGrid(1); - - awaitPartitionMapExchange(); - - for (int i = 0; i < 30; i++) - grid(1).cache(null).put(i, 10); - - // Find a key that does not belong to started node anymore. - int key = 0; - - ClusterNode locNode = grid.cluster().localNode(); - - for (;key < 30; key++) { - if (!cache.affinity().isPrimary(locNode, key) && !cache.affinity().isBackup(locNode, key)) - break; - } - - assertEquals(10, cache.get(key)); - - try (IgniteTx ignored = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { - assertEquals(10, cache.get(key)); - } - - try (IgniteTx ignored = cache.txStart(PESSIMISTIC, READ_COMMITTED)) { - assertEquals(10, cache.get(key)); - } - } - finally { - stopAllGrids(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOrderedPreloadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOrderedPreloadingSelfTest.java deleted file mode 100644 index 289138e..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOrderedPreloadingSelfTest.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.testframework.junits.common.*; - -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; - -/** - * Checks ordered preloading. - */ -public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of grids in test. */ - private static final int GRID_CNT = 4; - - /** First cache name. */ - public static final String FIRST_CACHE_NAME = "first"; - - /** Second cache name. */ - public static final String SECOND_CACHE_NAME = "second"; - - /** First cache mode. */ - private GridCacheMode firstCacheMode; - - /** Second cache mode. */ - private GridCacheMode secondCacheMode; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCacheConfiguration( - cacheConfig(firstCacheMode, 1, FIRST_CACHE_NAME), - cacheConfig(secondCacheMode, 2, SECOND_CACHE_NAME)); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - return cfg; - } - - /** - * @param cacheMode Cache mode. - * @param preloadOrder Preload order. - * @param name Cache name. - * @return Cache configuration. - */ - private CacheConfiguration cacheConfig(GridCacheMode cacheMode, int preloadOrder, String name) { - CacheConfiguration cfg = defaultCacheConfiguration(); - - cfg.setName(name); - cfg.setCacheMode(cacheMode); - cfg.setPreloadOrder(preloadOrder); - cfg.setPreloadMode(ASYNC); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - public void testPreloadOrderPartitionedPartitioned() throws Exception { - checkPreloadOrder(PARTITIONED, PARTITIONED); - } - - /** - * @throws Exception If failed. - */ - public void testPreloadOrderReplicatedReplicated() throws Exception { - checkPreloadOrder(REPLICATED, REPLICATED); - } - - /** - * @throws Exception If failed. - */ - public void testPreloadOrderPartitionedReplicated() throws Exception { - checkPreloadOrder(PARTITIONED, REPLICATED); - } - - /** - * @throws Exception If failed. - */ - public void testPreloadOrderReplicatedPartitioned() throws Exception { - checkPreloadOrder(REPLICATED, PARTITIONED); - } - - /** - * @param first First cache mode. - * @param second Second cache mode. - * @throws Exception If failed. - */ - private void checkPreloadOrder(GridCacheMode first, GridCacheMode second) throws Exception { - firstCacheMode = first; - secondCacheMode = second; - - Ignite g = startGrid(0); - - try { - GridCache<Object, Object> cache = g.cache("first"); - - // Put some data into cache. - for (int i = 0; i < 1000; i++) - cache.put(i, i); - - for (int i = 1; i < GRID_CNT; i++) - startGrid(i); - - // For first node in topology replicated preloader gets completed right away. - for (int i = 1; i < GRID_CNT; i++) { - GridKernal kernal = (GridKernal)grid(i); - - GridFutureAdapter<?> fut1 = (GridFutureAdapter<?>)kernal.internalCache(FIRST_CACHE_NAME).preloader() - .syncFuture(); - GridFutureAdapter<?> fut2 = (GridFutureAdapter<?>)kernal.internalCache(SECOND_CACHE_NAME).preloader() - .syncFuture(); - - fut1.get(); - fut2.get(); - - assertTrue("[i=" + i + ", fut1=" + fut1 + ", fut2=" + fut2 + ']', fut1.endTime() <= fut2.endTime()); - } - } - finally { - stopAllGrids(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java deleted file mode 100644 index 17cf318..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.spi.swapspace.file.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.configuration.IgniteDeploymentMode.*; -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; - -/** - * - */ -public class GridCacheP2PUndeploySelfTest extends GridCommonAbstractTest { - /** Test p2p value. */ - private static final String TEST_VALUE = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestValue3"; - - /** */ - private static final long OFFHEAP = 0;// 4 * 1024 * 1024; - - /** */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private final AtomicInteger idxGen = new AtomicInteger(); - - /** */ - private GridCachePreloadMode mode = SYNC; - - /** */ - private boolean offheap; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setNetworkTimeout(2000); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - cfg.setMarshaller(new IgniteJdkMarshaller()); - - cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); - - CacheConfiguration repCacheCfg = defaultCacheConfiguration(); - - repCacheCfg.setName("replicated"); - repCacheCfg.setCacheMode(REPLICATED); - repCacheCfg.setPreloadMode(mode); - repCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - repCacheCfg.setQueryIndexEnabled(false); - repCacheCfg.setAtomicityMode(TRANSACTIONAL); - - if (offheap) - repCacheCfg.setOffHeapMaxMemory(OFFHEAP); - else - repCacheCfg.setSwapEnabled(true); - - CacheConfiguration partCacheCfg = defaultCacheConfiguration(); - - partCacheCfg.setName("partitioned"); - partCacheCfg.setCacheMode(PARTITIONED); - partCacheCfg.setPreloadMode(mode); - partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1)); - partCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - partCacheCfg.setEvictNearSynchronized(false); - partCacheCfg.setQueryIndexEnabled(false); - partCacheCfg.setAtomicityMode(TRANSACTIONAL); - partCacheCfg.setDistributionMode(NEAR_PARTITIONED); - - if (offheap) - partCacheCfg.setOffHeapMaxMemory(OFFHEAP); - else - partCacheCfg.setSwapEnabled(true); - - cfg.setCacheConfiguration(repCacheCfg, partCacheCfg); - - cfg.setDeploymentMode(SHARED); - cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheP2PUndeploySelfTest.class.getName()); - - cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); - - return cfg; - } - - /** @throws Exception If failed. */ - public void testSwapP2PReplicated() throws Exception { - offheap = false; - - checkP2PUndeploy("replicated"); - } - - /** @throws Exception If failed. */ - public void testOffHeapP2PReplicated() throws Exception { - offheap = true; - - checkP2PUndeploy("replicated"); - } - - /** @throws Exception If failed. */ - public void testSwapP2PPartitioned() throws Exception { - offheap = false; - - checkP2PUndeploy("partitioned"); - } - - /** @throws Exception If failed. */ - public void testOffheapP2PPartitioned() throws Exception { - offheap = true; - - checkP2PUndeploy("partitioned"); - } - - /** @throws Exception If failed. */ - public void testSwapP2PReplicatedNoPreloading() throws Exception { - mode = NONE; - offheap = false; - - checkP2PUndeploy("replicated"); - } - - /** @throws Exception If failed. */ - public void testOffHeapP2PReplicatedNoPreloading() throws Exception { - mode = NONE; - offheap = true; - - checkP2PUndeploy("replicated"); - } - - /** @throws Exception If failed. */ - public void testSwapP2PPartitionedNoPreloading() throws Exception { - mode = NONE; - offheap = false; - - checkP2PUndeploy("partitioned"); - } - - /** @throws Exception If failed. */ - public void testOffHeapP2PPartitionedNoPreloading() throws Exception { - mode = NONE; - offheap = true; - - checkP2PUndeploy("partitioned"); - } - - /** - * @param cacheName Cache name. - * @param g Grid. - * @return Size. - * @throws IgniteCheckedException If failed. - */ - private long size(String cacheName, GridKernal g) throws IgniteCheckedException { - if (offheap) - return g.cache(cacheName).offHeapEntriesCount(); - - return g.context().swap().swapSize(swapSpaceName(cacheName, g)); - } - - /** - * @param cacheName Cache name. - * @throws Exception If failed. - */ - private void checkP2PUndeploy(String cacheName) throws Exception { - assert !F.isEmpty(cacheName); - - ClassLoader ldr = getExternalClassLoader(); - - Class valCls = ldr.loadClass(TEST_VALUE); - - try { - Ignite ignite1 = startGrid(1); - GridKernal grid2 = (GridKernal)startGrid(2); - - GridCache<Integer, Object> cache1 = ignite1.cache(cacheName); - GridCache<Integer, Object> cache2 = grid2.cache(cacheName); - - Object v1 = valCls.newInstance(); - - cache1.put(1, v1); - cache1.put(2, valCls.newInstance()); - cache1.put(3, valCls.newInstance()); - cache1.put(4, valCls.newInstance()); - - info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']'); - - Object v2 = cache2.get(1); - - assert v2 != null; - - info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']'); - - assert v2 != null; - assert v2.toString().equals(v1.toString()); - assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); - assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader"); - - assert cache2.evict(2); - assert cache2.evict(3); - assert cache2.evict(4); - - long swapSize = size(cacheName, grid2); - - info("Swap size: " + swapSize); - - assert swapSize > 0; - - stopGrid(1); - - assert waitCacheEmpty(cache2, 10000); - - for (int i = 0; i < 3; i++) { - swapSize = size(cacheName, grid2); - - if (swapSize > 0) { - if (i < 2) { - U.warn(log, "Swap size check failed (will retry in 1000 ms): " + swapSize); - - U.sleep(1000); - - continue; - } - - fail("Swap size check failed: " + swapSize); - } - else if (swapSize == 0) - break; - else - assert false : "Negative swap size: " + swapSize; - } - } - finally { - stopAllGrids(); - } - } - - /** - * @param cacheName Cache name. - * @param grid Kernal. - * @return Name for swap space. - */ - private String swapSpaceName(String cacheName, GridKernal grid) { - GridCacheContext<Object, Object> cctx = grid.internalCache(cacheName).context(); - - return CU.swapSpaceName(cctx.isNear() ? cctx.near().dht().context() : cctx); - } - - /** - * @param cache Cache. - * @param timeout Timeout. - * @return {@code True} if success. - * @throws InterruptedException If thread was interrupted. - */ - @SuppressWarnings({"BusyWait"}) - private boolean waitCacheEmpty(GridCacheProjection<Integer, Object> cache, long timeout) - throws InterruptedException { - assert cache != null; - assert timeout >= 0; - - long end = System.currentTimeMillis() + timeout; - - while (end - System.currentTimeMillis() >= 0) { - if (cache.isEmpty()) - return true; - - Thread.sleep(500); - } - - return cache.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java deleted file mode 100644 index 1237455..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.affinity.consistenthash.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; - -import static org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction.*; - -/** - * - */ -public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTest { - /** */ - public static final int NODES_CNT = 50; - - /** - * @throws Exception If failed. - */ - public void testPartitionSpreading() throws Exception { - System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", "Max", "Dev"); - - for (int i = 5; i < NODES_CNT; i = i * 3 / 2) { - for (int replicas = 128; replicas <= 4096; replicas*=2) { - Collection<ClusterNode> nodes = createNodes(i, replicas); - - GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(false, 10000); - - checkDistribution(aff, nodes); - } - - System.out.println(); - } - } - - /** - * @param nodesCnt Nodes count. - * @param replicas Value of - * @return Collection of test nodes. - */ - private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) { - Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt); - - for (int i = 0; i < nodesCnt; i++) - nodes.add(new TestRichNode(replicas)); - - return nodes; - } - - /** - * @param aff Affinity to check. - * @param nodes Collection of nodes to test on. - */ - private void checkDistribution(GridCacheConsistentHashAffinityFunction aff, Collection<ClusterNode> nodes) { - Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); - - for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.nodes(part, nodes, 0); - - assertEquals(1, affNodes.size()); - - ClusterNode node = F.first(affNodes); - - parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1); - } - - int min = Integer.MAX_VALUE; - int max = Integer.MIN_VALUE; - int total = 0; - - float mean = 0; - float m2 = 0; - int n = 0; - - for (ClusterNode node : nodes) { - int partsCnt = parts.get(node) != null ? parts.get(node) : 0; - - total += partsCnt; - - if (partsCnt < min) - min = partsCnt; - - if (partsCnt > max) - max = partsCnt; - - n++; - float delta = partsCnt - mean; - mean += delta / n; - m2 += delta * (partsCnt - mean); - } - - m2 /= (n - 1); - assertEquals(aff.getPartitions(), total); - - System.out.printf("%6s, %6s, %6s, %6s, %8.4f\n", nodes.size(), - F.first(nodes).attribute(DFLT_REPLICA_COUNT_ATTR_NAME), min, max, Math.sqrt(m2)); - } - - /** - * Rich node stub to use in emulated server topology. - */ - private static class TestRichNode extends GridTestNode { - /** */ - private final UUID nodeId; - - /** */ - private final int replicas; - - /** - * Externalizable class requires public no-arg constructor. - */ - @SuppressWarnings("UnusedDeclaration") - private TestRichNode(int replicas) { - this(UUID.randomUUID(), replicas); - } - - /** - * Constructs rich node stub to use in emulated server topology. - * - * @param nodeId Node id. - */ - private TestRichNode(UUID nodeId, int replicas) { - this.nodeId = nodeId; - this.replicas = replicas; - } - - /** - * Unused constructor for externalizable support. - */ - public TestRichNode() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public UUID id() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public <T> T attribute(String name) { - if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) - return (T)new Integer(replicas); - - return super.attribute(name); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedGetSelfTest.java deleted file mode 100644 index 98ed140..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedGetSelfTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.processors.cache.distributed.near.*; -import org.apache.ignite.spi.discovery.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; -import static org.apache.ignite.internal.GridTopic.*; - -/** - * - */ -public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int GRID_CNT = 3; - - /** */ - private static final String KEY = "key"; - - /** */ - private static final int VAL = 1; - - /** */ - private static final AtomicBoolean received = new AtomicBoolean(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setDiscoverySpi(discoverySpi()); - cfg.setCacheConfiguration(cacheConfiguration()); - - return cfg; - } - - /** - * @return Discovery SPI; - */ - private DiscoverySpi discoverySpi() { - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(IP_FINDER); - - return spi; - } - - /** - * @return Cache configuration. - */ - private CacheConfiguration cacheConfiguration() { - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(PARTITIONED); - cc.setBackups(1); - cc.setPreloadMode(SYNC); - cc.setWriteSynchronizationMode(FULL_SYNC); - cc.setSwapEnabled(true); - cc.setEvictNearSynchronized(false); - cc.setEvictSynchronized(false); - cc.setDistributionMode(PARTITIONED_ONLY); - - return cc; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGridsMultiThreaded(GRID_CNT); - - prepare(); - } - - @Override protected void beforeTest() throws Exception { - received.set(false); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** - * @throws Exception If failed. - */ - public void testGetFromPrimaryNode() throws Exception { - for (int i = 0; i < GRID_CNT; i++) { - GridCache<String, Integer> c = grid(i).cache(null); - - GridCacheEntry<String, Integer> e = c.entry(KEY); - - if (e.primary()) { - info("Primary node: " + grid(i).localNode().id()); - - c.get(KEY); - - break; - } - } - - assert !await(); - } - - /** - * @throws Exception If failed. - */ - public void testGetFromBackupNode() throws Exception { - for (int i = 0; i < GRID_CNT; i++) { - GridCache<String, Integer> c = grid(i).cache(null); - - GridCacheEntry<String, Integer> e = c.entry(KEY); - - if (e.backup()) { - info("Backup node: " + grid(i).localNode().id()); - - Integer val = c.get(KEY); - - assert val != null && val == 1; - - assert !await(); - - assert c.evict(KEY); - - assert c.peek(KEY) == null; - - val = c.get(KEY); - - assert val != null && val == 1; - - assert !await(); - - break; - } - } - } - - /** - * @throws Exception If failed. - */ - public void testGetFromNearNode() throws Exception { - for (int i = 0; i < GRID_CNT; i++) { - GridCache<String, Integer> c = grid(i).cache(null); - - GridCacheEntry<String, Integer> e = c.entry(KEY); - - if (!e.primary() && !e.backup()) { - info("Near node: " + grid(i).localNode().id()); - - Integer val = c.get(KEY); - - assert val != null && val == 1; - - break; - } - } - - assert await(); - } - - /** - * @return {@code True} if awaited message. - * @throws Exception If failed. - */ - @SuppressWarnings({"BusyWait"}) - private boolean await() throws Exception { - info("Checking flag: " + System.identityHashCode(received)); - - for (int i = 0; i < 3; i++) { - if (received.get()) - return true; - - info("Flag is false."); - - Thread.sleep(500); - } - - return received.get(); - } - - /** - * Puts value to primary node and registers listener - * that sets {@link #received} flag to {@code true} - * if {@link GridNearGetRequest} was received on primary node. - * - * @throws Exception If failed. - */ - @SuppressWarnings("deprecation") - private void prepare() throws Exception { - for (int i = 0; i < GRID_CNT; i++) { - Ignite g = grid(i); - - GridCacheEntry<String, Integer> e = g.<String, Integer>cache(null).entry(KEY); - - if (e.primary()) { - info("Primary node: " + g.cluster().localNode().id()); - - // Put value. - g.cache(null).put(KEY, VAL); - - // Register listener. - ((GridKernal)g).context().io().addMessageListener( - TOPIC_CACHE, - new GridMessageListener() { - @Override public void onMessage(UUID nodeId, Object msg) { - info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - - if (msg instanceof GridNearGetRequest) { - info("Setting flag: " + System.identityHashCode(received)); - - received.set(true); - } - } - } - ); - - break; - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java deleted file mode 100644 index f3862fc..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; - -/** - * Partitioned affinity test for projections. - */ -@SuppressWarnings({"PointlessArithmeticExpression"}) -public class GridCachePartitionedProjectionAffinitySelfTest extends GridCommonAbstractTest { - /** Backup count. */ - private static final int BACKUPS = 1; - - /** Grid count. */ - private static final int GRIDS = 3; - - /** */ - private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setBackups(BACKUPS); - cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setPreloadMode(SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - cacheCfg.setDistributionMode(NEAR_PARTITIONED); - - cfg.setCacheConfiguration(cacheCfg); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(GRIDS); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** @throws Exception If failed. */ - public void testAffinity() throws Exception { - waitTopologyUpdate(); - - Ignite g0 = grid(0); - Ignite g1 = grid(1); - - for (int i = 0; i < 100; i++) - assertEquals(g0.cluster().mapKeyToNode(null, i).id(), g1.cluster().mapKeyToNode(null, i).id()); - } - - /** @throws Exception If failed. */ - @SuppressWarnings("deprecation") - public void testProjectionAffinity() throws Exception { - waitTopologyUpdate(); - - Ignite g0 = grid(0); - Ignite g1 = grid(1); - - ClusterGroup g0Pinned = g0.cluster().forNodeIds(F.asList(g0.cluster().localNode().id())); - - ClusterGroup g01Pinned = - g1.cluster().forNodeIds(F.asList(g0.cluster().localNode().id(), g1.cluster().localNode().id())); - - for (int i = 0; i < 100; i++) - assertEquals(g0Pinned.ignite().cluster().mapKeyToNode(null, i).id(), - g01Pinned.ignite().cluster().mapKeyToNode(null, i).id()); - } - - /** @throws Exception If failed. */ - @SuppressWarnings("BusyWait") - private void waitTopologyUpdate() throws Exception { - GridTestUtils.waitTopologyUpdate(null, BACKUPS, log()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedWritesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedWritesTest.java deleted file mode 100644 index da18d0f..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedWritesTest.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; - -/** - * Test that in {@link GridCacheMode#PARTITIONED} mode cache writes values only to the near cache store. <p/> This check - * is needed because in current implementation if {@link GridCacheWriteBehindStore} assumes that and user store is - * wrapped only in near cache (see {@link GridCacheProcessor} init logic). - */ -@SuppressWarnings({"unchecked"}) -public class GridCachePartitionedWritesTest extends GridCommonAbstractTest { - /** Cache store. */ - private CacheStore store; - - /** {@inheritDoc} */ - @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); - - c.setDiscoverySpi(disco); - - CacheConfiguration cc = defaultCacheConfiguration(); - - cc.setCacheMode(GridCacheMode.PARTITIONED); - cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - cc.setSwapEnabled(false); - cc.setAtomicityMode(TRANSACTIONAL); - cc.setDistributionMode(NEAR_PARTITIONED); - - assert store != null; - - cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); - cc.setReadThrough(true); - cc.setWriteThrough(true); - cc.setLoadPreviousValue(true); - - c.setCacheConfiguration(cc); - - return c; - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - store = null; - - super.afterTest(); - } - - /** @throws Exception If test fails. */ - public void testWrite() throws Exception { - final AtomicInteger putCnt = new AtomicInteger(); - final AtomicInteger rmvCnt = new AtomicInteger(); - - store = new CacheStoreAdapter<Object, Object>() { - @Override public Object load(Object key) { - info(">>> Get [key=" + key + ']'); - - return null; - } - - @Override public void write(Cache.Entry<? extends Object, ? extends Object> entry) { - putCnt.incrementAndGet(); - } - - @Override public void delete(Object key) { - rmvCnt.incrementAndGet(); - } - }; - - startGrid(); - - GridCache<Integer, String> cache = cache(); - - try { - cache.get(1); - - IgniteTx tx = cache.txStart(); - - try { - for (int i = 1; i <= 10; i++) - cache.putx(i, Integer.toString(i)); - - tx.commit(); - } - finally { - tx.close(); - } - - assert cache.size() == 10; - - assert putCnt.get() == 10; - - tx = cache.txStart(); - - try { - for (int i = 1; i <= 10; i++) { - String val = cache.remove(i); - - assert val != null; - assert val.equals(Integer.toString(i)); - } - - tx.commit(); - } - finally { - tx.close(); - } - - assert rmvCnt.get() == 10; - } - finally { - stopGrid(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java deleted file mode 100644 index 3bf381f..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java +++ /dev/null @@ -1,262 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.distributed.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jetbrains.annotations.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * - */ -public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest { - /** */ - private static final String VALUE = createValue(); - - /** */ - private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private final AtomicInteger idxGen = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi spi = new TcpDiscoverySpi(); - - spi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(spi); - - CacheConfiguration partCacheCfg = defaultCacheConfiguration(); - - partCacheCfg.setCacheMode(PARTITIONED); - partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(1, 1)); - partCacheCfg.setWriteSynchronizationMode(FULL_SYNC); - partCacheCfg.setDistributionMode(PARTITIONED_ONLY); - partCacheCfg.setEvictSynchronized(true); - partCacheCfg.setSwapEnabled(false); - partCacheCfg.setEvictionPolicy(null); - partCacheCfg.setEvictSynchronizedKeyBufferSize(25); - partCacheCfg.setEvictMaxOverflowRatio(0.99f); - partCacheCfg.setPreloadMode(ASYNC); - partCacheCfg.setAtomicityMode(TRANSACTIONAL); - - // This test requires artificial slowing down of the preloading. - partCacheCfg.setPreloadThrottle(2000); - - cfg.setCacheConfiguration(partCacheCfg); - - cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); - - cfg.setNetworkTimeout(60000); - - return cfg; - } - - /** - * @throws Exception If failed. - */ - @SuppressWarnings("BusyWait") - public void testEvictions() throws Exception { - try { - final Ignite ignite1 = startGrid(1); - - GridCache<Integer, Object> cache1 = ignite1.cache(null); - - for (int i = 0; i < 5000; i++) - cache1.put(i, VALUE + i); - - info("Finished data population."); - - final AtomicBoolean done = new AtomicBoolean(); - - final CountDownLatch startLatch = new CountDownLatch(1); - - int oldSize = cache1.size(); - - IgniteFuture fut = multithreadedAsync( - new Callable<Object>() { - @Nullable @Override public Object call() throws Exception { - startLatch.await(); - - info("Started evicting..."); - - for (int i = 0; i < 3000 && !done.get(); i++) { - GridCacheEntry<Integer, Object> entry = randomEntry(ignite1); - - if (entry != null) - entry.evict(); - else - info("Entry is null."); - } - - info("Finished evicting."); - - return null; - } - }, - 1); - - ignite1.events().localListen( - new IgnitePredicate<IgniteEvent>() { - @Override public boolean apply(IgniteEvent evt) { - startLatch.countDown(); - - return true; - } - }, - EVT_NODE_JOINED); - - final Ignite ignite2 = startGrid(2); - - done.set(true); - - fut.get(); - - sleepUntilCashesEqualize(ignite1, ignite2, oldSize); - - checkCachesConsistency(ignite1, ignite2); - - oldSize = cache1.size(); - - info("Evicting on constant topology."); - - for (int i = 0; i < 1000; i++) { - GridCacheEntry<Integer, Object> entry = randomEntry(ignite1); - - if (entry != null) - entry.evict(); - else - info("Entry is null."); - } - - sleepUntilCashesEqualize(ignite1, ignite2, oldSize); - - checkCachesConsistency(ignite1, ignite2); - } - finally { - stopAllGrids(); - } - } - - /** - * Waits until cache stabilizes on new value. - * - * @param ignite1 Grid 1. - * @param ignite2 Grid 2. - * @param oldSize Old size, stable size should be . - * @throws org.apache.ignite.IgniteInterruptedException If interrupted. - */ - private void sleepUntilCashesEqualize(final Ignite ignite1, final Ignite ignite2, final int oldSize) - throws IgniteInterruptedException { - info("Sleeping..."); - - assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - int size1 = ignite1.cache(null).size(); - return size1 != oldSize && size1 == ignite2.cache(null).size(); - } - }, getTestTimeout())); - - info("Sleep finished."); - } - - /** - * @param g Grid. - * @return Random entry from cache. - */ - @Nullable private GridCacheEntry<Integer, Object> randomEntry(Ignite g) { - GridKernal g1 = (GridKernal)g; - - return g1.<Integer, Object>internalCache().randomEntry(); - } - - /** - * @param ignite1 Grid 1. - * @param ignite2 Grid 2. - * @throws Exception If failed. - */ - private void checkCachesConsistency(Ignite ignite1, Ignite ignite2) throws Exception { - GridKernal g1 = (GridKernal) ignite1; - GridKernal g2 = (GridKernal) ignite2; - - GridCacheAdapter<Integer, Object> cache1 = g1.internalCache(); - GridCacheAdapter<Integer, Object> cache2 = g2.internalCache(); - - for (int i = 0; i < 3; i++) { - if (cache1.size() != cache2.size()) { - U.warn(log, "Sizes do not match (will retry in 1000 ms) [s1=" + cache1.size() + - ", s2=" + cache2.size() + ']'); - - U.sleep(1000); - } - else - break; - } - - info("Cache1 size: " + cache1.size()); - info("Cache2 size: " + cache2.size()); - - assert cache1.size() == cache2.size() : "Sizes do not match [s1=" + cache1.size() + - ", s2=" + cache2.size() + ']'; - - for (Integer key : cache1.keySet()) { - Object e = cache1.peek(key); - - if (e != null) - assert cache2.containsKey(key, null) : "Cache2 does not contain key: " + key; - } - } - - /** - * @return Large value for test. - */ - private static String createValue() { - SB sb = new SB(1024); - - for (int i = 0; i < 64; i++) - sb.a("val1"); - - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllFailoverSelfTest.java deleted file mode 100644 index f99de18..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllFailoverSelfTest.java +++ /dev/null @@ -1,707 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import com.google.common.collect.*; -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.spi.failover.*; -import org.apache.ignite.spi.failover.always.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Tests putAll() method along with failover and different configurations. - */ -public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Size of the test map. */ - private static final int TEST_MAP_SIZE = 100000; - - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - - /** Size of data chunk, sent to a remote node. */ - private static final int DATA_CHUNK_SIZE = 1000; - - /** Number of chunk on which to fail worker node. */ - public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / DATA_CHUNK_SIZE) / 3; - - /** Await timeout in seconds. */ - public static final int AWAIT_TIMEOUT_SEC = 65; - - /** */ - private static final int FAILOVER_PUSH_GAP = 30; - - /** Master node name. */ - private static final String MASTER = "master"; - - /** Near enabled flag. */ - private boolean nearEnabled; - - /** Backups count. */ - private int backups; - - /** Filter to include only worker nodes. */ - private static final IgnitePredicate<ClusterNode> workerNodesFilter = new PN() { - @SuppressWarnings("unchecked") - @Override public boolean apply(ClusterNode n) { - return "worker".equals(n.attribute("segment")); - } - }; - - /** - * Result future queue (restrict the queue size - * to 50 in order to prevent in-memory data grid from over loading). - */ - private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new LinkedBlockingQueue<>(50); - - /** Test failover SPI. */ - private MasterFailoverSpi failoverSpi = new MasterFailoverSpi((IgnitePredicate)workerNodesFilter); - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearEnabledThreeBackups() throws Exception { - checkPutAllFailoverColocated(true, 7, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearDisabledThreeBackups() throws Exception { - checkPutAllFailoverColocated(false, 7, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearEnabledOneBackup() throws Exception { - checkPutAllFailover(true, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearDisabledOneBackup() throws Exception { - checkPutAllFailover(false, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearEnabledTwoBackups() throws Exception { - checkPutAllFailover(true, 5, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearDisabledTwoBackups() throws Exception { - checkPutAllFailover(false, 5, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearEnabledThreeBackups() throws Exception { - checkPutAllFailover(true, 7, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverNearDisabledThreeBackups() throws Exception { - checkPutAllFailover(false, 7, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearEnabledOneBackup() throws Exception { - checkPutAllFailoverColocated(true, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearDisabledOneBackup() throws Exception { - checkPutAllFailoverColocated(false, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearEnabledTwoBackups() throws Exception { - checkPutAllFailoverColocated(true, 5, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverColocatedNearDisabledTwoBackups() throws Exception { - checkPutAllFailoverColocated(false, 5, 2); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return super.getTestTimeout() * 5; - } - - /** - * Tests putAll() method along with failover and cache backup. - * - * Checks that the resulting primary cache size is the same as - * expected. - * - * @param near Near enabled. - * @param workerCnt Worker count. - * @param shutdownCnt Shutdown count. - * @throws Exception If failed. - */ - public void checkPutAllFailover(boolean near, int workerCnt, int shutdownCnt) throws Exception { - nearEnabled = near; - backups = shutdownCnt; - - Collection<Integer> testKeys = generateTestKeys(); - - final Ignite master = startGrid(MASTER); - - List<Ignite> workers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) - workers.add(startGrid("worker" + i)); - - info("Master: " + master.cluster().localNode().id()); - - List<Ignite> runningWorkers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) { - UUID id = workers.get(i - 1).cluster().localNode().id(); - - info(String.format("Worker%d - %s", i, id)); - - runningWorkers.add(workers.get(i - 1)); - } - - try { - // Dummy call to fetch affinity function from remote node - master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); - - Random rnd = new Random(); - - Collection<Integer> dataChunk = new ArrayList<>(DATA_CHUNK_SIZE); - int entryCntr = 0; - int chunkCntr = 0; - final AtomicBoolean jobFailed = new AtomicBoolean(false); - - int failoverPushGap = 0; - - final CountDownLatch emptyLatch = new CountDownLatch(1); - - final AtomicBoolean inputExhausted = new AtomicBoolean(); - - IgniteCompute comp = compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync(); - - for (Integer key : testKeys) { - dataChunk.add(key); - entryCntr++; - - if (entryCntr == DATA_CHUNK_SIZE) { // time to send data - chunkCntr++; - - assert dataChunk.size() == DATA_CHUNK_SIZE; - - log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]"); - - comp.execute( - new GridCachePutAllTask( - runningWorkers.get(rnd.nextInt(runningWorkers.size())).cluster().localNode().id(), - CACHE_NAME), - dataChunk); - - ComputeTaskFuture<Void> fut = comp.future(); - - resQueue.put(fut); // Blocks if queue is full. - - fut.listenAsync(new CI1<IgniteFuture<Void>>() { - @Override public void apply(IgniteFuture<Void> f) { - ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; - - try { - taskFut.get(); //if something went wrong - we'll get exception here - } - catch (IgniteCheckedException e) { - log.error("Job failed", e); - - jobFailed.set(true); - } - - // Remove complete future from queue to allow other jobs to proceed. - resQueue.remove(taskFut); - - if (inputExhausted.get() && resQueue.isEmpty()) - emptyLatch.countDown(); - } - }); - - entryCntr = 0; - dataChunk = new ArrayList<>(DATA_CHUNK_SIZE); - - if (chunkCntr >= FAIL_ON_CHUNK_NO) { - if (workerCnt - runningWorkers.size() < shutdownCnt) { - if (failoverPushGap > 0) - failoverPushGap--; - else { - Ignite victim = runningWorkers.remove(0); - - info("Shutting down node: " + victim.cluster().localNode().id()); - - stopGrid(victim.name()); - - // Fail next node after some jobs have been pushed. - failoverPushGap = FAILOVER_PUSH_GAP; - } - } - } - } - } - - inputExhausted.set(true); - - if (resQueue.isEmpty()) - emptyLatch.countDown(); - - assert chunkCntr == TEST_MAP_SIZE / DATA_CHUNK_SIZE; - - // Wait for queue to empty. - log.info("Waiting for empty queue..."); - - boolean failedWait = false; - - if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) { - info(">>> Failed to wait for queue to empty."); - - failedWait = true; - } - - if (!failedWait) - assertFalse("One or more jobs have failed.", jobFailed.get()); - - Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); - - if (!failedWait && !absentKeys.isEmpty()) { - // Give some time to preloader. - U.sleep(15000); - - absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); - } - - info(">>> Absent keys: " + absentKeys); - - assertTrue(absentKeys.isEmpty()); - - // Actual primary cache size. - int primaryCacheSize = 0; - - for (Ignite g : runningWorkers) { - info(">>>>> " + g.cache(CACHE_NAME).size()); - - primaryCacheSize += g.cache(CACHE_NAME).primarySize(); - } - - assertEquals(TEST_MAP_SIZE, primaryCacheSize); - } - finally { - stopAllGrids(); - } - } - - /** - * Tests putAll() method along with failover and cache backup. - * - * Checks that the resulting primary cache size is the same as - * expected. - * - * @param near Near enabled. - * @param workerCnt Worker count. - * @param shutdownCnt Shutdown count. - * @throws Exception If failed. - */ - public void checkPutAllFailoverColocated(boolean near, int workerCnt, int shutdownCnt) throws Exception { - nearEnabled = near; - backups = shutdownCnt; - - Collection<Integer> testKeys = generateTestKeys(); - - final Ignite master = startGrid(MASTER); - - List<Ignite> workers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) - workers.add(startGrid("worker" + i)); - - info("Master: " + master.cluster().localNode().id()); - - List<Ignite> runningWorkers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) { - UUID id = workers.get(i - 1).cluster().localNode().id(); - - info(String.format("Worker%d: %s", i, id)); - - runningWorkers.add(workers.get(i - 1)); - } - - try { - // Dummy call to fetch affinity function from remote node - master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); - - Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); - - int chunkCntr = 0; - final AtomicBoolean jobFailed = new AtomicBoolean(false); - - int failoverPushGap = 0; - - final CountDownLatch emptyLatch = new CountDownLatch(1); - - final AtomicBoolean inputExhausted = new AtomicBoolean(); - - IgniteCompute comp = compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync(); - - for (Integer key : testKeys) { - ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); - - UUID nodeId = mappedNode.id(); - - Collection<Integer> data = dataChunks.get(nodeId); - - if (data == null) { - data = new ArrayList<>(DATA_CHUNK_SIZE); - - dataChunks.put(nodeId, data); - } - - data.add(key); - - if (data.size() == DATA_CHUNK_SIZE) { // time to send data - chunkCntr++; - - log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]"); - - comp.execute(new GridCachePutAllTask(nodeId, CACHE_NAME), data); - - ComputeTaskFuture<Void> fut = comp.future(); - - resQueue.put(fut); // Blocks if queue is full. - - fut.listenAsync(new CI1<IgniteFuture<Void>>() { - @Override public void apply(IgniteFuture<Void> f) { - ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; - - try { - taskFut.get(); //if something went wrong - we'll get exception here - } - catch (IgniteCheckedException e) { - log.error("Job failed", e); - - jobFailed.set(true); - } - - // Remove complete future from queue to allow other jobs to proceed. - resQueue.remove(taskFut); - - if (inputExhausted.get() && resQueue.isEmpty()) - emptyLatch.countDown(); - } - }); - - data = new ArrayList<>(DATA_CHUNK_SIZE); - - dataChunks.put(nodeId, data); - - if (chunkCntr >= FAIL_ON_CHUNK_NO) { - if (workerCnt - runningWorkers.size() < shutdownCnt) { - if (failoverPushGap > 0) - failoverPushGap--; - else { - Ignite victim = runningWorkers.remove(0); - - info("Shutting down node: " + victim.cluster().localNode().id()); - - stopGrid(victim.name()); - - // Fail next node after some jobs have been pushed. - failoverPushGap = FAILOVER_PUSH_GAP; - } - } - } - } - } - - for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) { - comp.execute(new GridCachePutAllTask(entry.getKey(), CACHE_NAME), entry.getValue()); - - ComputeTaskFuture<Void> fut = comp.future(); - - resQueue.put(fut); // Blocks if queue is full. - - fut.listenAsync(new CI1<IgniteFuture<Void>>() { - @Override public void apply(IgniteFuture<Void> f) { - ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; - - try { - taskFut.get(); //if something went wrong - we'll get exception here - } - catch (IgniteCheckedException e) { - log.error("Job failed", e); - - jobFailed.set(true); - } - - // Remove complete future from queue to allow other jobs to proceed. - resQueue.remove(taskFut); - - if (inputExhausted.get() && resQueue.isEmpty()) - emptyLatch.countDown(); - } - }); - } - - inputExhausted.set(true); - - if (resQueue.isEmpty()) - emptyLatch.countDown(); - - // Wait for queue to empty. - log.info("Waiting for empty queue..."); - - boolean failedWait = false; - - if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) { - info(">>> Failed to wait for queue to empty."); - - failedWait = true; - } - - if (!failedWait) - assertFalse("One or more jobs have failed.", jobFailed.get()); - - Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); - - if (!failedWait && !absentKeys.isEmpty()) { - // Give some time to preloader. - U.sleep(15000); - - absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); - } - - info(">>> Absent keys: " + absentKeys); - - assertTrue(absentKeys.isEmpty()); - - // Actual primary cache size. - int primaryCacheSize = 0; - - for (Ignite g : runningWorkers) { - info(">>>>> " + g.cache(CACHE_NAME).size()); - - primaryCacheSize += g.cache(CACHE_NAME).primarySize(); - } - - assertEquals(TEST_MAP_SIZE, primaryCacheSize); - } - finally { - stopAllGrids(); - } - } - - /** - * Tries to find keys, that are absent in cache. - * - * @param workerNode Worker node. - * @param keys Keys that are suspected to be absent - * @return List of absent keys. If no keys are absent, the list is empty. - * @throws IgniteCheckedException If error occurs. - */ - private Collection<Integer> findAbsentKeys(Ignite workerNode, - Collection<Integer> keys) throws IgniteCheckedException { - - Collection<Integer> ret = new ArrayList<>(keys.size()); - - GridCache<Object, Object> cache = workerNode.cache(CACHE_NAME); - - for (Integer key : keys) { - if (cache.get(key) == null) // Key is absent. - ret.add(key); - } - - return ret; - } - - /** - * Generates a test keys collection. - * - * @return A test keys collection. - */ - private Collection<Integer> generateTestKeys() { - Collection<Integer> ret = new ArrayList<>(TEST_MAP_SIZE); - - for (int i = 0; i < TEST_MAP_SIZE; i++) - ret.add(i); - - return ret; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(false); - - cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS); - - TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); - - discoverySpi.setAckTimeout(60000); - discoverySpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoverySpi); - - if (gridName.startsWith("master")) { - cfg.setUserAttributes(ImmutableMap.of("segment", "master")); - - // For sure. - failoverSpi.setMaximumFailoverAttempts(50); - - cfg.setFailoverSpi(failoverSpi); - } - else if (gridName.startsWith("worker")) { - cfg.setUserAttributes(ImmutableMap.of("segment", "worker")); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); - cacheCfg.setStartSize(4500000); - - cacheCfg.setBackups(backups); - - cacheCfg.setStoreValueBytes(true); - cacheCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cacheCfg.setQueryIndexEnabled(false); - - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - - cfg.setCacheConfiguration(cacheCfg); - } - else - throw new IllegalStateException("Unexpected grid name: " + gridName); - - return cfg; - } - - /** - * Test failover SPI for master node. - */ - @IgniteSpiConsistencyChecked(optional = true) - private static class MasterFailoverSpi extends AlwaysFailoverSpi { - /** */ - private static final String FAILOVER_NUMBER_ATTR = "failover:number:attr"; - - /** */ - private Set<ComputeJobContext> failedOverJobs = new HashSet<>(); - - /** Node filter. */ - private IgnitePredicate<? super ClusterNode>[] filter; - - /** */ - @IgniteLoggerResource - private IgniteLogger log; - - /** - * @param filter Filter. - */ - MasterFailoverSpi(IgnitePredicate<? super ClusterNode>... filter) { - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { - failedOverJobs.add(ctx.getJobResult().getJobContext()); - - // Clear failed nodes list - allow to failover on the same node. - ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null); - - // Account for maximum number of failover attempts since we clear failed node list. - Integer failoverCnt = ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR); - - if (failoverCnt == null) - ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1); - else { - if (failoverCnt >= getMaximumFailoverAttempts()) { - U.warn(log, "Job failover failed because number of maximum failover attempts is exceeded " + - "[failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + - getMaximumFailoverAttempts() + ']'); - - return null; - } - - ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, failoverCnt + 1); - } - - List<ClusterNode> cp = new ArrayList<>(top); - - // Keep collection type. - F.retain(cp, false, new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return F.isAll(node, filter); - } - }); - - return super.failover(ctx, cp); //use cp to ensure we don't failover on failed node - } - - /** - * @return Job contexts for failed over jobs. - */ - public Set<ComputeJobContext> getFailedOverJobs() { - return failedOverJobs; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllTask.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllTask.java deleted file mode 100644 index 54c87ca..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePutAllTask.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Puts all the passed data into partitioned cache in small chunks. - */ -class GridCachePutAllTask extends ComputeTaskAdapter<Collection<Integer>, Void> { - /** Number of entries per put. */ - private static final int TX_BOUND = 30; - - /** Preferred node. */ - private final UUID preferredNode; - - /** Cache name. */ - private final String cacheName; - - /** - * - * @param preferredNode A node that we'd prefer to take from grid. - * @param cacheName A name of the cache to work with. - */ - GridCachePutAllTask(UUID preferredNode, String cacheName) { - this.preferredNode = preferredNode; - this.cacheName = cacheName; - } - - /** {@inheritDoc} */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable final Collection<Integer> data) throws IgniteCheckedException { - assert !subgrid.isEmpty(); - - // Give preference to wanted node. Otherwise, take the first one. - ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() { - /** {@inheritDoc} */ - @Override public boolean apply(ClusterNode e) { - return preferredNode.equals(e.id()); - } - }); - - return Collections.singletonMap( - new ComputeJobAdapter() { - @IgniteLoggerResource - private IgniteLogger log; - - @IgniteInstanceResource - private Ignite ignite; - - @Override public Object execute() throws IgniteCheckedException { - log.info("Going to put data: " + data); - - GridCacheProjection<Object, Object> cache = ignite.cache(cacheName); - - assert cache != null; - - HashMap<Integer, Integer> putMap = U.newLinkedHashMap(TX_BOUND); - - Iterator<Integer> it = data.iterator(); - - int cnt = 0; - - while (it.hasNext()) { - Integer val = it.next(); - - putMap.put(val, val); - - if (++cnt == TX_BOUND) { - log.info("Putting keys to cache: " + putMap.keySet()); - - cache.putAll(putMap); - - cnt = 0; - - putMap = U.newLinkedHashMap(TX_BOUND); - } - } - - assert cnt < TX_BOUND; - assert putMap.size() == (data.size() % TX_BOUND) : "putMap.size() = " + putMap.size(); - - log.info("Putting keys to cache: " + putMap.keySet()); - - cache.putAll(putMap); - - log.info("Finished putting data: " + data); - - return data; - } - }, - targetNode); - } - - /** {@inheritDoc} */ - @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { - if (res.getException() != null) - return ComputeJobResultPolicy.FAILOVER; - - return ComputeJobResultPolicy.WAIT; - } - - /** {@inheritDoc} */ - @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } -}