http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java new file mode 100644 index 0000000..ac37cf1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffHeapTieredSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; + +/** + * Concrete {@link GridCacheOffHeapTieredAbstractSelfTest} that reports {@code TRANSACTIONAL} atomicity mode. + */ +public class GridCacheOffHeapTieredSelfTest extends GridCacheOffHeapTieredAbstractSelfTest { + /** {@inheritDoc} */ + @SuppressWarnings("RedundantMethodOverride") + @Override protected GridCacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java new file mode 100644 index 0000000..0754ec4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOffheapUpdateSelfTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Checks transactional updates and reads on a partitioned off-heap tiered cache (covers a specific support issue). 
+ */ +public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(GridCacheMode.PARTITIONED); + ccfg.setDistributionMode(GridCacheDistributionMode.PARTITIONED_ONLY); + ccfg.setAtomicityMode(GridCacheAtomicityMode.TRANSACTIONAL); + ccfg.setOffHeapMaxMemory(0); + ccfg.setMemoryMode(GridCacheMemoryMode.OFFHEAP_TIERED); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testUpdateInPessimisticTxOnRemoteNode() throws Exception { + try { + Ignite ignite = startGrids(2); + + GridCache<Object, Object> rmtCache = ignite.cache(null); + + int key = 0; + + while (!rmtCache.affinity().isPrimary(grid(1).localNode(), key)) + key++; + + GridCache<Object, Object> locCache = grid(1).cache(null); + + try (IgniteTx tx = locCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + locCache.putxIfAbsent(key, 0); + + tx.commit(); + } + + try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + assertEquals(0, rmtCache.get(key)); + + rmtCache.putx(key, 1); + + tx.commit(); + } + + try (IgniteTx tx = rmtCache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + assertEquals(1, rmtCache.get(key)); + + rmtCache.putx(key, 2); + + tx.commit(); + } + } + finally { + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void testReadEvictedPartition() throws Exception { + try { + Ignite grid = startGrid(0); + + GridCache<Object, Object> cache = grid.cache(null); + + for (int i = 0; i < 30; i++) + cache.put(i, 0); + + startGrid(1); + + awaitPartitionMapExchange(); + + for (int i = 0; i < 30; i++) + grid(1).cache(null).put(i, 10); + + // Find a key for which the first started node is neither primary nor backup anymore. 
+ int key = 0; + + ClusterNode locNode = grid.cluster().localNode(); + + for (;key < 30; key++) { + if (!cache.affinity().isPrimary(locNode, key) && !cache.affinity().isBackup(locNode, key)) + break; + } + + assertEquals(10, cache.get(key)); + + try (IgniteTx ignored = cache.txStart(OPTIMISTIC, REPEATABLE_READ)) { + assertEquals(10, cache.get(key)); + } + + try (IgniteTx ignored = cache.txStart(PESSIMISTIC, READ_COMMITTED)) { + assertEquals(10, cache.get(key)); + } + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java new file mode 100644 index 0000000..56fc683 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOrderedPreloadingSelfTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Checks that the cache with the lower preload order finishes preloading no later than the other cache. + */ +public class GridCacheOrderedPreloadingSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of grids in test. */ + private static final int GRID_CNT = 4; + + /** First cache name. */ + public static final String FIRST_CACHE_NAME = "first"; + + /** Second cache name. */ + public static final String SECOND_CACHE_NAME = "second"; + + /** First cache mode. */ + private GridCacheMode firstCacheMode; + + /** Second cache mode. */ + private GridCacheMode secondCacheMode; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration( + cacheConfig(firstCacheMode, 1, FIRST_CACHE_NAME), + cacheConfig(secondCacheMode, 2, SECOND_CACHE_NAME)); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** + * @param cacheMode Cache mode. + * @param preloadOrder Preload order. + * @param name Cache name. + * @return Cache configuration. 
+ */ + private CacheConfiguration cacheConfig(GridCacheMode cacheMode, int preloadOrder, String name) { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(name); + cfg.setCacheMode(cacheMode); + cfg.setPreloadOrder(preloadOrder); + cfg.setPreloadMode(ASYNC); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testPreloadOrderPartitionedPartitioned() throws Exception { + checkPreloadOrder(PARTITIONED, PARTITIONED); + } + + /** + * @throws Exception If failed. + */ + public void testPreloadOrderReplicatedReplicated() throws Exception { + checkPreloadOrder(REPLICATED, REPLICATED); + } + + /** + * @throws Exception If failed. + */ + public void testPreloadOrderPartitionedReplicated() throws Exception { + checkPreloadOrder(PARTITIONED, REPLICATED); + } + + /** + * @throws Exception If failed. + */ + public void testPreloadOrderReplicatedPartitioned() throws Exception { + checkPreloadOrder(REPLICATED, PARTITIONED); + } + + /** + * @param first First cache mode. + * @param second Second cache mode. + * @throws Exception If failed. + */ + private void checkPreloadOrder(GridCacheMode first, GridCacheMode second) throws Exception { + firstCacheMode = first; + secondCacheMode = second; + + Ignite g = startGrid(0); + + try { + GridCache<Object, Object> cache = g.cache("first"); + + // Put some data into cache. + for (int i = 0; i < 1000; i++) + cache.put(i, i); + + for (int i = 1; i < GRID_CNT; i++) + startGrid(i); + + // Start from index 1: for first node in topology replicated preloader gets completed right away. 
+ for (int i = 1; i < GRID_CNT; i++) { + GridKernal kernal = (GridKernal)grid(i); + + GridFutureAdapter<?> fut1 = (GridFutureAdapter<?>)kernal.internalCache(FIRST_CACHE_NAME).preloader() + .syncFuture(); + GridFutureAdapter<?> fut2 = (GridFutureAdapter<?>)kernal.internalCache(SECOND_CACHE_NAME).preloader() + .syncFuture(); + + fut1.get(); + fut2.get(); + + assertTrue("[i=" + i + ", fut1=" + fut1 + ", fut2=" + fut2 + ']', fut1.endTime() <= fut2.endTime()); + } + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java new file mode 100644 index 0000000..01a1212 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheP2PUndeploySelfTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.swapspace.file.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Checks that values stored by a node are removed from the other node's cache and swap/off-heap space once it stops. + */ +public class GridCacheP2PUndeploySelfTest extends GridCommonAbstractTest { + /** Test p2p value. 
*/ + private static final String TEST_VALUE = "org.gridgain.grid.tests.p2p.GridCacheDeploymentTestValue3"; + + /** Off-heap max memory applied to both caches when {@link #offheap} is {@code true}. */ + private static final long OFFHEAP = 0;// 4 * 1024 * 1024; + + /** IP finder. */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Generator of node indexes stored in the {@code IDX_ATTR} user attribute. */ + private final AtomicInteger idxGen = new AtomicInteger(); + + /** Preload mode for both caches. */ + private GridCachePreloadMode mode = SYNC; + + /** If {@code true} caches are configured with off-heap memory, otherwise with swap enabled. */ + private boolean offheap; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setNetworkTimeout(2000); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + cfg.setMarshaller(new IgniteJdkMarshaller()); + + cfg.setSwapSpaceSpi(new FileSwapSpaceSpi()); + + CacheConfiguration repCacheCfg = defaultCacheConfiguration(); + + repCacheCfg.setName("replicated"); + repCacheCfg.setCacheMode(REPLICATED); + repCacheCfg.setPreloadMode(mode); + repCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + repCacheCfg.setQueryIndexEnabled(false); + repCacheCfg.setAtomicityMode(TRANSACTIONAL); + + if (offheap) + repCacheCfg.setOffHeapMaxMemory(OFFHEAP); + else + repCacheCfg.setSwapEnabled(true); + + CacheConfiguration partCacheCfg = defaultCacheConfiguration(); + + partCacheCfg.setName("partitioned"); + partCacheCfg.setCacheMode(PARTITIONED); + partCacheCfg.setPreloadMode(mode); + partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1)); + partCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + partCacheCfg.setEvictNearSynchronized(false); + partCacheCfg.setQueryIndexEnabled(false); + partCacheCfg.setAtomicityMode(TRANSACTIONAL); + partCacheCfg.setDistributionMode(NEAR_PARTITIONED); + + if (offheap) + partCacheCfg.setOffHeapMaxMemory(OFFHEAP); + else + partCacheCfg.setSwapEnabled(true); + +
cfg.setCacheConfiguration(repCacheCfg, partCacheCfg); + + cfg.setDeploymentMode(SHARED); + cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheP2PUndeploySelfTest.class.getName()); + + cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); + + return cfg; + } + + /** @throws Exception If failed. */ + public void testSwapP2PReplicated() throws Exception { + offheap = false; + + checkP2PUndeploy("replicated"); + } + + /** @throws Exception If failed. */ + public void testOffHeapP2PReplicated() throws Exception { + offheap = true; + + checkP2PUndeploy("replicated"); + } + + /** @throws Exception If failed. */ + public void testSwapP2PPartitioned() throws Exception { + offheap = false; + + checkP2PUndeploy("partitioned"); + } + + /** @throws Exception If failed. */ + public void testOffheapP2PPartitioned() throws Exception { + offheap = true; + + checkP2PUndeploy("partitioned"); + } + + /** @throws Exception If failed. */ + public void testSwapP2PReplicatedNoPreloading() throws Exception { + mode = NONE; + offheap = false; + + checkP2PUndeploy("replicated"); + } + + /** @throws Exception If failed. */ + public void testOffHeapP2PReplicatedNoPreloading() throws Exception { + mode = NONE; + offheap = true; + + checkP2PUndeploy("replicated"); + } + + /** @throws Exception If failed. */ + public void testSwapP2PPartitionedNoPreloading() throws Exception { + mode = NONE; + offheap = false; + + checkP2PUndeploy("partitioned"); + } + + /** @throws Exception If failed. */ + public void testOffHeapP2PPartitionedNoPreloading() throws Exception { + mode = NONE; + offheap = true; + + checkP2PUndeploy("partitioned"); + } + + /** + * @param cacheName Cache name. + * @param g Grid. + * @return Size. + * @throws IgniteCheckedException If failed. 
+ */ + private long size(String cacheName, GridKernal g) throws IgniteCheckedException { + if (offheap) + return g.cache(cacheName).offHeapEntriesCount(); + + return g.context().swap().swapSize(swapSpaceName(cacheName, g)); + } + + /** + * @param cacheName Cache name. + * @throws Exception If failed. + */ + private void checkP2PUndeploy(String cacheName) throws Exception { + assert !F.isEmpty(cacheName); + + ClassLoader ldr = getExternalClassLoader(); + + Class valCls = ldr.loadClass(TEST_VALUE); + + try { + Ignite ignite1 = startGrid(1); + GridKernal grid2 = (GridKernal)startGrid(2); + + GridCache<Integer, Object> cache1 = ignite1.cache(cacheName); + GridCache<Integer, Object> cache2 = grid2.cache(cacheName); + + Object v1 = valCls.newInstance(); + + cache1.put(1, v1); + cache1.put(2, valCls.newInstance()); + cache1.put(3, valCls.newInstance()); + cache1.put(4, valCls.newInstance()); + + info("Stored value in cache1 [v=" + v1 + ", ldr=" + v1.getClass().getClassLoader() + ']'); + + Object v2 = cache2.get(1); + + assert v2 != null; + + info("Read value from cache2 [v=" + v2 + ", ldr=" + v2.getClass().getClassLoader() + ']'); + + assert v2 != null; + assert v2.toString().equals(v1.toString()); + assert !v2.getClass().getClassLoader().equals(getClass().getClassLoader()); + assert v2.getClass().getClassLoader().getClass().getName().contains("GridDeploymentClassLoader"); + + assert cache2.evict(2); + assert cache2.evict(3); + assert cache2.evict(4); + + long swapSize = size(cacheName, grid2); + + info("Swap size: " + swapSize); + + assert swapSize > 0; + + stopGrid(1); + + assert waitCacheEmpty(cache2, 10000); + + for (int i = 0; i < 3; i++) { + swapSize = size(cacheName, grid2); + + if (swapSize > 0) { + if (i < 2) { + U.warn(log, "Swap size check failed (will retry in 1000 ms): " + swapSize); + + U.sleep(1000); + + continue; + } + + fail("Swap size check failed: " + swapSize); + } + else if (swapSize == 0) + break; + else + assert false : "Negative swap size: " +
swapSize; + } + } + finally { + stopAllGrids(); + } + } + + /** + * @param cacheName Cache name. + * @param grid Kernal. + * @return Name for swap space. + */ + private String swapSpaceName(String cacheName, GridKernal grid) { + GridCacheContext<Object, Object> cctx = grid.internalCache(cacheName).context(); + + return CU.swapSpaceName(cctx.isNear() ? cctx.near().dht().context() : cctx); + } + + /** + * @param cache Cache. + * @param timeout Timeout. + * @return {@code True} if success. + * @throws InterruptedException If thread was interrupted. + */ + @SuppressWarnings({"BusyWait"}) + private boolean waitCacheEmpty(GridCacheProjection<Integer, Object> cache, long timeout) + throws InterruptedException { + assert cache != null; + assert timeout >= 0; + + long end = System.currentTimeMillis() + timeout; + + while (end - System.currentTimeMillis() >= 0) { + if (cache.isEmpty()) + return true; + + Thread.sleep(500); + } + + return cache.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java new file mode 100644 index 0000000..e5c9d21 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; + +import static org.apache.ignite.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction.*; + +/** + * Reports how {@link GridCacheConsistentHashAffinityFunction} spreads partitions over emulated nodes. + */ +public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTest { + /** Exclusive upper bound for the number of emulated nodes. */ + public static final int NODES_CNT = 50; + + /** + * @throws Exception If failed. + */ + public void testPartitionSpreading() throws Exception { + System.out.printf("%6s, %6s, %6s, %6s, %8s\n", "Nodes", "Reps", "Min", "Max", "Dev"); + + for (int i = 5; i < NODES_CNT; i = i * 3 / 2) { + for (int replicas = 128; replicas <= 4096; replicas*=2) { + Collection<ClusterNode> nodes = createNodes(i, replicas); + + GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(false, 10000); + + checkDistribution(aff, nodes); + } + + System.out.println(); + } + } + + /** + * @param nodesCnt Nodes count. + * @param replicas Value of the replica count attribute reported by each created node. + * @return Collection of test nodes. 
+ */ + private Collection<ClusterNode> createNodes(int nodesCnt, int replicas) { + Collection<ClusterNode> nodes = new ArrayList<>(nodesCnt); + + for (int i = 0; i < nodesCnt; i++) + nodes.add(new TestRichNode(replicas)); + + return nodes; + } + + /** + * @param aff Affinity to check. + * @param nodes Collection of nodes to test on. + */ + private void checkDistribution(GridCacheConsistentHashAffinityFunction aff, Collection<ClusterNode> nodes) { + Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); + + for (int part = 0; part < aff.getPartitions(); part++) { + Collection<ClusterNode> affNodes = aff.nodes(part, nodes, 0); + + assertEquals(1, affNodes.size()); + + ClusterNode node = F.first(affNodes); + + parts.put(node, parts.get(node) != null ? parts.get(node) + 1 : 1); + } + + int min = Integer.MAX_VALUE; + int max = Integer.MIN_VALUE; + int total = 0; + + float mean = 0; + float m2 = 0; + int n = 0; + + for (ClusterNode node : nodes) { + int partsCnt = parts.get(node) != null ? parts.get(node) : 0; + + total += partsCnt; + + if (partsCnt < min) + min = partsCnt; + + if (partsCnt > max) + max = partsCnt; + + n++; + float delta = partsCnt - mean; + mean += delta / n; + m2 += delta * (partsCnt - mean); + } + + m2 /= (n - 1); + assertEquals(aff.getPartitions(), total); + + System.out.printf("%6s, %6s, %6s, %6s, %8.4f\n", nodes.size(), + F.first(nodes).attribute(DFLT_REPLICA_COUNT_ATTR_NAME), min, max, Math.sqrt(m2)); + } + + /** + * Rich node stub to use in emulated server topology. + */ + private static class TestRichNode extends GridTestNode { + /** Node ID returned by {@link #id()}. */ + private final UUID nodeId; + + /** Value returned for the {@code DFLT_REPLICA_COUNT_ATTR_NAME} attribute. */ + private final int replicas; + + /** + * Constructs rich node stub with a random node ID and the given replica count. + */ + @SuppressWarnings("UnusedDeclaration") + private TestRichNode(int replicas) { + this(UUID.randomUUID(), replicas); + } + + /** + * Constructs rich node stub to use in emulated server topology. + * @param nodeId Node id. + * @param replicas Value returned for the replica count attribute. 
+ */ + private TestRichNode(UUID nodeId, int replicas) { + this.nodeId = nodeId; + this.replicas = replicas; + } + + /** + * Unused constructor for externalizable support (always throws {@link UnsupportedOperationException}). + */ + public TestRichNode() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public UUID id() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public <T> T attribute(String name) { + if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) + return (T)new Integer(replicas); + + return super.attribute(name); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java new file mode 100644 index 0000000..5d51126 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.internal.GridTopic.*; + +/** + * Checks when {@code get} causes a {@link GridNearGetRequest} to be received on the primary node. + */ +public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of grids started before tests. */ + private static final int GRID_CNT = 3; + + /** Cache key used by all tests. */ + private static final String KEY = "key"; + + /** Value put for {@link #KEY}. */ + private static final int VAL = 1; + + /** Set to {@code true} when a {@link GridNearGetRequest} is received by the listener on the primary node. */ + private static final AtomicBoolean received = new AtomicBoolean(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(discoverySpi()); + cfg.setCacheConfiguration(cacheConfiguration()); + + return cfg; + } + + /** + * @return Discovery SPI. + */ + private DiscoverySpi discoverySpi() { + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(IP_FINDER); + + return spi; + } + + /** 
+ * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(PARTITIONED); + cc.setBackups(1); + cc.setPreloadMode(SYNC); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(true); + cc.setEvictNearSynchronized(false); + cc.setEvictSynchronized(false); + cc.setDistributionMode(PARTITIONED_ONLY); + + return cc; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(GRID_CNT); + + prepare(); + } + + @Override protected void beforeTest() throws Exception { + received.set(false); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testGetFromPrimaryNode() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + GridCache<String, Integer> c = grid(i).cache(null); + + GridCacheEntry<String, Integer> e = c.entry(KEY); + + if (e.primary()) { + info("Primary node: " + grid(i).localNode().id()); + + c.get(KEY); + + break; + } + } + + assert !await(); + } + + /** + * @throws Exception If failed. + */ + public void testGetFromBackupNode() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + GridCache<String, Integer> c = grid(i).cache(null); + + GridCacheEntry<String, Integer> e = c.entry(KEY); + + if (e.backup()) { + info("Backup node: " + grid(i).localNode().id()); + + Integer val = c.get(KEY); + + assert val != null && val == 1; + + assert !await(); + + assert c.evict(KEY); + + assert c.peek(KEY) == null; + + val = c.get(KEY); + + assert val != null && val == 1; + + assert !await(); + + break; + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetFromNearNode() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + GridCache<String, Integer> c = grid(i).cache(null); + + GridCacheEntry<String, Integer> e = c.entry(KEY); + + if (!e.primary() && !e.backup()) { + info("Near node: " + grid(i).localNode().id()); + + Integer val = c.get(KEY); + + assert val != null && val == 1; + + break; + } + } + + assert await(); + } + + /** + * @return {@code True} if awaited message. + * @throws Exception If failed. + */ + @SuppressWarnings({"BusyWait"}) + private boolean await() throws Exception { + info("Checking flag: " + System.identityHashCode(received)); + + for (int i = 0; i < 3; i++) { + if (received.get()) + return true; + + info("Flag is false."); + + Thread.sleep(500); + } + + return received.get(); + } + + /** + * Puts value to primary node and registers listener + * that sets {@link #received} flag to {@code true} + * if {@link GridNearGetRequest} was received on primary node. + * + * @throws Exception If failed. + */ + @SuppressWarnings("deprecation") + private void prepare() throws Exception { + for (int i = 0; i < GRID_CNT; i++) { + Ignite g = grid(i); + + GridCacheEntry<String, Integer> e = g.<String, Integer>cache(null).entry(KEY); + + if (e.primary()) { + info("Primary node: " + g.cluster().localNode().id()); + + // Put value. + g.cache(null).put(KEY, VAL); + + // Register listener. 
+ ((GridKernal)g).context().io().addMessageListener( + TOPIC_CACHE, + new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']'); + + if (msg instanceof GridNearGetRequest) { + info("Setting flag: " + System.identityHashCode(received)); + + received.set(true); + } + } + } + ); + + break; + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java new file mode 100644 index 0000000..7cd7102 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedProjectionAffinitySelfTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; + +/** + * Partitioned affinity test for projections. + */ +@SuppressWarnings({"PointlessArithmeticExpression"}) +public class GridCachePartitionedProjectionAffinitySelfTest extends GridCommonAbstractTest { + /** Backup count. */ + private static final int BACKUPS = 1; + + /** Grid count. 
*/ + private static final int GRIDS = 3; + + /** IP finder handed to the discovery SPI in {@link #getConfiguration(String)}. */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(PARTITIONED); + cacheCfg.setBackups(BACKUPS); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setPreloadMode(SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(NEAR_PARTITIONED); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(GRIDS); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * Checks that grids 0 and 1 map each key from 0 to 99 of the cache named {@code null} to the same node. + * + * @throws Exception If failed. + */ + public void testAffinity() throws Exception { + waitTopologyUpdate(); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + + for (int i = 0; i < 100; i++) + assertEquals(g0.cluster().mapKeyToNode(null, i).id(), g1.cluster().mapKeyToNode(null, i).id()); + } + + /** + * Builds one cluster group pinned to node 0 and one pinned to nodes 0 and 1, then checks that the + * key-to-node mapping obtained through each group agrees for keys 0 to 99. + * NOTE(review): the mapping is requested from {@code ignite().cluster()} of each group rather than + * from the group itself - confirm that this exercises the projection. + * + * @throws Exception If failed. + */ + @SuppressWarnings("deprecation") + public void testProjectionAffinity() throws Exception { + waitTopologyUpdate(); + + Ignite g0 = grid(0); + Ignite g1 = grid(1); + + ClusterGroup g0Pinned = g0.cluster().forNodeIds(F.asList(g0.cluster().localNode().id())); + + ClusterGroup g01Pinned = + g1.cluster().forNodeIds(F.asList(g0.cluster().localNode().id(), g1.cluster().localNode().id())); + + for (int i = 0; i < 100; i++) + assertEquals(g0Pinned.ignite().cluster().mapKeyToNode(null, i).id(), + g01Pinned.ignite().cluster().mapKeyToNode(null, i).id()); + } + + /** @throws Exception If failed. 
*/ + @SuppressWarnings("BusyWait") + private void waitTopologyUpdate() throws Exception { + GridTestUtils.waitTopologyUpdate(null, BACKUPS, log()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java new file mode 100644 index 0000000..57807e4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedWritesTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; + +/** + * Tests that in {@link GridCacheMode#PARTITIONED} mode the cache writes values only to the near cache store. <p/> This check + * is needed because the current implementation of {@link GridCacheWriteBehindStore} assumes that, and the user store is + * wrapped only in the near cache (see {@link GridCacheProcessor} init logic). + */ +@SuppressWarnings({"unchecked"}) +public class GridCachePartitionedWritesTest extends GridCommonAbstractTest { + /** Cache store. 
*/ + private CacheStore store; + + /** {@inheritDoc} */ + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(GridCacheMode.PARTITIONED); + cc.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cc.setSwapEnabled(false); + cc.setAtomicityMode(TRANSACTIONAL); + cc.setDistributionMode(NEAR_PARTITIONED); + + /* testWrite() assigns the store before it calls startGrid(). */ + assert store != null; + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + store = null; + + super.afterTest(); + } + + /** + * Installs a store that counts writes and deletes, starts one grid, puts keys 1 to 10 in one transaction + * and removes them in another, and asserts that the store saw 10 writes and 10 deletes. + * + * @throws Exception If test fails. + */ + public void testWrite() throws Exception { + final AtomicInteger putCnt = new AtomicInteger(); + final AtomicInteger rmvCnt = new AtomicInteger(); + + store = new CacheStoreAdapter<Object, Object>() { + @Override public Object load(Object key) { + info(">>> Get [key=" + key + ']'); + + return null; + } + + @Override public void write(Cache.Entry<? extends Object, ? 
extends Object> entry) { + putCnt.incrementAndGet(); + } + + @Override public void delete(Object key) { + rmvCnt.incrementAndGet(); + } + }; + + startGrid(); + + GridCache<Integer, String> cache = cache(); + + try { + cache.get(1); + + IgniteTx tx = cache.txStart(); + + try { + for (int i = 1; i <= 10; i++) + cache.putx(i, Integer.toString(i)); + + tx.commit(); + } + finally { + tx.close(); + } + + assert cache.size() == 10; + + assert putCnt.get() == 10; + + tx = cache.txStart(); + + try { + for (int i = 1; i <= 10; i++) { + String val = cache.remove(i); + + assert val != null; + assert val.equals(Integer.toString(i)); + } + + tx.commit(); + } + finally { + tx.close(); + } + + assert rmvCnt.get() == 10; + } + finally { + stopGrid(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java new file mode 100644 index 0000000..7382503 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePreloadingEvictionsSelfTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * + */ +public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest { + /** */ + private static final String VALUE = createValue(); + + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private final AtomicInteger idxGen = 
new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + CacheConfiguration partCacheCfg = defaultCacheConfiguration(); + + partCacheCfg.setCacheMode(PARTITIONED); + partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(1, 1)); + partCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + partCacheCfg.setDistributionMode(PARTITIONED_ONLY); + partCacheCfg.setEvictSynchronized(true); + partCacheCfg.setSwapEnabled(false); + partCacheCfg.setEvictionPolicy(null); + partCacheCfg.setEvictSynchronizedKeyBufferSize(25); + partCacheCfg.setEvictMaxOverflowRatio(0.99f); + partCacheCfg.setPreloadMode(ASYNC); + partCacheCfg.setAtomicityMode(TRANSACTIONAL); + + // This test requires artificial slowing down of the preloading. + partCacheCfg.setPreloadThrottle(2000); + + cfg.setCacheConfiguration(partCacheCfg); + + /* Every call takes the next value of idxGen as the IDX_ATTR attribute of this configuration. */ + cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); + + cfg.setNetworkTimeout(60000); + + return cfg; + } + + /** + * Puts 5000 entries into the cache of grid 1, evicts random entries while grid 2 is started and again + * on a constant topology, and after each phase checks that both caches have equal sizes and consistent keys. + * + * @throws Exception If failed. 
+ */ + @SuppressWarnings("BusyWait") + public void testEvictions() throws Exception { + try { + final Ignite ignite1 = startGrid(1); + + GridCache<Integer, Object> cache1 = ignite1.cache(null); + + for (int i = 0; i < 5000; i++) + cache1.put(i, VALUE + i); + + info("Finished data population."); + + final AtomicBoolean done = new AtomicBoolean(); + + final CountDownLatch startLatch = new CountDownLatch(1); + + int oldSize = cache1.size(); + + IgniteFuture fut = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + startLatch.await(); + + info("Started evicting..."); + + for (int i = 0; i < 3000 && !done.get(); i++) { + GridCacheEntry<Integer, Object> entry = randomEntry(ignite1); + + if (entry != null) + entry.evict(); + else + info("Entry is null."); + } + + info("Finished evicting."); + + return null; + } + }, + 1); + + /* Release the eviction thread as soon as a node-joined event is seen locally. */ + ignite1.events().localListen( + new IgnitePredicate<IgniteEvent>() { + @Override public boolean apply(IgniteEvent evt) { + startLatch.countDown(); + + return true; + } + }, + EVT_NODE_JOINED); + + final Ignite ignite2 = startGrid(2); + + /* Grid 2 has started: tell the eviction loop to stop. */ + done.set(true); + + fut.get(); + + sleepUntilCashesEqualize(ignite1, ignite2, oldSize); + + checkCachesConsistency(ignite1, ignite2); + + oldSize = cache1.size(); + + info("Evicting on constant topology."); + + for (int i = 0; i < 1000; i++) { + GridCacheEntry<Integer, Object> entry = randomEntry(ignite1); + + if (entry != null) + entry.evict(); + else + info("Entry is null."); + } + + sleepUntilCashesEqualize(ignite1, ignite2, oldSize); + + checkCachesConsistency(ignite1, ignite2); + } + finally { + stopAllGrids(); + } + } + + /** + * Waits until the size of the first cache differs from the old size and equals the size of the second cache. + * + * @param ignite1 Grid 1. + * @param ignite2 Grid 2. + * @param oldSize Old size; the stable size must differ from it. + * @throws org.apache.ignite.IgniteInterruptedException If interrupted. 
+ */ + private void sleepUntilCashesEqualize(final Ignite ignite1, final Ignite ignite2, final int oldSize) + throws IgniteInterruptedException { + info("Sleeping..."); + + boolean equalized = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + int firstSize = ignite1.cache(null).size(); + + /* Size has not moved away from the old value yet. */ + if (firstSize == oldSize) + return false; + + return firstSize == ignite2.cache(null).size(); + } + }, getTestTimeout()); + + assertTrue(equalized); + + info("Sleep finished."); + } + + /** + * Asks the internal cache of the given grid for a random entry. + * + * @param g Grid. + * @return Random entry from cache. + */ + @Nullable private GridCacheEntry<Integer, Object> randomEntry(Ignite g) { + return ((GridKernal)g).<Integer, Object>internalCache().randomEntry(); + } + + /** + * Compares the internal caches of two grids: retries the size comparison up to three times with a + * 1000 ms pause, asserts that the sizes match, and asserts that every key of the first cache whose + * {@code peek} is not {@code null} is contained in the second cache. + * + * @param ignite1 Grid 1. + * @param ignite2 Grid 2. + * @throws Exception If failed. + */ + private void checkCachesConsistency(Ignite ignite1, Ignite ignite2) throws Exception { + GridKernal kernal1 = (GridKernal) ignite1; + GridKernal kernal2 = (GridKernal) ignite2; + + GridCacheAdapter<Integer, Object> cache1 = kernal1.internalCache(); + GridCacheAdapter<Integer, Object> cache2 = kernal2.internalCache(); + + for (int attempt = 0; attempt < 3; attempt++) { + if (cache1.size() == cache2.size()) + break; + + U.warn(log, "Sizes do not match (will retry in 1000 ms) [s1=" + cache1.size() + + ", s2=" + cache2.size() + ']'); + + U.sleep(1000); + } + + info("Cache1 size: " + cache1.size()); + info("Cache2 size: " + cache2.size()); + + assert cache1.size() == cache2.size() : "Sizes do not match [s1=" + cache1.size() + + ", s2=" + cache2.size() + ']'; + + for (Integer key : cache1.keySet()) { + /* Keys without a peeked value are not compared. */ + if (cache1.peek(key) == null) + continue; + + assert cache2.containsKey(key, null) : "Cache2 does not contain key: " + key; + } + } + + /** + * @return Large value for test. 
+ */ + private static String createValue() { + SB sb = new SB(1024); + + for (int i = 0; i < 64; i++) + sb.a("val1"); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java new file mode 100644 index 0000000..b311058 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java @@ -0,0 +1,707 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import com.google.common.collect.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests putAll() method along with failover and different configurations. + */ +public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Size of the test map. */ + private static final int TEST_MAP_SIZE = 100000; + + /** Cache name. */ + private static final String CACHE_NAME = "partitioned"; + + /** Size of data chunk, sent to a remote node. */ + private static final int DATA_CHUNK_SIZE = 1000; + + /** Number of chunk on which to fail worker node. */ + public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / DATA_CHUNK_SIZE) / 3; + + /** Await timeout in seconds. */ + public static final int AWAIT_TIMEOUT_SEC = 65; + + /** */ + private static final int FAILOVER_PUSH_GAP = 30; + + /** Master node name. */ + private static final String MASTER = "master"; + + /** Near enabled flag. 
*/ + private boolean nearEnabled; + + /** Backups count. */ + private int backups; + + /** Filter to include only worker nodes. */ + private static final IgnitePredicate<ClusterNode> workerNodesFilter = new PN() { + @SuppressWarnings("unchecked") + @Override public boolean apply(ClusterNode n) { + return "worker".equals(n.attribute("segment")); + } + }; + + /** + * Result future queue (restrict the queue size + * to 50 in order to prevent in-memory data grid from over loading). + */ + private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new LinkedBlockingQueue<>(50); + + /** Test failover SPI. */ + private MasterFailoverSpi failoverSpi = new MasterFailoverSpi((IgnitePredicate)workerNodesFilter); + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearEnabledThreeBackups() throws Exception { + checkPutAllFailoverColocated(true, 7, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearDisabledThreeBackups() throws Exception { + checkPutAllFailoverColocated(false, 7, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverNearEnabledOneBackup() throws Exception { + checkPutAllFailover(true, 3, 1); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverNearDisabledOneBackup() throws Exception { + checkPutAllFailover(false, 3, 1); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverNearEnabledTwoBackups() throws Exception { + checkPutAllFailover(true, 5, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverNearDisabledTwoBackups() throws Exception { + checkPutAllFailover(false, 5, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverNearEnabledThreeBackups() throws Exception { + checkPutAllFailover(true, 7, 3); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutAllFailoverNearDisabledThreeBackups() throws Exception { + checkPutAllFailover(false, 7, 3); + } + + /** + * Runs {@link #checkPutAllFailoverColocated} with near cache enabled, 3 workers and 1 shutdown. + * + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearEnabledOneBackup() throws Exception { + checkPutAllFailoverColocated(true, 3, 1); + } + + /** + * Runs {@link #checkPutAllFailoverColocated} with near cache disabled, 3 workers and 1 shutdown. + * + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearDisabledOneBackup() throws Exception { + checkPutAllFailoverColocated(false, 3, 1); + } + + /** + * Runs {@link #checkPutAllFailoverColocated} with near cache enabled, 5 workers and 2 shutdowns. + * + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearEnabledTwoBackups() throws Exception { + checkPutAllFailoverColocated(true, 5, 2); + } + + /** + * Runs {@link #checkPutAllFailoverColocated} with near cache disabled, 5 workers and 2 shutdowns. + * + * @throws Exception If failed. + */ + public void testPutAllFailoverColocatedNearDisabledTwoBackups() throws Exception { + checkPutAllFailoverColocated(false, 5, 2); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + /* Five times the timeout of the superclass. */ + return super.getTestTimeout() * 5; + } + + /** + * Tests putAll() method along with failover and cache backup. + * + * Starts a master and {@code workerCnt} workers, pushes the test keys in chunks of {@code DATA_CHUNK_SIZE} + * as {@code GridCachePutAllTask}s created with the ID of a randomly chosen running worker, and stops + * workers one by one once {@code FAIL_ON_CHUNK_NO} chunks have been pushed. + * + * Checks that no key is absent and that the resulting primary cache size is the same as + * expected. + * + * @param near Near enabled. + * @param workerCnt Worker count. + * @param shutdownCnt Number of workers to stop; also used as the backup count. + * @throws Exception If failed. 
+ */ + public void checkPutAllFailover(boolean near, int workerCnt, int shutdownCnt) throws Exception { + nearEnabled = near; + backups = shutdownCnt; + + Collection<Integer> testKeys = generateTestKeys(); + + final Ignite master = startGrid(MASTER); + + List<Ignite> workers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) + workers.add(startGrid("worker" + i)); + + info("Master: " + master.cluster().localNode().id()); + + List<Ignite> runningWorkers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) { + UUID id = workers.get(i - 1).cluster().localNode().id(); + + info(String.format("Worker%d - %s", i, id)); + + runningWorkers.add(workers.get(i - 1)); + } + + try { + // Dummy call to fetch affinity function from remote node + master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); + + Random rnd = new Random(); + + Collection<Integer> dataChunk = new ArrayList<>(DATA_CHUNK_SIZE); + int entryCntr = 0; + int chunkCntr = 0; + final AtomicBoolean jobFailed = new AtomicBoolean(false); + + int failoverPushGap = 0; + + final CountDownLatch emptyLatch = new CountDownLatch(1); + + final AtomicBoolean inputExhausted = new AtomicBoolean(); + + IgniteCompute comp = compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync(); + + for (Integer key : testKeys) { + dataChunk.add(key); + entryCntr++; + + if (entryCntr == DATA_CHUNK_SIZE) { // time to send data + chunkCntr++; + + assert dataChunk.size() == DATA_CHUNK_SIZE; + + log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]"); + + comp.execute( + new GridCachePutAllTask( + runningWorkers.get(rnd.nextInt(runningWorkers.size())).cluster().localNode().id(), + CACHE_NAME), + dataChunk); + + ComputeTaskFuture<Void> fut = comp.future(); + + resQueue.put(fut); // Blocks if queue is full. 
+ + fut.listenAsync(new CI1<IgniteFuture<Void>>() { + @Override public void apply(IgniteFuture<Void> f) { + ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; + + try { + taskFut.get(); //if something went wrong - we'll get exception here + } + catch (IgniteCheckedException e) { + log.error("Job failed", e); + + jobFailed.set(true); + } + + // Remove complete future from queue to allow other jobs to proceed. + resQueue.remove(taskFut); + + if (inputExhausted.get() && resQueue.isEmpty()) + emptyLatch.countDown(); + } + }); + + entryCntr = 0; + dataChunk = new ArrayList<>(DATA_CHUNK_SIZE); + + if (chunkCntr >= FAIL_ON_CHUNK_NO) { + if (workerCnt - runningWorkers.size() < shutdownCnt) { + if (failoverPushGap > 0) + failoverPushGap--; + else { + Ignite victim = runningWorkers.remove(0); + + info("Shutting down node: " + victim.cluster().localNode().id()); + + stopGrid(victim.name()); + + // Fail next node after some jobs have been pushed. + failoverPushGap = FAILOVER_PUSH_GAP; + } + } + } + } + } + + inputExhausted.set(true); + + if (resQueue.isEmpty()) + emptyLatch.countDown(); + + assert chunkCntr == TEST_MAP_SIZE / DATA_CHUNK_SIZE; + + // Wait for queue to empty. + log.info("Waiting for empty queue..."); + + boolean failedWait = false; + + if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) { + info(">>> Failed to wait for queue to empty."); + + failedWait = true; + } + + if (!failedWait) + assertFalse("One or more jobs have failed.", jobFailed.get()); + + Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); + + if (!failedWait && !absentKeys.isEmpty()) { + // Give some time to preloader. + U.sleep(15000); + + absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); + } + + info(">>> Absent keys: " + absentKeys); + + assertTrue(absentKeys.isEmpty()); + + // Actual primary cache size. 
+ int primaryCacheSize = 0; + + for (Ignite g : runningWorkers) { + info(">>>>> " + g.cache(CACHE_NAME).size()); + + primaryCacheSize += g.cache(CACHE_NAME).primarySize(); + } + + assertEquals(TEST_MAP_SIZE, primaryCacheSize); + } + finally { + stopAllGrids(); + } + } + + /** + * Tests putAll() method along with failover and cache backup, with keys grouped by the node they map to. + * + * Each key is mapped with {@code mapKeyToNode} and collected into a per-node chunk. A chunk is pushed as a + * {@code GridCachePutAllTask} created with that node's ID once it holds {@code DATA_CHUNK_SIZE} keys, and + * the remaining chunks are pushed after all keys have been processed. + * + * Checks that the resulting primary cache size is the same as + * expected. + * + * @param near Near enabled. + * @param workerCnt Worker count. + * @param shutdownCnt Number of workers to stop; also used as the backup count. + * @throws Exception If failed. + */ + public void checkPutAllFailoverColocated(boolean near, int workerCnt, int shutdownCnt) throws Exception { + nearEnabled = near; + backups = shutdownCnt; + + Collection<Integer> testKeys = generateTestKeys(); + + final Ignite master = startGrid(MASTER); + + List<Ignite> workers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) + workers.add(startGrid("worker" + i)); + + info("Master: " + master.cluster().localNode().id()); + + List<Ignite> runningWorkers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) { + UUID id = workers.get(i - 1).cluster().localNode().id(); + + info(String.format("Worker%d: %s", i, id)); + + runningWorkers.add(workers.get(i - 1)); + } + + try { + // Dummy call to fetch affinity function from remote node + master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); + + Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); + + int chunkCntr = 0; + final AtomicBoolean jobFailed = new AtomicBoolean(false); + + int failoverPushGap = 0; + + final CountDownLatch emptyLatch = new CountDownLatch(1); + + final AtomicBoolean inputExhausted = new AtomicBoolean(); + + IgniteCompute comp = compute(master.cluster().forPredicate(workerNodesFilter)).enableAsync(); + + for (Integer key : testKeys) { + ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); + + UUID nodeId = mappedNode.id(); + + Collection<Integer> data = dataChunks.get(nodeId); 
+ + if (data == null) { + data = new ArrayList<>(DATA_CHUNK_SIZE); + + dataChunks.put(nodeId, data); + } + + data.add(key); + + if (data.size() == DATA_CHUNK_SIZE) { // time to send data + chunkCntr++; + + log.info("Pushing data chunk [chunkNo=" + chunkCntr + "]"); + + comp.execute(new GridCachePutAllTask(nodeId, CACHE_NAME), data); + + ComputeTaskFuture<Void> fut = comp.future(); + + resQueue.put(fut); // Blocks if queue is full. + + fut.listenAsync(new CI1<IgniteFuture<Void>>() { + @Override public void apply(IgniteFuture<Void> f) { + ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; + + try { + taskFut.get(); //if something went wrong - we'll get exception here + } + catch (IgniteCheckedException e) { + log.error("Job failed", e); + + jobFailed.set(true); + } + + // Remove complete future from queue to allow other jobs to proceed. + resQueue.remove(taskFut); + + if (inputExhausted.get() && resQueue.isEmpty()) + emptyLatch.countDown(); + } + }); + + data = new ArrayList<>(DATA_CHUNK_SIZE); + + dataChunks.put(nodeId, data); + + if (chunkCntr >= FAIL_ON_CHUNK_NO) { + if (workerCnt - runningWorkers.size() < shutdownCnt) { + if (failoverPushGap > 0) + failoverPushGap--; + else { + Ignite victim = runningWorkers.remove(0); + + info("Shutting down node: " + victim.cluster().localNode().id()); + + stopGrid(victim.name()); + + // Fail next node after some jobs have been pushed. + failoverPushGap = FAILOVER_PUSH_GAP; + } + } + } + } + } + + for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) { + comp.execute(new GridCachePutAllTask(entry.getKey(), CACHE_NAME), entry.getValue()); + + ComputeTaskFuture<Void> fut = comp.future(); + + resQueue.put(fut); // Blocks if queue is full. 
+ + fut.listenAsync(new CI1<IgniteFuture<Void>>() { + @Override public void apply(IgniteFuture<Void> f) { + ComputeTaskFuture<?> taskFut = (ComputeTaskFuture<?>)f; + + try { + taskFut.get(); //if something went wrong - we'll get exception here + } + catch (IgniteCheckedException e) { + log.error("Job failed", e); + + jobFailed.set(true); + } + + // Remove complete future from queue to allow other jobs to proceed. + resQueue.remove(taskFut); + + if (inputExhausted.get() && resQueue.isEmpty()) + emptyLatch.countDown(); + } + }); + } + + inputExhausted.set(true); + + if (resQueue.isEmpty()) + emptyLatch.countDown(); + + // Wait for queue to empty. + log.info("Waiting for empty queue..."); + + boolean failedWait = false; + + if (!emptyLatch.await(AWAIT_TIMEOUT_SEC, TimeUnit.SECONDS)) { + info(">>> Failed to wait for queue to empty."); + + failedWait = true; + } + + if (!failedWait) + assertFalse("One or more jobs have failed.", jobFailed.get()); + + Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); + + if (!failedWait && !absentKeys.isEmpty()) { + // Give some time to preloader. + U.sleep(15000); + + absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); + } + + info(">>> Absent keys: " + absentKeys); + + assertTrue(absentKeys.isEmpty()); + + // Actual primary cache size. + int primaryCacheSize = 0; + + for (Ignite g : runningWorkers) { + info(">>>>> " + g.cache(CACHE_NAME).size()); + + primaryCacheSize += g.cache(CACHE_NAME).primarySize(); + } + + assertEquals(TEST_MAP_SIZE, primaryCacheSize); + } + finally { + stopAllGrids(); + } + } + + /** + * Tries to find keys, that are absent in cache. + * + * @param workerNode Worker node. + * @param keys Keys that are suspected to be absent + * @return List of absent keys. If no keys are absent, the list is empty. + * @throws IgniteCheckedException If error occurs. 
+ */ + private Collection<Integer> findAbsentKeys(Ignite workerNode, + Collection<Integer> keys) throws IgniteCheckedException { + + Collection<Integer> ret = new ArrayList<>(keys.size()); + + GridCache<Object, Object> cache = workerNode.cache(CACHE_NAME); + + for (Integer key : keys) { + if (cache.get(key) == null) // Key is absent. + ret.add(key); + } + + return ret; + } + + /** + * Generates a test keys collection. + * + * @return A test keys collection. + */ + private Collection<Integer> generateTestKeys() { + Collection<Integer> ret = new ArrayList<>(TEST_MAP_SIZE); + + for (int i = 0; i < TEST_MAP_SIZE; i++) + ret.add(i); + + return ret; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS); + + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); + + discoverySpi.setAckTimeout(60000); + discoverySpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoverySpi); + + if (gridName.startsWith("master")) { + cfg.setUserAttributes(ImmutableMap.of("segment", "master")); + + // For sure. + failoverSpi.setMaximumFailoverAttempts(50); + + cfg.setFailoverSpi(failoverSpi); + } + else if (gridName.startsWith("worker")) { + cfg.setUserAttributes(ImmutableMap.of("segment", "worker")); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); + cacheCfg.setStartSize(4500000); + + cacheCfg.setBackups(backups); + + cacheCfg.setStoreValueBytes(true); + cacheCfg.setDistributionMode(nearEnabled ? 
NEAR_PARTITIONED : PARTITIONED_ONLY); + cacheCfg.setQueryIndexEnabled(false); + + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(cacheCfg); + } + else + throw new IllegalStateException("Unexpected grid name: " + gridName); + + return cfg; + } + + /** + * Test failover SPI for master node. + */ + @IgniteSpiConsistencyChecked(optional = true) + private static class MasterFailoverSpi extends AlwaysFailoverSpi { + /** */ + private static final String FAILOVER_NUMBER_ATTR = "failover:number:attr"; + + /** */ + private Set<ComputeJobContext> failedOverJobs = new HashSet<>(); + + /** Node filter. */ + private IgnitePredicate<? super ClusterNode>[] filter; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param filter Filter. + */ + MasterFailoverSpi(IgnitePredicate<? super ClusterNode>... filter) { + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { + failedOverJobs.add(ctx.getJobResult().getJobContext()); + + // Clear failed nodes list - allow to failover on the same node. + ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null); + + // Account for maximum number of failover attempts since we clear failed node list. + Integer failoverCnt = ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR); + + if (failoverCnt == null) + ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1); + else { + if (failoverCnt >= getMaximumFailoverAttempts()) { + U.warn(log, "Job failover failed because number of maximum failover attempts is exceeded " + + "[failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + + getMaximumFailoverAttempts() + ']'); + + return null; + } + + ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, failoverCnt + 1); + } + + List<ClusterNode> cp = new ArrayList<>(top); + + // Keep collection type. 
+ F.retain(cp, false, new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return F.isAll(node, filter); + } + }); + + return super.failover(ctx, cp); //use cp to ensure we don't failover on failed node + } + + /** + * @return Job contexts for failed over jobs. + */ + public Set<ComputeJobContext> getFailedOverJobs() { + return failedOverJobs; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java new file mode 100644 index 0000000..0bc4309 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllTask.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Puts all the passed data into partitioned cache in small chunks. + */ +class GridCachePutAllTask extends ComputeTaskAdapter<Collection<Integer>, Void> { + /** Number of entries per put. */ + private static final int TX_BOUND = 30; + + /** Preferred node. */ + private final UUID preferredNode; + + /** Cache name. */ + private final String cacheName; + + /** + * + * @param preferredNode A node that we'd prefer to take from grid. + * @param cacheName A name of the cache to work with. + */ + GridCachePutAllTask(UUID preferredNode, String cacheName) { + this.preferredNode = preferredNode; + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable final Collection<Integer> data) throws IgniteCheckedException { + assert !subgrid.isEmpty(); + + // Give preference to wanted node. Otherwise, take the first one. 
+ ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() { + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode e) { + return preferredNode.equals(e.id()); + } + }); + + return Collections.singletonMap( + new ComputeJobAdapter() { + @IgniteLoggerResource + private IgniteLogger log; + + @IgniteInstanceResource + private Ignite ignite; + + @Override public Object execute() throws IgniteCheckedException { + log.info("Going to put data: " + data); + + GridCacheProjection<Object, Object> cache = ignite.cache(cacheName); + + assert cache != null; + + HashMap<Integer, Integer> putMap = U.newLinkedHashMap(TX_BOUND); + + Iterator<Integer> it = data.iterator(); + + int cnt = 0; + + while (it.hasNext()) { + Integer val = it.next(); + + putMap.put(val, val); + + if (++cnt == TX_BOUND) { + log.info("Putting keys to cache: " + putMap.keySet()); + + cache.putAll(putMap); + + cnt = 0; + + putMap = U.newLinkedHashMap(TX_BOUND); + } + } + + assert cnt < TX_BOUND; + assert putMap.size() == (data.size() % TX_BOUND) : "putMap.size() = " + putMap.size(); + + log.info("Putting keys to cache: " + putMap.keySet()); + + cache.putAll(putMap); + + log.info("Finished putting data: " + data); + + return data; + } + }, + targetNode); + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) throws IgniteCheckedException { + if (res.getException() != null) + return ComputeJobResultPolicy.FAILOVER; + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } +}