http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockAbstractSelfTest.java deleted file mode 100644 index 7ac1f0c..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockAbstractSelfTest.java +++ /dev/null @@ -1,1328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.store.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import javax.cache.*; -import javax.cache.configuration.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Test for group locking. - */ -public abstract class GridCacheGroupLockAbstractSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Event wait timeout. */ - private static final int WAIT_TIMEOUT = 3000; - - /** */ - private TestStore store; - - /** @return Grid count to run in test. */ - protected int gridCount() { - return 1; - } - - /** @return Whether near cache is enabled. */ - protected abstract boolean nearEnabled(); - - /** @return Cache mode for test. */ - protected abstract GridCacheMode cacheMode(); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setCacheMode(cacheMode()); - cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - cacheCfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() { - @Override public CacheStore<? super Object, ? super Object> create() { - return store; - } - }); - cacheCfg.setReadThrough(true); - cacheCfg.setWriteThrough(true); - cacheCfg.setLoadPreviousValue(true); - - cfg.setCacheConfiguration(cacheCfg); - cfg.setCacheSanityCheckEnabled(sanityCheckEnabled()); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoSpi); - - TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); - - commSpi.setSharedMemoryPort(-1); - - cfg.setCommunicationSpi(commSpi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - store = new TestStore(); - - startGridsMultiThreaded(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(true); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - store = null; - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutOneKeyOptimistic() throws Exception { - checkGroupLockPutOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutOneKeyPessimistic() throws Exception { - checkGroupLockPutOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockPutOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - GridCacheAffinityKey<String> key1; - GridCacheAffinityKey<String> key2; - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - key1 = new GridCacheAffinityKey<>("key1", affinityKey); - key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveOneKeyOptimistic() throws Exception { - checkGroupLockRemoveOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveOneKeyPessimistic() throws Exception { - checkGroupLockRemoveOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockRemoveOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - - cache.removeAll(F.asList(key1, key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertNull("For index: " + i, gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertNull("For index: " + i, gCache.peek(key2)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockGetOneKeyOptimistic() throws Exception { - checkGroupLockGetOneKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockGetOneKeyPessimistic() throws Exception { - checkGroupLockGetOneKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWithExternalLockOptimistic() throws Exception { - checkGroupLockWithExternalLock(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWithExternalLockPessimistic() throws Exception { - checkGroupLockWithExternalLock(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockWithExternalLock(final IgniteTxConcurrency concurrency) throws Exception { - assert sanityCheckEnabled(); - - final UUID affinityKey = primaryKeyForCache(grid(0)); - - final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - - final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid(0).jcache(null); - - // Populate cache. - cache.put(key1, "val1"); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - } - - final CountDownLatch unlockLatch = new CountDownLatch(1); - final CountDownLatch lockLatch = new CountDownLatch(1); - - IgniteFuture<?> fut = multithreadedAsync(new Runnable() { - @Override public void run() { - try { - cache.lock(key1).lock(); - - try { - lockLatch.countDown(); - unlockLatch.await(); - } - finally { - cache.lock(key1).unlock(); - } - } - catch (CacheException e) { - fail(e.getMessage()); - } - catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); - } - } - }, 1); - - try { - lockLatch.await(); - - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, - READ_COMMITTED, 0, 1)) { - cache.put(key1, "val01"); - - tx.commit(); - } - - return null; - } - }, IgniteTxHeuristicException.class, null); - } - finally { - unlockLatch.countDown(); - - fut.get(); - } - } - - /** - * @throws Exception If failed. - */ - public void testSanityCheckDisabledOptimistic() throws Exception { - checkSanityCheckDisabled(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testSanityCheckDisabledPessimistic() throws Exception { - checkSanityCheckDisabled(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkSanityCheckDisabled(final IgniteTxConcurrency concurrency) throws Exception { - assert !sanityCheckEnabled(); - - GridEx grid = grid(0); - - final UUID affinityKey = primaryKeyForCache(grid); - - final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - - final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid.jcache(null); - - // Populate cache. - cache.put(key1, "val1"); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - } - - cache.lock(key1).lock(); - - try { - try (IgniteTx tx = grid.transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - cache.put(key1, "val01"); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val01", gCache.peek(key1)); - } - } - finally { - cache.lock(key1).unlock(); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupPartitionLockOptimistic() throws Exception { - checkGroupPartitionLock(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupPartitionLockPessimistic() throws Exception { - checkGroupPartitionLock(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupPartitionLock(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCache<UUID, String> cache = grid(0).cache(null); - - UUID key1; - UUID key2; - - try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affinityKey), concurrency, - READ_COMMITTED, 0, 2)) { - // Note that events are not generated for internal keys. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, - unlocks.affectedKeys().size()); - - GridCacheAdapter<Object, Object> cacheAdapter = ((GridKernal)grid(0)).internalCache(); - - GridCacheAffinityManager<Object, Object> affMgr = cacheAdapter.context().affinity(); - - GridPartitionLockKey partAffKey = affMgr.partitionAffinityKey(cache.affinity().partition(affinityKey)); - - if (concurrency == PESSIMISTIC) - assertTrue(cacheAdapter.entryEx(partAffKey).lockedByThread()); - - do { - key1 = UUID.randomUUID(); - } - while (cache.affinity().partition(key1) != cache.affinity().partition(affinityKey)); - - do { - key2 = UUID.randomUUID(); - } - while (cache.affinity().partition(key2) != cache.affinity().partition(affinityKey)); - - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGetPutOptimisticReadCommitted() throws Exception { - checkGetPut(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutOptimisticRepeatableRead() throws Exception { - checkGetPut(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutPessimisticReadCommitted() throws Exception { - checkGetPut(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutPessimisticRepeatableRead() throws Exception { - checkGetPut(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCachePessimisticReadCommitted() throws Exception { - checkGetPutEmptyCache(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCachePessimisticRepeatableRead() throws Exception { - checkGetPutEmptyCache(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCacheOptimisticReadCommitted() throws Exception { - checkGetPutEmptyCache(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetPutEmptyCacheOptimisticRepeatableRead() throws Exception { - checkGetPutEmptyCache(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetPutEmptyCache(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals(null, cache.get(key1)); - - assertEquals(null, cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val01", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val02", gCache.peek(key2)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGetRemoveOptimisticReadCommitted() throws Exception { - checkGetRemove(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemoveOptimisticRepeatableRead() throws Exception { - checkGetRemove(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemovePessimisticReadCommitted() throws Exception { - checkGetRemove(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGetRemovePessimisticRepeatableRead() throws Exception { - checkGetRemove(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @param concurrency Transaction concurrency mode. - * @param isolation Transaction isolation mode. - * @throws Exception If failed. - */ - private void checkGetRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.remove(key1); - - cache.remove(key2); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key1) + ']', cache(i).peek(key1)); - assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key2) + ']', cache(i).peek(key2)); - - assertTrue("For cache [idx=" + i + ", keySet=" + cache(i).keySet() + ']', cache(i).size() <= 1); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - } - - /** - * @throws Exception If failed. - */ - public void testGetAfterPutOptimistic() throws Exception { - checkGetAfterPut(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGetAfterPut() throws Exception { - checkGetAfterPut(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGetAfterPut(IgniteTxConcurrency concurrency) throws Exception { - CollectingEventListener locks = new CollectingEventListener(); - CollectingEventListener unlocks = new CollectingEventListener(); - - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - // Populate cache. - cache.putAll(F.asMap( - key1, "val1", - key2, "val2") - ); - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - } - - grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); - grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { - if (concurrency == PESSIMISTIC) - assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - else - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); - - assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); - - assertEquals("val1", cache.get(key1)); - - assertEquals("val2", cache.get(key2)); - - cache.put(key1, "val01"); - - cache.put(key2, "val02"); - - assertEquals("val01", cache.get(key1)); - - assertEquals("val02", cache.get(key2)); - - tx.commit(); - } - - // Check that there are no further locks after transaction commit. - assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); - assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); - - assertEquals("val01", cache.get(key1)); - - assertEquals("val02", cache.get(key2)); - } - - /** - * @throws Exception If failed. - */ - public void testGetRepeatableReadOptimistic() throws Exception { - checkGetRepeatableRead(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGetRepeatableReadPessimistic() throws Exception { - checkGetRepeatableRead(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGetRepeatableRead(IgniteTxConcurrency concurrency) throws Exception { - UUID key = primaryKeyForCache(grid(0)); - - cache(0).put(key, "val"); - - try (IgniteTx ignored = cache(0).txStartPartition(cache(0).affinity().partition(key), concurrency, - REPEATABLE_READ, 0, 1)) { - assertEquals("val", cache(0).get(key)); - } - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutWrongKeyOptimistic() throws Exception { - checkGroupLockPutWrongKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockPutWrongKeyPessimistic() throws Exception { - checkGroupLockPutWrongKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockPutWrongKey(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - // Key with affinity key different from enlisted on tx start should raise exception. - cache.put(new GridCacheAffinityKey<>("key1", UUID.randomUUID()), "val1"); - - fail("Exception should be thrown"); - } - catch (IgniteCheckedException ignored) { - // Expected exception. - } - - assertNull(cache.tx()); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveWrongKeyOptimistic() throws Exception { - checkGroupLockRemoveWrongKey(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockRemoveWrongKeyPessimistic() throws Exception { - checkGroupLockRemoveWrongKey(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockRemoveWrongKey(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - final GridCacheAffinityKey<String> key = new GridCacheAffinityKey<>("key1", UUID.randomUUID()); - - cache.put(key, "val"); - - try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) { - // Key with affinity key different from enlisted on tx start should raise exception. - cache.remove(key); - - fail("Exception should be thrown."); - } - catch (IgniteCheckedException ignored) { - // Expected exception. - } - - assertNull(cache.tx()); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyPessimitsticRepeatableRead() throws Exception { - checkGroupLockReadAffinityKey(PESSIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyPessimitsticReadCommitted() throws Exception { - checkGroupLockReadAffinityKey(PESSIMISTIC, READ_COMMITTED); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyOptimisticRepeatableRead() throws Exception { - checkGroupLockReadAffinityKey(OPTIMISTIC, REPEATABLE_READ); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockReadAffinityKeyOptimisticReadCommitted() throws Exception { - checkGroupLockReadAffinityKey(OPTIMISTIC, READ_COMMITTED); - } - - /** - * @param concurrency Concurrency. - * @param isolation Isolation. - * @throws Exception If failed. - */ - private void checkGroupLockReadAffinityKey(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) - throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - final GridCache<Object, String> cache = grid(0).cache(null); - - final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - final GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - - cache.put(affinityKey, "0"); - cache.put(key1, "0"); - cache.put(key2, "0"); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 3)) { - assertEquals("0", cache.get(affinityKey)); - assertEquals("0", cache.get(key1)); - assertEquals("0", cache.get(key2)); - - cache.put(affinityKey, "1"); - cache.put(key1, "1"); - cache.put(key2, "1"); - - assertEquals("1", cache.get(affinityKey)); - assertEquals("1", cache.get(key1)); - assertEquals("1", cache.get(key2)); - - tx.commit(); - } - - assertEquals("1", cache.get(affinityKey)); - assertEquals("1", cache.get(key1)); - assertEquals("1", cache.get(key2)); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWriteThroughBatchUpdateOptimistic() throws Exception { - checkGroupLockWriteThrough(OPTIMISTIC); - } - - /** - * @throws Exception If failed. - */ - public void testGroupLockWriteThroughBatchUpdatePessimistic() throws Exception { - checkGroupLockWriteThrough(PESSIMISTIC); - } - - /** - * @param concurrency Transaction concurrency mode. - * @throws Exception If failed. - */ - private void checkGroupLockWriteThrough(IgniteTxConcurrency concurrency) throws Exception { - UUID affinityKey = primaryKeyForCache(grid(0)); - - GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); - - GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); - GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); - GridCacheAffinityKey<String> key3 = new GridCacheAffinityKey<>("key3", affinityKey); - GridCacheAffinityKey<String> key4 = new GridCacheAffinityKey<>("key4", affinityKey); - - Map<GridCacheAffinityKey<String>, String> putMap = F.asMap( - key1, "val1", - key2, "val2", - key3, "val3", - key4, "val4"); - - try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 4)) { - cache.put(key1, "val1"); - cache.put(key2, "val2"); - cache.put(key3, "val3"); - cache.put(key4, "val4"); - - tx.commit(); - } - - for (int i = 0; i < gridCount(); i++) { - Ignite g = grid(i); - - GridCache<Object, Object> gCache = g.cache(null); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) - assertEquals("For index: " + i, "val1", gCache.peek(key1)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) - assertEquals("For index: " + i, "val2", gCache.peek(key2)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key3)) - assertEquals("For index: " + i, "val3", gCache.peek(key3)); - - if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key4)) - assertEquals("For index: " + i, "val4", gCache.peek(key4)); - } - - // Check the store. - assertTrue(store.storeMap().equals(putMap)); - assertEquals(1, store.putCount()); - } - - /** @return {@code True} if sanity check should be enabled. */ - private boolean sanityCheckEnabled() { - return !getName().contains("SanityCheckDisabled"); - } - - /** - * @param primary Primary node for which key should be calculated. - * @return Key for which given node is primary. - * @throws IgniteCheckedException If affinity can not be calculated. - */ - protected UUID primaryKeyForCache(Ignite primary) throws IgniteCheckedException { - UUID res; - - int cnt = 0; - - UUID primaryId = primary.cluster().localNode().id(); - - do { - res = UUID.randomUUID(); - - cnt++; - - if (cnt > 10000) - throw new IllegalStateException("Cannot find key for primary node: " + primaryId); - } - while (!primary.cluster().mapKeyToNode(null, res).id().equals(primaryId)); - - return res; - } - - /** - * @param primary Primary node for which keys should be calculated. - * @param cnt Key count. - * @return Collection of generated keys. - * @throws IgniteCheckedException If affinity can not be calculated. - */ - protected UUID[] primaryKeysForCache(Ignite primary, int cnt) throws IgniteCheckedException { - Collection<UUID> keys = new LinkedHashSet<>(); - - int iters = 0; - - do { - keys.add(primaryKeyForCache(primary)); - - iters++; - - if (iters > 10000) - throw new IllegalStateException("Cannot find keys for primary node [nodeId=" + - primary.cluster().localNode().id() + ", cnt=" + cnt + ']'); - } - while (keys.size() < cnt); - - UUID[] res = new UUID[keys.size()]; - - return keys.toArray(res); - } - - /** Event listener that collects all incoming events. */ - protected static class CollectingEventListener implements IgnitePredicate<IgniteEvent> { - /** Collected events. */ - private final Collection<Object> affectedKeys = new GridConcurrentLinkedHashSet<>(); - - /** {@inheritDoc} */ - @Override public boolean apply(IgniteEvent evt) { - assert evt.type() == EVT_CACHE_OBJECT_LOCKED || evt.type() == EVT_CACHE_OBJECT_UNLOCKED; - - IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; - - synchronized (this) { - affectedKeys.add(cacheEvt.key()); - - notifyAll(); - } - - return true; - } - - /** @return Collection of affected keys. */ - public Collection<Object> affectedKeys() { - return affectedKeys; - } - - /** - * Waits until events received for all supplied keys. - * - * @param timeout Timeout to wait. - * @param keys Keys to wait for. - * @return {@code True} if wait was successful, {@code false} if wait timed out. - * @throws InterruptedException If thread was interrupted. - */ - public boolean awaitKeys(long timeout, Object... keys) throws InterruptedException { - long start = System.currentTimeMillis(); - - Collection<Object> keysCol = Arrays.asList(keys); - - synchronized (this) { - while (true) { - long now = System.currentTimeMillis(); - - if (affectedKeys.containsAll(keysCol)) - return true; - else if (start + timeout > now) - wait(start + timeout - now); - else - return false; - } - } - } - } - - /** Test store that accumulates values into map. */ - private static class TestStore extends CacheStoreAdapter<Object, Object> { - /** */ - private ConcurrentMap<Object, Object> storeMap = new ConcurrentHashMap8<>(); - - /** */ - private AtomicInteger putCnt = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public Object load(Object key) { - return null; - } - - /** {@inheritDoc} */ - @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { - for (Cache.Entry<?, ?> e : entries) - storeMap.put(e.getKey(), e.getValue()); - - putCnt.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void write(Cache.Entry<?, ?> e) { - storeMap.put(e.getKey(), e.getValue()); - - putCnt.incrementAndGet(); - } - - /** {@inheritDoc} */ - @Override public void delete(Object key) { - storeMap.remove(key); - } - - /** @return Stored values map. */ - public ConcurrentMap<Object, Object> storeMap() { - return storeMap; - } - - /** @return Number of calls to put(). */ - public int putCount() { - return putCnt.get(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java deleted file mode 100644 index 2c99423..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -/** - * Tests optimistic group lock transactions. - */ -public class GridCacheGroupLockFailoverOptimisticTxSelfTest extends GridCacheGroupLockFailoverSelfTest { - /** {@inheritDoc} */ - @Override protected boolean optimisticTx() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverSelfTest.java deleted file mode 100644 index 7a92374..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockFailoverSelfTest.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import com.google.common.collect.*; -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.spi.failover.*; -import org.apache.ignite.spi.failover.always.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Tests group lock transaction failover. - */ -public class GridCacheGroupLockFailoverSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** Size of the test map. */ - private static final int TEST_MAP_SIZE = 200000; - - /** Cache name. */ - private static final String CACHE_NAME = "partitioned"; - - /** Size of data chunk, sent to a remote node. */ - private static final int DATA_CHUNK_SIZE = 1000; - - /** Number of chunk on which to fail worker node. */ - public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / DATA_CHUNK_SIZE) / 3; - - /** */ - private static final int FAILOVER_PUSH_GAP = 30; - - /** Master node name. */ - private static final String MASTER = "master"; - - /** Near enabled flag. */ - private boolean nearEnabled; - - /** Backups count. */ - private int backups; - - /** Filter to include only worker nodes. */ - private static final IgnitePredicate<ClusterNode> workerNodesFilter = new PN() { - @SuppressWarnings("unchecked") - @Override public boolean apply(ClusterNode n) { - return "worker".equals(n.attribute("segment")); - } - }; - - /** - * Result future queue (restrict the queue size - * to 50 in order to prevent in-memory data grid from over loading). - */ - private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new LinkedBlockingQueue<>(10); - - /** - * @return {@code True} if test should use optimistic transactions. - */ - protected boolean optimisticTx() { - return false; - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearEnabledOneBackup() throws Exception { - checkPutAllFailoverGroupLock(true, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearDisabledOneBackup() throws Exception { - checkPutAllFailoverGroupLock(false, 3, 1); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearEnabledTwoBackups() throws Exception { - checkPutAllFailoverGroupLock(true, 5, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearDisabledTwoBackups() throws Exception { - checkPutAllFailoverGroupLock(false, 5, 2); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearEnabledThreeBackups() throws Exception { - checkPutAllFailoverGroupLock(true, 7, 3); - } - - /** - * @throws Exception If failed. - */ - public void testPutAllFailoverGroupLockNearDisabledThreeBackups() throws Exception { - checkPutAllFailoverGroupLock(false, 7, 3); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return super.getTestTimeout() * 5; - } - - /** - * Tests putAll() method along with failover and cache backup. - * - * Checks that the resulting primary cache size is the same as - * expected. - * - * @param near {@code True} for near enabled. - * @param workerCnt Workers count. - * @param shutdownCnt Shutdown count. - * @throws Exception If failed. - */ - public void checkPutAllFailoverGroupLock(boolean near, int workerCnt, int shutdownCnt) throws Exception { - nearEnabled = near; - backups = shutdownCnt; - - Collection<Integer> testKeys = generateTestKeys(); - - Ignite master = startGrid(MASTER); - - List<Ignite> workers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) - workers.add(startGrid("worker" + i)); - - info("Master: " + master.cluster().localNode().id()); - - List<Ignite> runningWorkers = new ArrayList<>(workerCnt); - - for (int i = 1; i <= workerCnt; i++) { - UUID id = workers.get(i - 1).cluster().localNode().id(); - - info(String.format("Worker%d: %s", i, id)); - - runningWorkers.add(workers.get(i - 1)); - } - - try { - // Dummy call to fetch affinity function from remote node - master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); - - Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); - - int chunkCntr = 0; - - int failoverPushGap = 0; - - for (Integer key : testKeys) { - ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); - - UUID nodeId = mappedNode.id(); - - Collection<Integer> data = dataChunks.get(nodeId); - - if (data == null) { - data = new ArrayList<>(DATA_CHUNK_SIZE); - - dataChunks.put(nodeId, data); - } - - data.add(key); - - if (data.size() == DATA_CHUNK_SIZE) { // time to send data - chunkCntr++; - - info("Pushing data chunk: " + chunkCntr); - - submitDataChunk(master, nodeId, data); - - data = new ArrayList<>(DATA_CHUNK_SIZE); - - dataChunks.put(nodeId, data); - - if (chunkCntr >= FAIL_ON_CHUNK_NO) { - if (workerCnt - runningWorkers.size() < shutdownCnt) { - if (failoverPushGap > 0) - failoverPushGap--; - else { - Ignite victim = runningWorkers.remove(0); - - info("Shutting down node: " + victim.cluster().localNode().id()); - - stopGrid(victim.name()); - - // Fail next node after some jobs have been pushed. - failoverPushGap = FAILOVER_PUSH_GAP; - } - } - } - } - } - - // Submit the rest of data. - for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) - submitDataChunk(master, entry.getKey(), entry.getValue()); - - // Wait for queue to empty. - info("Waiting for empty queue..."); - - long seenSize = resQueue.size(); - - while (true) { - U.sleep(10000); - - if (!resQueue.isEmpty()) { - long size = resQueue.size(); - - if (seenSize == size) { - info(">>> Failed to wait for queue to empty."); - - break; - } - - seenSize = size; - } - else - break; - } - - Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); - - info(">>> Absent keys: " + absentKeys); - - assertTrue(absentKeys.isEmpty()); - - // Actual primary cache size. - int primaryCacheSize = 0; - - for (Ignite g : runningWorkers) { - info(">>>>> " + g.cache(CACHE_NAME).size()); - - primaryCacheSize += g.cache(CACHE_NAME).primarySize(); - } - - assertTrue(TEST_MAP_SIZE <= primaryCacheSize); - } - finally { - stopAllGrids(); - } - } - - /** - * Does remapping. - * @param master Master grid. - * @param keys Keys. - * @throws IgniteCheckedException If failed. - */ - private void remap(final Ignite master, Iterable<Integer> keys) throws IgniteCheckedException { - Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); - - for (Integer key : keys) { - ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); - - UUID nodeId = mappedNode.id(); - - Collection<Integer> data = dataChunks.get(nodeId); - - if (data == null) { - data = new ArrayList<>(DATA_CHUNK_SIZE); - - dataChunks.put(nodeId, data); - } - - data.add(key); - } - - for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) - submitDataChunk(master, entry.getKey(), entry.getValue()); - } - - /** - * Submits next data chunk as grid task. Blocks if queue is full. - * - * @param master Master node to submit from. - * @param preferredNodeId Node id to execute job on. - * @param dataChunk Data chunk to put in cache. - * @throws IgniteCheckedException If failed. - */ - private void submitDataChunk(final Ignite master, UUID preferredNodeId, final Collection<Integer> dataChunk) - throws IgniteCheckedException { - ClusterGroup prj = master.cluster().forPredicate(workerNodesFilter); - - IgniteCompute comp = master.compute(prj).enableAsync(); - - comp.execute(new GridCacheGroupLockPutTask(preferredNodeId, CACHE_NAME, optimisticTx()), dataChunk); - - ComputeTaskFuture<Void> fut = comp.future(); - - fut.listenAsync(new CI1<IgniteFuture<Void>>() { - @Override public void apply(IgniteFuture<Void> f) { - ComputeTaskFuture taskFut = (ComputeTaskFuture)f; - - boolean fail = false; - - try { - f.get(); //if something went wrong - we'll get exception here - } - catch (IgniteCheckedException ignore) { - info("Put task failed, going to remap keys: " + dataChunk.size()); - - fail = true; - } - finally { - // Remove complete future from queue to allow other jobs to proceed. - resQueue.remove(taskFut); - - try { - if (fail) - remap(master, dataChunk); - } - catch (IgniteCheckedException e) { - info("Failed to remap task [data=" + dataChunk.size() + ", e=" + e + ']'); - } - } - } - }); - - try { - resQueue.put(fut); - - if (fut.isDone()) - resQueue.remove(fut); - } - catch (InterruptedException ignored) { - info(">>>> Failed to wait for future submission: " + fut); - - Thread.currentThread().interrupt(); - } - } - - /** - * Tries to find keys, that are absent in cache. - * - * @param workerNode Worker node. - * @param keys Keys that are suspected to be absent - * @return List of absent keys. If no keys are absent, the list is empty. - * @throws IgniteCheckedException If error occurs. - */ - private Collection<Integer> findAbsentKeys(Ignite workerNode, - Collection<Integer> keys) throws IgniteCheckedException { - - Collection<Integer> ret = new ArrayList<>(keys.size()); - - GridCache<Object, Object> cache = workerNode.cache(CACHE_NAME); - - for (Integer key : keys) { - if (cache.get(key) == null) // Key is absent. - ret.add(key); - } - - return ret; - } - - /** - * Generates a test keys collection. - * - * @return A test keys collection. - */ - private Collection<Integer> generateTestKeys() { - Collection<Integer> ret = new ArrayList<>(TEST_MAP_SIZE); - - for (int i = 0; i < TEST_MAP_SIZE; i++) - ret.add(i); - - return ret; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPeerClassLoadingEnabled(false); - - cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS); - - TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); - - discoverySpi.setAckTimeout(60000); - discoverySpi.setIpFinder(ipFinder); - - cfg.setDiscoverySpi(discoverySpi); - - if (gridName.startsWith("master")) { - cfg.setUserAttributes(ImmutableMap.of("segment", "master")); - - GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(true, (IgnitePredicate)workerNodesFilter); - - // For sure. - failoverSpi.setMaximumFailoverAttempts(50); - - cfg.setFailoverSpi(failoverSpi); - } - else if (gridName.startsWith("worker")) { - GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(false); - - cfg.setFailoverSpi(failoverSpi); - - cfg.setUserAttributes(ImmutableMap.of("segment", "worker")); - - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); - cacheCfg.setStartSize(4500000); - cacheCfg.setBackups(backups); - cacheCfg.setStoreValueBytes(true); - cacheCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); - cacheCfg.setQueryIndexEnabled(false); - cacheCfg.setWriteSynchronizationMode(FULL_SYNC); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - cfg.setCacheConfiguration(cacheCfg); - } - else - throw new IllegalStateException("Unexpected grid name: " + gridName); - - return cfg; - } - - /** - * Test failover SPI that remembers the job contexts of failed jobs. - */ - private class GridTestFailoverSpi extends AlwaysFailoverSpi { - /** */ - private static final String FAILOVER_NUMBER_ATTR = "failover:number:attr"; - - /** */ - private final boolean master; - - /** */ - private Set<ComputeJobContext> failedOverJobs = new HashSet<>(); - - /** Node filter. */ - private IgnitePredicate<? super ClusterNode>[] filter; - - /** - * @param master Master flag. - * @param filter Filters. - */ - @SafeVarargs - GridTestFailoverSpi(boolean master, IgnitePredicate<? super ClusterNode>... filter) { - this.master = master; - this.filter = filter; - } - - /** {@inheritDoc} */ - @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { - List<ClusterNode> cp = null; - if (master) { - failedOverJobs.add(ctx.getJobResult().getJobContext()); - - // Clear failed nodes list - allow to failover on the same node. - ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null); - - // Account for maximum number of failover attempts since we clear failed node list. - Integer failoverCnt = ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR); - - if (failoverCnt == null) - ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1); - else { - if (failoverCnt >= getMaximumFailoverAttempts()) { - info("Job failover failed because number of maximum failover attempts is exceeded " + - "[failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + - getMaximumFailoverAttempts() + ']'); - - return null; - } - - ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, failoverCnt + 1); - } - - cp = new ArrayList<>(top); - - // Keep collection type. - F.retain(cp, false, new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return F.isAll(node, filter); - } - }); - } - - return super.failover(ctx, cp); //use cp to ensure we don't failover on failed node - } - - /** - * @return Job contexts for failed over jobs. - */ - public Set<ComputeJobContext> getFailedOverJobs() { - return failedOverJobs; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java deleted file mode 100644 index 2073bf7..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -/** - * Abstract test for multi-node group lock tests. - */ -public abstract class GridCacheGroupLockMultiNodeAbstractSelfTest extends GridCacheGroupLockAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected int gridCount() { - return 3; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockPutTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockPutTask.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockPutTask.java deleted file mode 100644 index ebf1e2f..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheGroupLockPutTask.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.transactions.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.transactions.IgniteTxConcurrency.*; -import static org.apache.ignite.transactions.IgniteTxIsolation.*; - -/** - * Puts all the passed data into partitioned cache in small chunks. - */ -class GridCacheGroupLockPutTask extends ComputeTaskAdapter<Collection<Integer>, Void> { - /** Preferred node. */ - private final UUID preferredNode; - - /** Cache name. */ - private final String cacheName; - - /** Optimistic transaction flag. */ - private final boolean optimistic; - - /** - * - * @param preferredNode A node that we'd prefer to take from grid. - * @param cacheName A name of the cache to work with. - * @param optimistic Optimistic transaction flag. - */ - GridCacheGroupLockPutTask(UUID preferredNode, String cacheName, boolean optimistic) { - this.preferredNode = preferredNode; - this.cacheName = cacheName; - this.optimistic = optimistic; - } - - /** - * This method is called to map or split grid task into multiple grid jobs. This is the first method that gets called - * when task execution starts. - * - * @param data Task execution argument. Can be {@code null}. This is the same argument as the one passed into {@code - * Grid#execute(...)} methods. - * @param subgrid Nodes available for this task execution. Note that order of nodes is guaranteed to be randomized by - * container. This ensures that every time you simply iterate through grid nodes, the order of nodes - * will be random which over time should result into all nodes being used equally. - * @return Map of grid jobs assigned to subgrid node. Unless {@link org.apache.ignite.compute.ComputeTaskContinuousMapper} is injected into task, if - * {@code null} or empty map is returned, exception will be thrown. - * @throws IgniteCheckedException If mapping could not complete successfully. This exception will be thrown out of {@link - * org.apache.ignite.compute.ComputeTaskFuture#get()} method. - */ - @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, - @Nullable final Collection<Integer> data) throws IgniteCheckedException { - assert !subgrid.isEmpty(); - - // Give preference to wanted node. Otherwise, take the first one. - ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() { - @Override public boolean apply(ClusterNode e) { - return preferredNode.equals(e.id()); - } - }); - - return Collections.singletonMap( - new ComputeJobAdapter() { - @IgniteLoggerResource - private IgniteLogger log; - - @IgniteInstanceResource - private Ignite ignite; - - @Override public Object execute() throws IgniteCheckedException { - log.info("Going to put data: " + data.size()); - - GridCache<Object, Object> cache = ignite.cache(cacheName); - - assert cache != null; - - Map<Integer, T2<Integer, Collection<Integer>>> putMap = groupData(data); - - for (Map.Entry<Integer, T2<Integer, Collection<Integer>>> entry : putMap.entrySet()) { - T2<Integer, Collection<Integer>> pair = entry.getValue(); - - Object affKey = pair.get1(); - - // Group lock partition. - try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affKey), - optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ, 0, pair.get2().size())) { - for (Integer val : pair.get2()) - cache.put(val, val); - - tx.commit(); - } - } - - log.info("Finished put data: " + data.size()); - - return data; - } - - /** - * Groups values by partitions. - * - * @param data Data to put. - * @return Grouped map. - */ - private Map<Integer, T2<Integer, Collection<Integer>>> groupData(Iterable<Integer> data) { - GridCache<Object, Object> cache = ignite.cache(cacheName); - - Map<Integer, T2<Integer, Collection<Integer>>> res = new HashMap<>(); - - for (Integer val : data) { - int part = cache.affinity().partition(val); - - T2<Integer, Collection<Integer>> tup = res.get(part); - - if (tup == null) { - tup = new T2<Integer, Collection<Integer>>(val, new LinkedList<Integer>()); - - res.put(part, tup); - } - - tup.get2().add(val); - } - - return res; - } - }, - targetNode); - } - - /** {@inheritDoc} */ - @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java deleted file mode 100644 index 189b731..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheIncrementTransformTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.common.*; - -import javax.cache.processor.*; -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; -import static org.apache.ignite.cache.GridCacheAtomicityMode.*; -import static org.apache.ignite.cache.GridCacheMode.*; -import static org.apache.ignite.cache.GridCacheDistributionMode.*; -import static org.apache.ignite.cache.GridCachePreloadMode.*; -import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; - -/** - * Tests cache in-place modification logic with iterative value increment. - */ -public class GridCacheIncrementTransformTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Number of nodes to test on. */ - private static final int GRID_CNT = 4; - - /** Number of increment iterations. */ - private static final int NUM_ITERS = 5000; - - /** Helper for excluding stopped node from iteration logic. */ - private AtomicReferenceArray<Ignite> grids; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration cache = new CacheConfiguration(); - - cache.setCacheMode(PARTITIONED); - cache.setAtomicityMode(ATOMIC); - cache.setDistributionMode(PARTITIONED_ONLY); - cache.setAtomicWriteOrderMode(PRIMARY); - cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setBackups(1); - cache.setPreloadMode(SYNC); - - cfg.setCacheConfiguration(cache); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(GRID_CNT); - - grids = new AtomicReferenceArray<>(GRID_CNT); - - for (int i = 0; i < GRID_CNT; i++) - grids.set(i, grid(i)); - - cache(0).put("key", new TestObject(0)); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - grids = null; - } - - /** - * @throws Exception If failed. - */ - public void testIncrement() throws Exception { - testIncrement(false); - } - - /** - * @throws Exception If failed. - */ - public void testIncrementRestart() throws Exception { - final AtomicBoolean stop = new AtomicBoolean(); - final AtomicReference<Throwable> error = new AtomicReference<>(); - - IgniteFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - try { - Random rnd = new Random(); - - while (!stop.get()) { - int idx = -1; - - Ignite ignite = null; - - while (ignite == null) { - idx = rnd.nextInt(GRID_CNT); - - ignite = grids.getAndSet(idx, null); - } - - stopGrid(idx); - - assert grids.compareAndSet(idx, null, startGrid(idx)); - } - } - catch (Exception e) { - error.set(e); - } - } - }, 1, "restarter"); - - try { - testIncrement(true); - - assertNull(error.get()); - } - finally { - stop.set(true); - - fut.get(getTestTimeout()); - } - } - - /** - * @param restarts Whether test is running with node restarts. - * @throws Exception If failed. - */ - private void testIncrement(boolean restarts) throws Exception { - Random rnd = new Random(); - - for (int i = 0; i < NUM_ITERS; i++) { - int idx = -1; - - Ignite ignite = null; - - while (ignite == null) { - idx = rnd.nextInt(GRID_CNT); - - ignite = restarts ? grids.getAndSet(idx, null) : grid(idx); - } - - IgniteCache<String, TestObject> cache = ignite.jcache(null); - - assertNotNull(cache); - - TestObject obj = cache.get("key"); - - assertNotNull(obj); - assertEquals(i, obj.val); - - while (true) { - try { - cache.invoke("key", new Processor()); - - break; - } - catch (CachePartialUpdateException ignored) { - // Need to re-check if update actually succeeded. - TestObject updated = cache.get("key"); - - if (updated != null && updated.val == i + 1) - break; - } - } - - if (restarts) - assert grids.compareAndSet(idx, null, ignite); - } - } - - /** */ - private static class TestObject implements Serializable { - /** Value. */ - private int val; - - /** - * @param val Value. - */ - private TestObject(int val) { - this.val = val; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "TestObject [val=" + val + ']'; - } - } - - /** */ - private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable { - /** {@inheritDoc} */ - @Override public Void process(MutableEntry<String, TestObject> e, Object... args) { - TestObject obj = e.getValue(); - - assert obj != null; - - e.setValue(new TestObject(obj.val + 1)); - - return null; - } - } -}