http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java new file mode 100644 index 0000000..3f6e7c4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheBasicStoreMultithreadedAbstractTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Basic store test. + */ +public abstract class GridCacheBasicStoreMultithreadedAbstractTest extends GridCommonAbstractTest { + /** Cache store. */ + private CacheStore<Integer, Integer> store; + + /** + * + */ + protected GridCacheBasicStoreMultithreadedAbstractTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridCache<?, ?> cache = cache(); + + if (cache != null) + cache.clearAll(); + + stopAllGrids(); + } + + /** + * @return Caching mode. + */ + protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected final IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + c.setDiscoverySpi(disco); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(cacheMode()); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setSwapEnabled(false); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store)); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + c.setCacheConfiguration(cc); + + return c; + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentGet() throws Exception { + final AtomicInteger cntr = new AtomicInteger(); + + store = new CacheStoreAdapter<Integer, Integer>() { + @Override public Integer load(Integer key) { + return cntr.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> e) { + assert false; + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + assert false; + } + }; + + startGrid(); + + final GridCache<Integer, Integer> cache = cache(); + + int threads = 2; + + final CyclicBarrier barrier = new CyclicBarrier(threads); + + multithreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + barrier.await(); + + cache.get(1); + + return null; + } + }, threads, "concurrent-get-worker"); + + assertEquals(1, cntr.get()); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllSelfTest.java new file mode 100644 index 0000000..4e8100e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheClearAllSelfTest.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.lang.reflect.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.internal.processors.cache.GridCacheAdapter.*; + +/** + * Test {@link org.apache.ignite.cache.GridCache#clearAll()} operations in multinode environment with nodes having caches with different names. + */ +public class GridCacheClearAllSelfTest extends GridCommonAbstractTest { + /** Local cache. */ + private static final String CACHE_LOCAL = "cache_local"; + + /** Partitioned cache. */ + private static final String CACHE_PARTITIONED = "cache_partitioned"; + + /** Co-located cache. */ + private static final String CACHE_COLOCATED = "cache_colocated"; + + /** Replicated cache. */ + private static final String CACHE_REPLICATED = "cache_replicated"; + + /** Grid nodes count. */ + private static final int GRID_CNT = 3; + + /** VM IP finder for TCP discovery SPI. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Local caches. */ + private GridCache<Integer, Integer>[] cachesLoc; + + /** Partitioned caches. */ + private GridCache<Integer, Integer>[] cachesPartitioned; + + /** Colocated caches. */ + private GridCache<Integer, Integer>[] cachesColocated; + + /** Replicated caches. */ + private GridCache<Integer, Integer>[] cachesReplicated; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfgLoc = new CacheConfiguration(); + + ccfgLoc.setName(CACHE_LOCAL); + ccfgLoc.setCacheMode(LOCAL); + ccfgLoc.setWriteSynchronizationMode(FULL_SYNC); + ccfgLoc.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration ccfgPartitioned = new CacheConfiguration(); + + ccfgPartitioned.setName(CACHE_PARTITIONED); + ccfgPartitioned.setCacheMode(PARTITIONED); + ccfgPartitioned.setBackups(1); + ccfgPartitioned.setWriteSynchronizationMode(FULL_SYNC); + ccfgPartitioned.setDistributionMode(gridName.equals(getTestGridName(0)) ? NEAR_PARTITIONED : + gridName.equals(getTestGridName(1)) ? NEAR_ONLY : CLIENT_ONLY); + ccfgPartitioned.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration ccfgColocated = new CacheConfiguration(); + + ccfgColocated.setName(CACHE_COLOCATED); + ccfgColocated.setCacheMode(PARTITIONED); + ccfgColocated.setBackups(1); + ccfgColocated.setWriteSynchronizationMode(FULL_SYNC); + ccfgColocated.setDistributionMode(PARTITIONED_ONLY); + ccfgColocated.setAtomicityMode(TRANSACTIONAL); + + CacheConfiguration ccfgReplicated = new CacheConfiguration(); + + ccfgReplicated.setName(CACHE_REPLICATED); + ccfgReplicated.setCacheMode(REPLICATED); + ccfgReplicated.setWriteSynchronizationMode(FULL_SYNC); + ccfgReplicated.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(ccfgLoc, ccfgPartitioned, ccfgColocated, ccfgReplicated); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cachesLoc = null; + cachesPartitioned = null; + cachesColocated = null; + cachesReplicated = null; + } + + /** + * Startup routine. + * + * @throws Exception If failed. + */ + private void startUp() throws Exception { + cachesLoc = (GridCache<Integer, Integer>[])Array.newInstance(GridCache.class, GRID_CNT); + cachesPartitioned = (GridCache<Integer, Integer>[])Array.newInstance(GridCache.class, GRID_CNT); + cachesColocated = (GridCache<Integer, Integer>[])Array.newInstance(GridCache.class, GRID_CNT); + cachesReplicated = (GridCache<Integer, Integer>[])Array.newInstance(GridCache.class, GRID_CNT); + + for (int i = 0; i < GRID_CNT; i++) { + Ignite ignite = startGrid(i); + + cachesLoc[i] = ignite.cache(CACHE_LOCAL); + cachesPartitioned[i] = ignite.cache(CACHE_PARTITIONED); + cachesColocated[i] = ignite.cache(CACHE_COLOCATED); + cachesReplicated[i] = ignite.cache(CACHE_REPLICATED); + } + } + + /** + * Test {@link GridCache#clearAll()} on LOCAL cache with no split. + * + * @throws Exception If failed. + */ + public void testLocalNoSplit() throws Exception { + test(Mode.TEST_LOCAL, CLEAR_ALL_SPLIT_THRESHOLD / 2); + } + + /** + * Test {@link GridCache#clearAll()} on LOCAL cache with split. + * + * @throws Exception If failed. + */ + public void testLocalSplit() throws Exception { + test(Mode.TEST_LOCAL, CLEAR_ALL_SPLIT_THRESHOLD + 1); + } + + /** + * Test {@link GridCache#clearAll()} on PARTITIONED cache with no split. + * + * @throws Exception If failed. + */ + public void testPartitionedNoSplit() throws Exception { + test(Mode.TEST_PARTITIONED, CLEAR_ALL_SPLIT_THRESHOLD / 2); + } + + /** + * Test {@link GridCache#clearAll()} on PARTITIONED cache with split. + * + * @throws Exception If failed. + */ + public void testPartitionedSplit() throws Exception { + test(Mode.TEST_PARTITIONED, CLEAR_ALL_SPLIT_THRESHOLD + 1); + } + + /** + * Test {@link GridCache#clearAll()} on co-located cache with no split. + * + * @throws Exception If failed. + */ + public void testColocatedNoSplit() throws Exception { + test(Mode.TEST_COLOCATED, CLEAR_ALL_SPLIT_THRESHOLD / 2); + } + + /** + * Test {@link GridCache#clearAll()} on co-located cache with split. + * + * @throws Exception If failed. + */ + public void testColocatedSplit() throws Exception { + test(Mode.TEST_COLOCATED, CLEAR_ALL_SPLIT_THRESHOLD + 1); + } + + /** + * Test {@link GridCache#clearAll()} on REPLICATED cache with no split. + * + * @throws Exception If failed. + */ + public void testReplicatedNoSplit() throws Exception { + test(Mode.TEST_REPLICATED, CLEAR_ALL_SPLIT_THRESHOLD / 2); + } + + /** + * Test {@link GridCache#clearAll()} on REPLICATED cache with split. + * + * @throws Exception If failed. + */ + public void testReplicatedSplit() throws Exception { + test(Mode.TEST_REPLICATED, CLEAR_ALL_SPLIT_THRESHOLD + 1); + } + + /** + * Internal method for all tests. + * + * @param mode Test mode + * @param keysCnt Keys count. + * @throws Exception In case of exception. + */ + private void test(Mode mode, int keysCnt) throws Exception { + startUp(); + + switch (mode) { + case TEST_LOCAL: { + // Check on only one node. + GridCache<Integer, Integer> cache = cachesLoc[0]; + + fillCache(cache, keysCnt); + + cache.clearAll(); + + assert cache.isEmpty(); + + break; + } + case TEST_PARTITIONED: { + // Take in count special case for near-only cache as well. + fillCache(cachesPartitioned[0], keysCnt); + + // Ensure correct no-op clean of CLIENT_ONLY cache. + warmCache(cachesPartitioned[2], keysCnt); + assert cachesPartitioned[2].isEmpty(); + cachesPartitioned[2].clearAll(); + assert cachesPartitioned[2].isEmpty(); + + stopGrid(2); // Shutdown Grid in order to remove reader in NEAR_PARTITIONED cache. + + // Ensure correct clear of NEA_ONLY cache. + warmCache(cachesPartitioned[1], keysCnt); + assert !cachesPartitioned[1].isEmpty(); + cachesPartitioned[1].clearAll(); + assert cachesPartitioned[1].isEmpty(); + fillCache(cachesPartitioned[1], keysCnt); + + stopGrid(1); // Shutdown Grid in order to remove reader in NEAR_PARTITIONED cache. + + // Ensure correct clear of NEAR_PARTITIONED cache. + assert !cachesPartitioned[0].isEmpty(); + cachesPartitioned[0].clearAll(); + assert cachesPartitioned[0].isEmpty(); + + break; + } + default: { + assert mode == Mode.TEST_COLOCATED || mode == Mode.TEST_REPLICATED; + + GridCache<Integer, Integer>[] caches = mode == Mode.TEST_COLOCATED ? cachesColocated : cachesReplicated; + + fillCache(caches[0], keysCnt); + + for (GridCache<Integer, Integer> cache : caches) { + assert !cache.isEmpty(); + + cache.clearAll(); + + assert cache.isEmpty(); + } + } + } + } + + /** + * Fill cache with values. + * + * @param cache Cache. + * @param keysCnt Amount of keys to put. + * @throws Exception If failed. + */ + private void fillCache(GridCache<Integer, Integer> cache, int keysCnt) throws Exception { + try (IgniteTx tx = cache.txStart()) { + for (int i = 0; i < keysCnt; i++) + cache.put(i, i); + + tx.commit(); + } + } + + /** + * Warm cache up. + * + * @param cache Cache. + * @param keysCnt Amount of keys to get. + * @throws Exception If failed. + */ + private void warmCache(GridCache<Integer, Integer> cache, int keysCnt) throws Exception { + for (int i = 0; i < keysCnt; i++) + cache.get(i); + } + + /** + * Test mode. + */ + private enum Mode { + /** Local cache. */ + TEST_LOCAL, + + /** Partitioned cache. */ + TEST_PARTITIONED, + + /** Co-located cache. */ + TEST_COLOCATED, + + /** Replicated cache. */ + TEST_REPLICATED + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheColocatedTxStoreExceptionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheColocatedTxStoreExceptionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheColocatedTxStoreExceptionSelfTest.java new file mode 100644 index 0000000..ad96a96 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheColocatedTxStoreExceptionSelfTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; + +/** + * + */ +public class GridCacheColocatedTxStoreExceptionSelfTest extends IgniteTxStoreExceptionAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected GridCacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected GridCacheDistributionMode distributionMode() { + return PARTITIONED_ONLY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapTest.java new file mode 100644 index 0000000..02a5f98 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Grid cache concurrent hash map self test. + */ +public class GridCacheConcurrentMapTest extends GridCommonAbstractTest { + /** Random. */ + private static final Random RAND = new Random(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(LOCAL); + cc.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(cc); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + grid(0).cache(null).removeAll(); + } + + /** + * @throws Exception If failed. + */ + public void testRandomEntry() throws Exception { + GridCache<String, String> cache = grid(0).cache(null); + + for (int i = 0; i < 500; i++) + cache.put("key" + i, "val" + i); + + for (int i = 0; i < 20; i++) { + GridCacheEntry<String, String> entry = cache.randomEntry(); + + assert entry != null; + + info("Random entry key: " + entry.getKey()); + } + } + + /** + * @throws Exception If failed. + */ + public void testRandomEntryMultiThreaded() throws Exception { + final GridCache<String, String> cache = grid(0).cache(null); + + final AtomicBoolean done = new AtomicBoolean(); + + IgniteFuture<?> fut1 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!done.get()) { + int i = RAND.nextInt(500); + + boolean rmv = RAND.nextBoolean(); + + if (rmv) + cache.remove("key" + i); + else + cache.put("key" + i, "val" + i); + } + + return null; + } + }, + 3 + ); + + IgniteFuture<?> fut2 = multithreadedAsync( + new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + while (!done.get()) { + GridCacheEntry<String, String> entry = cache.randomEntry(); + + info("Random entry key: " + (entry != null ? entry.getKey() : "N/A")); + } + + return null; + } + }, + 1 + ); + + Thread.sleep( 60 * 1000); + + done.set(true); + + fut1.get(); + fut2.get(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java new file mode 100644 index 0000000..9b9c205 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java @@ -0,0 +1,857 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.datastructures.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Timers. */ + private static final ConcurrentMap<Thread, ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>>> timers = + new ConcurrentHashMap<>(); + + /** */ + private static final long PRINT_FREQ = 10000; + + /** */ + private static final GridAtomicLong lastPrint = new GridAtomicLong(); + + /** */ + private static final IgnitePredicate<ClusterNode> serverNode = new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + String gridName = G.ignite(n.id()).name(); + + return gridName != null && gridName.contains("server"); + } + }; + + /** */ + private static final IgnitePredicate<ClusterNode> clientNode = new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode n) { + String gridName = G.ignite(n.id()).name(); + + return gridName != null && gridName.contains("client"); + } + }; + + /** */ + private GridCacheMode mode = PARTITIONED; + + /** */ + private boolean cacheOn; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); + c.getTransactionsConfiguration().setDefaultTxIsolation(REPEATABLE_READ); + + if (cacheOn) { + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setCacheMode(mode); + cc.setDistributionMode(PARTITIONED_ONLY); + cc.setEvictionPolicy(new GridCacheLruEvictionPolicy(1000)); + cc.setEvictSynchronized(false); + cc.setEvictNearSynchronized(false); + cc.setSwapEnabled(false); + cc.setWriteSynchronizationMode(FULL_SYNC); + cc.setAtomicSequenceReserveSize(100000); + cc.setPreloadMode(NONE); + + c.setCacheConfiguration(cc); + } + else + c.setCacheConfiguration(); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + c.setPeerClassLoadingEnabled(false); + + // Enable tracing. +// Logger.getLogger("org.gridgain.grid.kernal.processors.cache.GridCacheDgcManager.trace").setLevel(Level.DEBUG); + + return c; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * @throws Exception If failed. + */ + public void testEvictions() throws Exception { + try { + cacheOn = true; + + Ignite srvr1 = startGrid("server1"); + + srvr1.cache(null).dataStructures().atomicSequence("ID", 0, true); + + startGrid("server2"); + + cacheOn = false; + + // Client processes count. + int clientCnt = 8; + + for (int i = 1; i <= clientCnt; i++) + startGrid("client" + i); + + Collection<ClusterNode> srvrNodes = srvr1.cluster().forPredicate(serverNode).nodes(); + Collection<ClusterNode> clientNodes = srvr1.cluster().forPredicate(clientNode).nodes(); + + assert srvrNodes.size() == 2; + + // Threads count per each client process. + int threadCnt = 2; + + int srvrMaxNoTerminals = threadCnt / srvrNodes.size(); + + if (srvrMaxNoTerminals * srvrNodes.size() != threadCnt) { + threadCnt = srvrMaxNoTerminals * srvrNodes.size(); + + info("Using " + threadCnt + " threads instead to ensure equal distribution of terminals"); + } + + Collection<Callable<Object>> clients = new ArrayList<>(threadCnt * clientCnt); + + info("No of servers: " + srvrNodes.size()); + info("No of clients: " + clientNodes.size()); + info("Thread count: " + threadCnt); + info("Max number of terminals / server: " + srvrMaxNoTerminals); + + // Distribute terminals evenly across all servers + for (ClusterNode node : srvrNodes) { + UUID srvrId = node.id(); + + info(">>> Node ID: " + srvrId); + + int terminalsPerSrvr = 0; + + int tid = 0; // Terminal ID. + + while (true) { + String terminalId = String.valueOf(++tid); + + // Server partition cache + UUID mappedId = srvr1.cluster().mapKeyToNode(null, terminalId).id(); + + if (!srvrId.equals(mappedId)) + continue; + + info("Affinity mapping [key=" + terminalId + ", nodeId=" + mappedId + ']'); + + for (int i = 1; i <= clientCnt; i++) + clients.add(new Client(G.ignite("client" + i), terminalId, srvrId)); + + info("Terminal ID: " + terminalId); + + terminalsPerSrvr++; + + if (terminalsPerSrvr == srvrMaxNoTerminals) + break; + } + } + + displayReqCount(); + + ExecutorService pool = Executors.newFixedThreadPool(clients.size()); + + pool.invokeAll(clients); + + Thread.sleep(Long.MAX_VALUE); + } + finally { + stopAllGrids(); + } + } + + /** + * + */ + private void displayReqCount() { + new Thread(new Runnable() { + @SuppressWarnings({"BusyWait", "InfiniteLoopStatement"}) + @Override public void run() { + int interval = 10; + + while (true) { + long cnt0 = Client.txCnt.get(); + long lt0 = Client.latency.get(); + + try { + Thread.sleep(interval * 1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + long cnt1 = Client.txCnt.get(); + long lt1 = Client.latency.get(); + + info(">>>"); + info(">>> Transaction/s: " + (cnt1 - cnt0) / interval); + info(">>> Avg Latency: " + ((cnt1 - cnt0) > 0 ? (lt1 - lt0) / (cnt1 - cnt0) + "ms" : "invalid")); + info(">>> Max Submit Time: " + Client.submitTime.getAndSet(0)); + + try { + PerfJob.printTimers(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + }).start(); + } + + /** + * + */ + private static class Client implements Callable<Object> { + /** */ + private static AtomicLong txCnt = new AtomicLong(); + + /** */ + private static AtomicLong latency = new AtomicLong(); + + /** */ + private static GridAtomicLong submitTime = new GridAtomicLong(); + + + /** */ + private Ignite g; + + /** */ + private String terminalId; + + /** */ + private UUID nodeId; + + /** + * @param g Grid. + * @param terminalId Terminal ID. + * @param nodeId Node ID. + */ + private Client(Ignite g, String terminalId, UUID nodeId) { + this.g = g; + this.terminalId = terminalId; + this.nodeId = nodeId; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"InfiniteLoopStatement"}) + @Override public Object call() throws Exception { + while (true) { + try { + long t0 = System.currentTimeMillis(); + + long submitTime1 = t0; + + IgniteCompute comp = g.compute(g.cluster().forPredicate(serverNode)).enableAsync(); + + comp.execute(RequestTask.class, new Message(terminalId, nodeId)); + + ComputeTaskFuture<Void> f1 = comp.future(); + + submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); + + f1.get(); + + submitTime1 = System.currentTimeMillis(); + + comp.execute(ResponseTask.class, new Message(terminalId, nodeId)); + + ComputeTaskFuture<Void> f2 = comp.future(); + + submitTime.setIfGreater(System.currentTimeMillis() - submitTime1); + + f2.get(); + + long t1 = System.currentTimeMillis(); + + txCnt.incrementAndGet(); + + latency.addAndGet(t1 - t0); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + } + } + + + /** + * + */ + private static class Message implements Serializable { + /** */ + private String terminalId; + + /** */ + private UUID nodeId; + + /** + * @param terminalId Terminal ID. + * @param nodeId Node ID. + */ + Message(String terminalId, UUID nodeId) { + this.terminalId = terminalId; + this.nodeId = nodeId; + } + + /** + * @return Terminal ID. + */ + String getTerminalId() { + return terminalId; + } + + /** + * @param terminalId Terminal ID. + */ + void setTerminalId(String terminalId) { + this.terminalId = terminalId; + } + + /** + * @return Node ID. + */ + UUID getNodeId() { + return nodeId; + } + + /** + * @param nodeId Node ID. + */ + void setNodeId(UUID nodeId) { + this.nodeId = nodeId; + } + } + + /** + * + */ + private static class PerfJob extends ComputeJobAdapter { + /** */ + private static final long MAX = 5000; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param msg Message. + */ + PerfJob(@Nullable Message msg) { + super(msg); + } + + /** + * @return Message. + */ + private Message message() { + return argument(0); + } + + /** + * @return Terminal ID. + */ + @GridCacheAffinityKeyMapped + public String terminalId() { + return message().getTerminalId(); + } + + /** {@inheritDoc} */ + @Override public Object execute() { + ClusterNodeLocalMap<String, T2<AtomicLong, AtomicLong>> nodeLoc = ignite.cluster().nodeLocalMap(); + + T2<AtomicLong, AtomicLong> cntrs = nodeLoc.get("cntrs"); + + if (cntrs == null) { + T2<AtomicLong, AtomicLong> other = nodeLoc.putIfAbsent("cntrs", + cntrs = new T2<>(new AtomicLong(), new AtomicLong(System.currentTimeMillis()))); + + if (other != null) + cntrs = other; + } + + long cnt = cntrs.get1().incrementAndGet(); + + doWork(); + + GridNearCacheAdapter near = (GridNearCacheAdapter)((GridKernal) ignite).internalCache(); + GridDhtCacheAdapter dht = near.dht(); + + long start = cntrs.get2().get(); + + long now = System.currentTimeMillis(); + + long dur = now - start; + + if (dur > 20000 && cntrs.get2().compareAndSet(start, System.currentTimeMillis())) { + cntrs.get1().set(0); + + X.println("Stats [tx/sec=" + (cnt / (dur / 1000)) + ", nearSize=" + near.size() + + ", dhtSize=" + dht.size() + ']'); + } + + return null; + } + + /** + * @param name Timer name. + * @param xid XID. + * @param key Key. + * @param termId Terminal ID. + */ + private void startTimer(String name, @Nullable IgniteUuid xid, @Nullable String key, String termId) { + ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> m = timers.get(Thread.currentThread()); + + if (m == null) { + ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> old = + timers.putIfAbsent(Thread.currentThread(), + m = new ConcurrentHashMap<>()); + + if (old != null) + m = old; + } + + T5<Long, Long, Long, IgniteUuid, Object> t = m.get(name); + + if (t == null) { + T5<Long, Long, Long, IgniteUuid, Object> old = m.putIfAbsent(name, + t = new T5<>()); + + if (old != null) + t = old; + } + + t.set1(System.currentTimeMillis()); + t.set2(0L); + t.set4(xid); + t.set5(key == null ? null : new GridCacheAffinityKey<String>(key, termId) {}); + } + + /** + * @param name Timer name. + */ + private void stopTimer(String name) { + ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>> m = timers.get(Thread.currentThread()); + + T5<Long, Long, Long, IgniteUuid, Object> t = m.get(name); + + assert t != null; + + long now = System.currentTimeMillis(); + + t.set2(now); + t.set3(Math.max(t.get3() == null ? 0 : t.get3(), now - t.get1())); + t.set4(null); + t.set5(null); + } + + /** + * @throws Exception If failed. + */ + private static void printTimers() throws Exception { + //String termId = terminalId(); + + long now = System.currentTimeMillis(); + + if (lastPrint.get() + PRINT_FREQ < now && lastPrint.setIfGreater(now)) { + Map<String, Long> maxes = new HashMap<>(); + + Set<GridCacheAffinityKey<String>> keys = null; + + for (Map.Entry<Thread, ConcurrentMap<String, T5<Long, Long, Long, IgniteUuid, Object>>> e1 : timers.entrySet()) { + for (Map.Entry<String, T5<Long, Long, Long, IgniteUuid, Object>> e2 : e1.getValue().entrySet()) { + T5<Long, Long, Long, IgniteUuid, Object> t = e2.getValue(); + + long start = t.get1(); + long end = t.get2(); + + IgniteUuid xid = t.get4(); + + long duration = end == 0 ? now - start : end - start; + + long max = t.get3() == null ? duration : t.get3(); + + if (duration < 0) + duration = now - start; + + if (duration > MAX) { + X.println("Maxed out timer [name=" + e2.getKey() + ", key=" + t.get5() + + ", duration=" + duration + ", ongoing=" + (end == 0) + + ", thread=" + e1.getKey().getName() + ", xid=" + xid + ']'); + + GridCacheAffinityKey<String> key = (GridCacheAffinityKey<String>)t.get5(); + + if (key != null) { + if (keys == null) + keys = new LinkedHashSet<>(); + + keys.add(key); + } + } + + Long cmax = maxes.get(e2.getKey()); + + if (cmax == null || max > cmax) + maxes.put(e2.getKey(), max); + + t.set3(null); + } + } + + if (!F.isEmpty(keys)) { + for (Ignite g : G.allGrids()) { + if (g.name().contains("server")) { + GridNearCacheAdapter<GridCacheAffinityKey<String>, Object> near = + (GridNearCacheAdapter<GridCacheAffinityKey<String>, Object>)((GridKernal)g). + <GridCacheAffinityKey<String>, Object>internalCache(); + GridDhtCacheAdapter<GridCacheAffinityKey<String>, Object> dht = near.dht(); + + for (GridCacheAffinityKey<String> k : keys) { + GridNearCacheEntry<?, ?> nearEntry = near.peekExx(k); + GridDhtCacheEntry<?, ?> dhtEntry = dht.peekExx(k); + + X.println("Near entry [grid="+ g.name() + ", key=" + k + ", entry=" + nearEntry); + X.println("DHT entry [grid=" + g.name() + ", key=" + k + ", entry=" + dhtEntry); + + GridCacheMvccCandidate<?> nearCand = + nearEntry == null ? null : F.first(nearEntry.localCandidates()); + + if (nearCand != null) + X.println("Near futures: " + + nearEntry.context().mvcc().futures(nearCand.version())); + + GridCacheMvccCandidate<?> dhtCand = + dhtEntry == null ? null : F.first(dhtEntry.localCandidates()); + + if (dhtCand != null) + X.println("Dht futures: " + + dhtEntry.context().mvcc().futures(dhtCand.version())); + + } + } + } + } + + for (Map.Entry<String, Long> e : maxes.entrySet()) + X.println("Timer [name=" + e.getKey() + ", maxTime=" + e.getValue() + ']'); + + X.println(">>>>"); + } + } + + /** + * + */ + private void doWork() { + GridCache cache = ignite.cache(null); + + Session ses = new Session(terminalId()); + + try { + try (IgniteTx tx = cache.txStart()) { + Request req = new Request(getId()); + + req.setMessageId(getId()); + + String key = req.getCacheKey(); + + startTimer("putRequest", tx.xid(), key, terminalId()); + + put(req, key, terminalId()); + + stopTimer("putRequest"); +// +// for (int i = 0; i < 5; i++) { +// Response rsp = new Response(getId()); +// +// startTimer("putResponse-" + i, tx.xid()); +// +// put(rsp, rsp.getCacheKey(), terminalId()); +// +// stopTimer("putResponse-" + i); +// } + + key = ses.getCacheKey(); + + startTimer("putSession", tx.xid(), key, terminalId()); + + put(ses, key, terminalId()); + + stopTimer("putSession"); + + startTimer("commit", tx.xid(), null, terminalId()); + + tx.commit(); + + stopTimer("commit"); + } + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + + /** + * @return New ID. + * @throws IgniteCheckedException If failed. + */ + private long getId() throws IgniteCheckedException { + GridCacheAtomicSequence seq = ignite.cache(null).dataStructures().atomicSequence("ID", 0, true); + return seq.incrementAndGet(); + } + + /** + * @param msgId Message ID. + * @return Request. + */ + private Request findRequestWithMessageId(Long msgId) { + GridCacheProjection<Object, Request> cache = ignite.cache(null).projection(Object.class, Request.class); + + GridCacheQuery<Map.Entry<Object, Request>> qry = cache.queries().createSqlQuery( + Request.class, "messageId = ?"); + + try { + // taking out localNode() doesn't change the eviction timeout future + // problem + Map.Entry<Object, Request> entry = + F.first(qry.projection(ignite.cluster().forLocal()).execute(msgId).get()); + + if (entry == null) + return null; + + return entry.getValue(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + + return null; + } + } + + /** + * @param o Object to put. + * @param cacheKey Cache key. + * @param terminalId Terminal ID. + * @throws IgniteCheckedException If failed. + */ + private void put(Object o, String cacheKey, String terminalId) throws IgniteCheckedException { + GridCache<GridCacheAffinityKey<String>, Object> cache = ignite.cache(null); + + GridCacheAffinityKey<String> affinityKey = new GridCacheAffinityKey<>(cacheKey, terminalId); + + GridCacheEntry<GridCacheAffinityKey<String>, Object> entry = cache.entry(affinityKey); + + entry.setx(o); + } + + /** + * @param cacheKey Cache key. + * @param terminalId Terminal ID. + * @return Cached object. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings({"RedundantCast"}) + private <T> Object get(String cacheKey, String terminalId) throws IgniteCheckedException { + Object key = new GridCacheAffinityKey<>(cacheKey, terminalId); + + return (T) ignite.cache(null).get(key); + } + } + + /** + * + */ + @GridCacheQueryGroupIndex(name = "msg_tx", unique = true) + @SuppressWarnings({"UnusedDeclaration"}) + private static class Request implements Serializable { + /** */ + @GridCacheQuerySqlField(unique = true) + private Long id; + + /** */ + @GridCacheQuerySqlField(name = "messageId") + @GridCacheQuerySqlField.Group(name = "msg_tx", order = 3) + private long msgId; + + /** */ + @GridCacheQuerySqlField(name = "transactionId") + @GridCacheQuerySqlField.Group(name = "msg_tx", order = 1) + private long txId; + + /** + * @param id Request ID. + */ + Request(long id) { + this.id = id; + } + + /** + * @param msgId Message ID. + */ + public void setMessageId(long msgId) { + this.msgId = msgId; + } + + /** + * @return Cache key. + */ + public String getCacheKey() { + return "RESPONSE:" + id.toString(); + } + } + + /** + * + */ + @SuppressWarnings({"UnusedDeclaration"}) + private static class Response implements Serializable { + /** */ + @GridCacheQuerySqlField(unique = true) + private Long id; + + /** */ + @GridCacheQuerySqlField(name = "messageId") + private long msgId; + + /** */ + @GridCacheQuerySqlField(name = "transactionId") + private long txId; + + /** + * @param id Response ID. + */ + Response(long id) { + this.id = id; + } + + /** + * @return Cache key. + */ + public String getCacheKey() { + return "REQUEST:" + id.toString(); + } + } + + /** + * + */ + private static class Session implements Serializable { + /** */ + @GridCacheQuerySqlField(unique = true) + private String terminalId; + + /** + * @param terminalId Terminal ID. + */ + Session(String terminalId) { + this.terminalId = terminalId; + } + + /** + * @return Cache key. + */ + public String getCacheKey() { + return "SESSION:" + terminalId; + } + } + + /** + * + */ + @SuppressWarnings( {"UnusedDeclaration"}) + private static class ResponseTask extends ComputeTaskSplitAdapter<Message, Void> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int arg0, Message msg) throws IgniteCheckedException { + return Collections.singletonList(new PerfJob(msg)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } + + /** + * + */ + private static class RequestTask extends ComputeTaskSplitAdapter<Message, Void> { + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int arg0, Message msg) throws IgniteCheckedException { + return Collections.singletonList(new PerfJob(msg)); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java new file mode 100644 index 0000000..d110a75 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConfigurationConsistencySelfTest.java @@ -0,0 +1,1074 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.fair.*; +import org.apache.ignite.cache.cloner.*; +import org.apache.ignite.cache.eviction.*; +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.eviction.random.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import javax.cache.integration.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.configuration.IgniteDeploymentMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * + */ +public class GridCacheConfigurationConsistencySelfTest extends GridCommonAbstractTest { + /** */ + private boolean cacheEnabled; + + /** */ + private String cacheName; + + /** */ + private GridCacheMode cacheMode = REPLICATED; + + /** */ + private IgniteDeploymentMode depMode = SHARED; + + /** */ + private C1<CacheConfiguration, Void> initCache; + + /** */ + private boolean useStrLog; + + /** */ + private GridStringLogger strLog; + + /** */ + private GridCacheAffinityFunction aff; + + /** */ + private int backups; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (useStrLog) { + strLog = new GridStringLogger(false, cfg.getGridLogger()); + cfg.setGridLogger(strLog); + } + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(disco); + + cfg.setDeploymentMode(depMode); + + if (cacheEnabled) { + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setName(cacheName); + cacheCfg.setCacheMode(cacheMode); + cacheCfg.setAffinity(aff); + cacheCfg.setBackups(backups); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + if (initCache != null) + initCache.apply(cacheCfg); + + cfg.setCacheConfiguration(cacheCfg); + } + else + cfg.setCacheConfiguration(); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testCacheUtilsCheckAttributeMismatch() throws Exception { + Ignite ignite = startGrid(1); + + final ClusterNode node = ignite.cluster().localNode(); + + final GridStringLogger strLog = new GridStringLogger(false, log); + + CU.checkAttributeMismatch(strLog, "cache", node, "cacheMode", "Cache mode", LOCAL, PARTITIONED, false); + + assertTrue("No expected message in log: " + strLog.toString(), + strLog.toString().contains("Cache mode mismatch")); + + strLog.reset(); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + CU.checkAttributeMismatch(strLog, "cache", node, "cacheMode", "Cache mode", LOCAL, PARTITIONED, true); + return null; + } + }, IgniteCheckedException.class, "Cache mode mismatch"); + + final CacheConfiguration cfg1 = defaultCacheConfiguration(); + + cfg1.setCacheMode(LOCAL); + + final CacheConfiguration cfg2 = defaultCacheConfiguration(); + + cfg2.setCacheMode(PARTITIONED); + + CU.checkAttributeMismatch(strLog, cfg1, cfg2, node, new T2<>("cacheMode", "Cache mode"), false); + + assertTrue("No expected message in log: " + strLog.toString(), + strLog.toString().contains("Cache mode mismatch")); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + CU.checkAttributeMismatch(strLog, cfg1, cfg2, node, new T2<>("cacheMode", "Cache mode"), true); + return null; + } + }, IgniteCheckedException.class, "Cache mode mismatch"); + } + + /** + * @throws Exception If failed. + */ + public void testNullCacheMode() throws Exception { + // Grid with null cache mode. + // This is a legal case. The default cache mode should be used. + cacheEnabled = true; + cacheName = "myCache"; + cacheMode = null; + depMode = SHARED; + + assert startGrid(1).cache("myCache").configuration().getCacheMode() == CacheConfiguration.DFLT_CACHE_MODE; + } + + /** + * @throws Exception If failed. + */ + public void testWithCacheAndWithoutCache() throws Exception { + // 1st grid without cache. + cacheEnabled = false; + depMode = SHARED; + + startGrid(2); + + // 2nd grid with replicated cache on board. + cacheEnabled = true; + cacheName = "myCache"; + cacheMode = REPLICATED; + depMode = SHARED; + + startGrid(1); + } + + /** + * @throws Exception If failed. + */ + public void testSameCacheDifferentModes() throws Exception { + // 1st grid with replicated cache. + cacheEnabled = true; + cacheName = "myCache"; + cacheMode = REPLICATED; + depMode = SHARED; + + startGrid(1); + + // 2nd grid with partitioned cache. + cacheEnabled = true; + cacheName = "myCache"; + cacheMode = PARTITIONED; + depMode = SHARED; + + try { + startGrid(2); + + fail(); + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + + /** + * @throws Exception If failed. + */ + public void testDifferentCacheDifferentModes() throws Exception { + // 1st grid with local cache. + cacheEnabled = true; + cacheName = "local"; + cacheMode = LOCAL; + depMode = SHARED; + + startGrid(1); + + // 2nd grid with replicated cache. + cacheEnabled = true; + cacheName = "replicated"; + cacheMode = REPLICATED; + depMode = SHARED; + + startGrid(2); + + // 3d grid with partitioned cache. + cacheEnabled = true; + cacheName = "partitioned"; + cacheMode = PARTITIONED; + depMode = SHARED; + + startGrid(3); + + // 4th grid with null cache mode (legal case, it should turn to REPLICATED mode). + cacheEnabled = true; + cacheName = "replicated"; + cacheMode = null; + depMode = SHARED; + + startGrid(4); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentDeploymentModes() throws Exception { + // 1st grid with SHARED mode. + cacheEnabled = true; + cacheName = "partitioned"; + cacheMode = PARTITIONED; + depMode = SHARED; + + startGrid(1); + + // 2nd grid with CONTINUOUS mode. + cacheEnabled = true; + cacheName = "partitioned"; + cacheMode = PARTITIONED; + depMode = CONTINUOUS; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentAffinities() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinity(new GridCacheConsistentHashAffinityFunction() {/*No-op.*/}); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinity(new GridCacheConsistentHashAffinityFunction()); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentPreloadModes() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setPreloadMode(NONE); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setPreloadMode(ASYNC); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentEvictionEnabled() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy()); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentEvictionPolicies() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictionPolicy(new GridCacheRandomEvictionPolicy()); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy()); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentEvictionFilters() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictionFilter(new GridCacheEvictionFilter<Object, Object>() { + @Override public boolean evictAllowed(GridCacheEntry<Object, Object> entry) { + return false; + } + }); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictionFilter(new GridCacheEvictionFilter<Object, Object>() { + @Override public boolean evictAllowed(GridCacheEntry<Object, Object> entry) { + return true; + } + }); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentAffinityMappers() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper() { + }); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinityMapper(new GridCacheDefaultAffinityKeyMapper()); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentEvictSynchronized() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictSynchronized(true); + cfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(100)); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictSynchronized(false); + cfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy(100)); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentEvictNearSynchronized() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public Void apply(CacheConfiguration cfg) { + cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setEvictNearSynchronized(true); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @SuppressWarnings("deprecation") + @Override public Void apply(CacheConfiguration cfg) { + cfg.setDistributionMode(NEAR_PARTITIONED); + cfg.setEvictNearSynchronized(false); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentAtomicity() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setAtomicityMode(ATOMIC); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setDistributionMode(PARTITIONED_ONLY); + cfg.setAtomicityMode(TRANSACTIONAL); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentSynchronization() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setWriteSynchronizationMode(FULL_SYNC); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setWriteSynchronizationMode(FULL_ASYNC); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testAttributesError() throws Exception { + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + cfg.setQueryIndexEnabled(true); + return null; + } + }, + new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + cfg.setQueryIndexEnabled(false); + return null; + } + } + ); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityFunctionConsistency() throws Exception { + cacheEnabled = true; + cacheMode = PARTITIONED; + + backups = 1; + + aff = new GridCacheConsistentHashAffinityFunction(false, 100); + + startGrid(1); + + // 2nd grid with another affinity. + // Check include neighbors. + aff = new GridCacheConsistentHashAffinityFunction(true, 100); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity include neighbors mismatch"); + + backups = 2; + + // Check backups. + aff = new GridCacheConsistentHashAffinityFunction(false, 100); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity key backups mismatch"); + + backups = 1; + + // Partitions count. + aff = new GridCacheConsistentHashAffinityFunction(false, 1000); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity partitions count mismatch"); + + // Replicas count. + aff = new GridCacheConsistentHashAffinityFunction(false, 100); + ((GridCacheConsistentHashAffinityFunction)aff).setDefaultReplicas(1024); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity replicas mismatch"); + + // Replicas count attribute name. + aff = new GridCacheConsistentHashAffinityFunction(false, 100); + ((GridCacheConsistentHashAffinityFunction)aff).setReplicaCountAttributeName("attr_name"); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity replica count attribute name mismatch"); + + // Different hash ID resolver. + GridCacheConsistentHashAffinityFunction aff0 = new GridCacheConsistentHashAffinityFunction(false, 100); + + aff0.setHashIdResolver(new GridCacheAffinityNodeIdHashResolver()); + + aff = aff0; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Partitioned cache affinity hash ID resolver class mismatch"); + } + + /** + * @throws Exception If failed. + */ + public void testAttributesWarnings() throws Exception { + cacheEnabled = true; + + initCache = new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAtomicSequenceReserveSize(1000); + cfg.setCloner(new GridCacheCloner() { + @Nullable @Override public <T> T cloneValue(T val) { + return null; + } + }); + cfg.setDefaultLockTimeout(1000); + cfg.setDefaultQueryTimeout(1000); + cfg.setDefaultTimeToLive(1000); + return null; + } + }; + + startGrid(1); + + useStrLog = true; + + initCache = new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAtomicSequenceReserveSize(2 * 1000); + cfg.setCloner(new GridCacheCloner() { + @Nullable @Override public <T> T cloneValue(T val) { + return null; + } + }); + cfg.setDefaultLockTimeout(2 * 1000); + cfg.setDefaultQueryTimeout(2 * 1000); + cfg.setDefaultTimeToLive(2 * 1000); + return null; + } + }; + + startGrid(2); + + String log = strLog.toString(); + + assertTrue(log.contains("Atomic sequence reserve size mismatch")); + assertTrue(log.contains("Cache cloner mismatch")); + assertTrue(log.contains("Default lock timeout")); + assertTrue(log.contains("Default query timeout")); + assertTrue(log.contains("Default time to live")); + } + + /** + * @throws Exception If failed. + */ + public void testPartitionedOnlyAttributesIgnoredForReplicated() throws Exception { + cacheEnabled = true; + + cacheMode = REPLICATED; + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictNearSynchronized(true); + cfg.setNearEvictionPolicy(new GridCacheRandomEvictionPolicy()); + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + cfg.setEvictNearSynchronized(false); + cfg.setNearEvictionPolicy(new GridCacheFifoEvictionPolicy()); + return null; + } + }; + + startGrid(2); + } + + /** + * @throws Exception If failed. + */ + public void testIgnoreMismatchForLocalCaches() throws Exception { + cacheEnabled = true; + + cacheMode = LOCAL; + + initCache = new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinity(new GridCacheConsistentHashAffinityFunction() {/*No-op.*/}); + + cfg.setEvictionPolicy(new GridCacheFifoEvictionPolicy()); + + cfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cfg.setReadThrough(true); + cfg.setWriteThrough(true); + cfg.setLoadPreviousValue(true); + + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + /** {@inheritDoc} */ + @Override public Void apply(CacheConfiguration cfg) { + cfg.setAffinity(new GridCacheConsistentHashAffinityFunction()); + + cfg.setEvictionPolicy(new GridCacheLruEvictionPolicy()); + + cfg.setCacheStoreFactory(null); + + return null; + } + }; + + startGrid(2); + } + + /** + * @throws Exception If failed. + */ + public void testIgnoreStoreMismatchForAtomicClientCache() throws Exception { + cacheEnabled = true; + + cacheMode = PARTITIONED; + + initCache = new C1<CacheConfiguration, Void>() { + @SuppressWarnings("unchecked") + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(ATOMIC); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(ATOMIC); + cc.setDistributionMode(CLIENT_ONLY); + cc.setCacheStoreFactory(null); + + return null; + } + }; + + startGrid(2); + } + + /** + * @throws Exception If failed. + */ + public void testStoreCheckAtomic() throws Exception { + cacheEnabled = true; + + cacheMode = PARTITIONED; + + initCache = new C1<CacheConfiguration, Void>() { + @SuppressWarnings("unchecked") + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(ATOMIC); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(ATOMIC); + cc.setDistributionMode(PARTITIONED_ONLY); + cc.setCacheStoreFactory(null); + + return null; + } + }; + + GridTestUtils.assertThrows(log, new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + startGrid(2); + + return null; + } + }, IgniteCheckedException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testStoreCheckTransactional() throws Exception { + cacheEnabled = true; + + cacheMode = PARTITIONED; + + initCache = new C1<CacheConfiguration, Void>() { + @SuppressWarnings("unchecked") + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setCacheStoreFactory(null); + + return null; + } + }; + + GridTestUtils.assertThrows(log, new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + startGrid(2); + + return null; + } + }, IgniteCheckedException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testStoreCheckTransactionalClient() throws Exception { + cacheEnabled = true; + + cacheMode = PARTITIONED; + + initCache = new C1<CacheConfiguration, Void>() { + @SuppressWarnings("unchecked") + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(PARTITIONED_ONLY); + + cc.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(new TestStore())); + cc.setReadThrough(true); + cc.setWriteThrough(true); + cc.setLoadPreviousValue(true); + + return null; + } + }; + + startGrid(1); + + initCache = new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cc) { + cc.setAtomicityMode(TRANSACTIONAL); + + cc.setDistributionMode(CLIENT_ONLY); + + cc.setCacheStoreFactory(null); + + return null; + } + }; + + GridTestUtils.assertThrows(log, new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + startGrid(2); + + return null; + } + }, IgniteCheckedException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testAffinityForReplicatedCache() throws Exception { + cacheEnabled = true; + + aff = new GridCachePartitionFairAffinity(); // Check cannot use GridCachePartitionFairAffinity. + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(1); + } + }, IgniteCheckedException.class, null); + + aff = new GridCacheConsistentHashAffinityFunction(true); // Check cannot set 'excludeNeighbors' flag. + backups = Integer.MAX_VALUE; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(1); + } + }, IgniteCheckedException.class, null); + + aff = new GridCacheConsistentHashAffinityFunction(false, 100); + + startGrid(1); + + // Try to start node with different number of partitions. + aff = new GridCacheConsistentHashAffinityFunction(false, 200); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, "Affinity partitions count mismatch"); + } + + /** + * @throws Exception If failed. + */ + public void testDifferentInterceptors() throws Exception { + cacheMode = PARTITIONED; + + checkSecondGridStartFails( + new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + cfg.setInterceptor(new GridCacheInterceptorAdapter() {/*No-op.*/}); + + return null; + } + }, + new C1<CacheConfiguration, Void>() { + @Override public Void apply(CacheConfiguration cfg) { + return null; + } + } + ); + } + + /** + * @param initCache1 Closure. + * @param initCache2 Closure. + * @throws Exception If failed. + */ + private void checkSecondGridStartFails(C1<CacheConfiguration, Void> initCache1, + C1<CacheConfiguration, Void> initCache2) throws Exception { + cacheEnabled = true; + + initCache = initCache1; + + startGrid(1); + + initCache = initCache2; + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + return startGrid(2); + } + }, IgniteCheckedException.class, null); + } + + /** */ + private static class TestStore extends CacheStore<Object,Object> { + /** {@inheritDoc} */ + @Nullable @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> entry) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void deleteAll(Collection<?> keys) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void txEnd(boolean commit) { + // No-op. + } + } +}