http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java new file mode 100644 index 0000000..2fa5e5d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockAbstractSelfTest.java @@ -0,0 +1,1328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import javax.cache.*; +import javax.cache.configuration.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Test for group locking. + */ +public abstract class GridCacheGroupLockAbstractSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Event wait timeout. */ + private static final int WAIT_TIMEOUT = 3000; + + /** */ + private TestStore store; + + /** @return Grid count to run in test. */ + protected int gridCount() { + return 1; + } + + /** @return Whether near cache is enabled. */ + protected abstract boolean nearEnabled(); + + /** @return Cache mode for test. */ + protected abstract GridCacheMode cacheMode(); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(cacheMode()); + cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + cacheCfg.setCacheStoreFactory(new Factory<CacheStore<? super Object, ? super Object>>() { + @Override public CacheStore<? super Object, ? super Object> create() { + return store; + } + }); + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + cacheCfg.setLoadPreviousValue(true); + + cfg.setCacheConfiguration(cacheCfg); + cfg.setCacheSanityCheckEnabled(sanityCheckEnabled()); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoSpi); + + TcpCommunicationSpi commSpi = new TcpCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + store = new TestStore(); + + startGridsMultiThreaded(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + store = null; + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockPutOneKeyOptimistic() throws Exception { + checkGroupLockPutOneKey(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockPutOneKeyPessimistic() throws Exception { + checkGroupLockPutOneKey(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockPutOneKey(IgniteTxConcurrency concurrency) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + GridCacheAffinityKey<String> key1; + GridCacheAffinityKey<String> key2; + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + key1 = new GridCacheAffinityKey<>("key1", affinityKey); + key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockRemoveOneKeyOptimistic() throws Exception { + checkGroupLockRemoveOneKey(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockRemoveOneKeyPessimistic() throws Exception { + checkGroupLockRemoveOneKey(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockRemoveOneKey(IgniteTxConcurrency concurrency) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + // Populate cache. + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + + cache.removeAll(F.asList(key1, key2)); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertNull("For index: " + i, gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertNull("For index: " + i, gCache.peek(key2)); + } + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockGetOneKeyOptimistic() throws Exception { + checkGroupLockGetOneKey(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockGetOneKeyPessimistic() throws Exception { + checkGroupLockGetOneKey(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockGetOneKey(IgniteTxConcurrency concurrency) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + // Populate cache. + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + assertEquals("val1", cache.get(key1)); + + assertEquals("val2", cache.get(key2)); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockWithExternalLockOptimistic() throws Exception { + checkGroupLockWithExternalLock(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockWithExternalLockPessimistic() throws Exception { + checkGroupLockWithExternalLock(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockWithExternalLock(final IgniteTxConcurrency concurrency) throws Exception { + assert sanityCheckEnabled(); + + final UUID affinityKey = primaryKeyForCache(grid(0)); + + final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + + final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid(0).jcache(null); + + // Populate cache. + cache.put(key1, "val1"); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + } + + final CountDownLatch unlockLatch = new CountDownLatch(1); + final CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + try { + cache.lock(key1).lock(); + + try { + lockLatch.countDown(); + unlockLatch.await(); + } + finally { + cache.lock(key1).unlock(); + } + } + catch (CacheException e) { + fail(e.getMessage()); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + }, 1); + + try { + lockLatch.await(); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + try (IgniteTx tx = grid(0).transactions().txStartAffinity(null, affinityKey, concurrency, + READ_COMMITTED, 0, 1)) { + cache.put(key1, "val01"); + + tx.commit(); + } + + return null; + } + }, IgniteTxHeuristicException.class, null); + } + finally { + unlockLatch.countDown(); + + fut.get(); + } + } + + /** + * @throws Exception If failed. + */ + public void testSanityCheckDisabledOptimistic() throws Exception { + checkSanityCheckDisabled(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testSanityCheckDisabledPessimistic() throws Exception { + checkSanityCheckDisabled(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkSanityCheckDisabled(final IgniteTxConcurrency concurrency) throws Exception { + assert !sanityCheckEnabled(); + + GridEx grid = grid(0); + + final UUID affinityKey = primaryKeyForCache(grid); + + final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + + final IgniteCache<GridCacheAffinityKey<String>, String> cache = grid.jcache(null); + + // Populate cache. + cache.put(key1, "val1"); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + } + + cache.lock(key1).lock(); + + try { + try (IgniteTx tx = grid.transactions().txStartAffinity(null, affinityKey, concurrency, READ_COMMITTED, 0, 1)) { + cache.put(key1, "val01"); + + tx.commit(); + } + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val01", gCache.peek(key1)); + } + } + finally { + cache.lock(key1).unlock(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGroupPartitionLockOptimistic() throws Exception { + checkGroupPartitionLock(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupPartitionLockPessimistic() throws Exception { + checkGroupPartitionLock(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupPartitionLock(IgniteTxConcurrency concurrency) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCache<UUID, String> cache = grid(0).cache(null); + + UUID key1; + UUID key2; + + try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affinityKey), concurrency, + READ_COMMITTED, 0, 2)) { + // Note that events are not generated for internal keys. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, + unlocks.affectedKeys().size()); + + GridCacheAdapter<Object, Object> cacheAdapter = ((GridKernal)grid(0)).internalCache(); + + GridCacheAffinityManager<Object, Object> affMgr = cacheAdapter.context().affinity(); + + GridPartitionLockKey partAffKey = affMgr.partitionAffinityKey(cache.affinity().partition(affinityKey)); + + if (concurrency == PESSIMISTIC) + assertTrue(cacheAdapter.entryEx(partAffKey).lockedByThread()); + + do { + key1 = UUID.randomUUID(); + } + while (cache.affinity().partition(key1) != cache.affinity().partition(affinityKey)); + + do { + key2 = UUID.randomUUID(); + } + while (cache.affinity().partition(key2) != cache.affinity().partition(affinityKey)); + + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetPutOptimisticReadCommitted() throws Exception { + checkGetPut(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutOptimisticRepeatableRead() throws Exception { + checkGetPut(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutPessimisticReadCommitted() throws Exception { + checkGetPut(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutPessimisticRepeatableRead() throws Exception { + checkGetPut(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutEmptyCachePessimisticReadCommitted() throws Exception { + checkGetPutEmptyCache(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutEmptyCachePessimisticRepeatableRead() throws Exception { + checkGetPutEmptyCache(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutEmptyCacheOptimisticReadCommitted() throws Exception { + checkGetPutEmptyCache(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetPutEmptyCacheOptimisticRepeatableRead() throws Exception { + checkGetPutEmptyCache(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @param concurrency Transaction concurrency mode. + * @param isolation Transaction isolation mode. + * @throws Exception If failed. + */ + private void checkGetPut(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + // Populate cache. + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + assertEquals("val1", cache.get(key1)); + + assertEquals("val2", cache.get(key2)); + + cache.put(key1, "val01"); + + cache.put(key2, "val02"); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + } + + /** + * @param concurrency Transaction concurrency mode. + * @param isolation Transaction isolation mode. + * @throws Exception If failed. + */ + private void checkGetPutEmptyCache(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + assertEquals(null, cache.get(key1)); + + assertEquals(null, cache.get(key2)); + + cache.put(key1, "val01"); + + cache.put(key2, "val02"); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val01", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val02", gCache.peek(key2)); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetRemoveOptimisticReadCommitted() throws Exception { + checkGetRemove(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetRemoveOptimisticRepeatableRead() throws Exception { + checkGetRemove(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGetRemovePessimisticReadCommitted() throws Exception { + checkGetRemove(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGetRemovePessimisticRepeatableRead() throws Exception { + checkGetRemove(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @param concurrency Transaction concurrency mode. + * @param isolation Transaction isolation mode. + * @throws Exception If failed. + */ + private void checkGetRemove(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + // Populate cache. + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + assertEquals("val1", cache.get(key1)); + + assertEquals("val2", cache.get(key2)); + + cache.remove(key1); + + cache.remove(key2); + + tx.commit(); + } + + for (int i = 0; i < gridCount(); i++) { + assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key1) + ']', cache(i).peek(key1)); + assertNull("For cache [i=" + i + ", val=" + cache(i).peek(key2) + ']', cache(i).peek(key2)); + + assertTrue("For cache [idx=" + i + ", keySet=" + cache(i).keySet() + ']', cache(i).size() <= 1); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + } + + /** + * @throws Exception If failed. + */ + public void testGetAfterPutOptimistic() throws Exception { + checkGetAfterPut(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGetAfterPut() throws Exception { + checkGetAfterPut(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGetAfterPut(IgniteTxConcurrency concurrency) throws Exception { + CollectingEventListener locks = new CollectingEventListener(); + CollectingEventListener unlocks = new CollectingEventListener(); + + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + // Populate cache. + cache.putAll(F.asMap( + key1, "val1", + key2, "val2") + ); + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + } + + grid(0).events().localListen(locks, EVT_CACHE_OBJECT_LOCKED); + grid(0).events().localListen(unlocks, EVT_CACHE_OBJECT_UNLOCKED); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 2)) { + if (concurrency == PESSIMISTIC) + assertTrue("Failed to wait for lock events: " + affinityKey, locks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + else + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 0, locks.affectedKeys().size()); + + assertEquals("Unexpected number of unlock events: " + unlocks.affectedKeys(), 0, unlocks.affectedKeys().size()); + + assertEquals("val1", cache.get(key1)); + + assertEquals("val2", cache.get(key2)); + + cache.put(key1, "val01"); + + cache.put(key2, "val02"); + + assertEquals("val01", cache.get(key1)); + + assertEquals("val02", cache.get(key2)); + + tx.commit(); + } + + // Check that there are no further locks after transaction commit. + assertEquals("Unexpected number of lock events: " + locks.affectedKeys(), 1, locks.affectedKeys().size()); + assertTrue("Failed to wait for unlock events: " + affinityKey, unlocks.awaitKeys(WAIT_TIMEOUT, affinityKey)); + + assertEquals("val01", cache.get(key1)); + + assertEquals("val02", cache.get(key2)); + } + + /** + * @throws Exception If failed. + */ + public void testGetRepeatableReadOptimistic() throws Exception { + checkGetRepeatableRead(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGetRepeatableReadPessimistic() throws Exception { + checkGetRepeatableRead(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGetRepeatableRead(IgniteTxConcurrency concurrency) throws Exception { + UUID key = primaryKeyForCache(grid(0)); + + cache(0).put(key, "val"); + + try (IgniteTx ignored = cache(0).txStartPartition(cache(0).affinity().partition(key), concurrency, + REPEATABLE_READ, 0, 1)) { + assertEquals("val", cache(0).get(key)); + } + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockPutWrongKeyOptimistic() throws Exception { + checkGroupLockPutWrongKey(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockPutWrongKeyPessimistic() throws Exception { + checkGroupLockPutWrongKey(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockPutWrongKey(IgniteTxConcurrency concurrency) throws Exception { + UUID affinityKey = primaryKeyForCache(grid(0)); + + final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) { + // Key with affinity key different from enlisted on tx start should raise exception. + cache.put(new GridCacheAffinityKey<>("key1", UUID.randomUUID()), "val1"); + + fail("Exception should be thrown"); + } + catch (IgniteCheckedException ignored) { + // Expected exception. + } + + assertNull(cache.tx()); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockRemoveWrongKeyOptimistic() throws Exception { + checkGroupLockRemoveWrongKey(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockRemoveWrongKeyPessimistic() throws Exception { + checkGroupLockRemoveWrongKey(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockRemoveWrongKey(IgniteTxConcurrency concurrency) throws Exception { + UUID affinityKey = primaryKeyForCache(grid(0)); + + final GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + final GridCacheAffinityKey<String> key = new GridCacheAffinityKey<>("key1", UUID.randomUUID()); + + cache.put(key, "val"); + + try (IgniteTx ignored = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 1)) { + // Key with affinity key different from enlisted on tx start should raise exception. + cache.remove(key); + + fail("Exception should be thrown."); + } + catch (IgniteCheckedException ignored) { + // Expected exception. + } + + assertNull(cache.tx()); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockReadAffinityKeyPessimitsticRepeatableRead() throws Exception { + checkGroupLockReadAffinityKey(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockReadAffinityKeyPessimitsticReadCommitted() throws Exception { + checkGroupLockReadAffinityKey(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockReadAffinityKeyOptimisticRepeatableRead() throws Exception { + checkGroupLockReadAffinityKey(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockReadAffinityKeyOptimisticReadCommitted() throws Exception { + checkGroupLockReadAffinityKey(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If failed. + */ + private void checkGroupLockReadAffinityKey(IgniteTxConcurrency concurrency, IgniteTxIsolation isolation) + throws Exception { + UUID affinityKey = primaryKeyForCache(grid(0)); + + final GridCache<Object, String> cache = grid(0).cache(null); + + final GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + final GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + + cache.put(affinityKey, "0"); + cache.put(key1, "0"); + cache.put(key2, "0"); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, isolation, 0, 3)) { + assertEquals("0", cache.get(affinityKey)); + assertEquals("0", cache.get(key1)); + assertEquals("0", cache.get(key2)); + + cache.put(affinityKey, "1"); + cache.put(key1, "1"); + cache.put(key2, "1"); + + assertEquals("1", cache.get(affinityKey)); + assertEquals("1", cache.get(key1)); + assertEquals("1", cache.get(key2)); + + tx.commit(); + } + + assertEquals("1", cache.get(affinityKey)); + assertEquals("1", cache.get(key1)); + assertEquals("1", cache.get(key2)); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockWriteThroughBatchUpdateOptimistic() throws Exception { + checkGroupLockWriteThrough(OPTIMISTIC); + } + + /** + * @throws Exception If failed. + */ + public void testGroupLockWriteThroughBatchUpdatePessimistic() throws Exception { + checkGroupLockWriteThrough(PESSIMISTIC); + } + + /** + * @param concurrency Transaction concurrency mode. + * @throws Exception If failed. + */ + private void checkGroupLockWriteThrough(IgniteTxConcurrency concurrency) throws Exception { + UUID affinityKey = primaryKeyForCache(grid(0)); + + GridCache<GridCacheAffinityKey<String>, String> cache = grid(0).cache(null); + + GridCacheAffinityKey<String> key1 = new GridCacheAffinityKey<>("key1", affinityKey); + GridCacheAffinityKey<String> key2 = new GridCacheAffinityKey<>("key2", affinityKey); + GridCacheAffinityKey<String> key3 = new GridCacheAffinityKey<>("key3", affinityKey); + GridCacheAffinityKey<String> key4 = new GridCacheAffinityKey<>("key4", affinityKey); + + Map<GridCacheAffinityKey<String>, String> putMap = F.asMap( + key1, "val1", + key2, "val2", + key3, "val3", + key4, "val4"); + + try (IgniteTx tx = cache.txStartAffinity(affinityKey, concurrency, READ_COMMITTED, 0, 4)) { + cache.put(key1, "val1"); + cache.put(key2, "val2"); + cache.put(key3, "val3"); + cache.put(key4, "val4"); + + tx.commit(); + } + + for (int i = 0; i < gridCount(); i++) { + Ignite g = grid(i); + + GridCache<Object, Object> gCache = g.cache(null); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key1)) + assertEquals("For index: " + i, "val1", gCache.peek(key1)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key2)) + assertEquals("For index: " + i, "val2", gCache.peek(key2)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key3)) + assertEquals("For index: " + i, "val3", gCache.peek(key3)); + + if (gCache.affinity().isPrimaryOrBackup(g.cluster().localNode(), key4)) + assertEquals("For index: " + i, "val4", gCache.peek(key4)); + } + + // Check the store. + assertTrue(store.storeMap().equals(putMap)); + assertEquals(1, store.putCount()); + } + + /** @return {@code True} if sanity check should be enabled. */ + private boolean sanityCheckEnabled() { + return !getName().contains("SanityCheckDisabled"); + } + + /** + * @param primary Primary node for which key should be calculated. + * @return Key for which given node is primary. + * @throws IgniteCheckedException If affinity can not be calculated. + */ + protected UUID primaryKeyForCache(Ignite primary) throws IgniteCheckedException { + UUID res; + + int cnt = 0; + + UUID primaryId = primary.cluster().localNode().id(); + + do { + res = UUID.randomUUID(); + + cnt++; + + if (cnt > 10000) + throw new IllegalStateException("Cannot find key for primary node: " + primaryId); + } + while (!primary.cluster().mapKeyToNode(null, res).id().equals(primaryId)); + + return res; + } + + /** + * @param primary Primary node for which keys should be calculated. + * @param cnt Key count. + * @return Collection of generated keys. + * @throws IgniteCheckedException If affinity can not be calculated. + */ + protected UUID[] primaryKeysForCache(Ignite primary, int cnt) throws IgniteCheckedException { + Collection<UUID> keys = new LinkedHashSet<>(); + + int iters = 0; + + do { + keys.add(primaryKeyForCache(primary)); + + iters++; + + if (iters > 10000) + throw new IllegalStateException("Cannot find keys for primary node [nodeId=" + + primary.cluster().localNode().id() + ", cnt=" + cnt + ']'); + } + while (keys.size() < cnt); + + UUID[] res = new UUID[keys.size()]; + + return keys.toArray(res); + } + + /** Event listener that collects all incoming events. */ + protected static class CollectingEventListener implements IgnitePredicate<IgniteEvent> { + /** Collected events. */ + private final Collection<Object> affectedKeys = new GridConcurrentLinkedHashSet<>(); + + /** {@inheritDoc} */ + @Override public boolean apply(IgniteEvent evt) { + assert evt.type() == EVT_CACHE_OBJECT_LOCKED || evt.type() == EVT_CACHE_OBJECT_UNLOCKED; + + IgniteCacheEvent cacheEvt = (IgniteCacheEvent)evt; + + synchronized (this) { + affectedKeys.add(cacheEvt.key()); + + notifyAll(); + } + + return true; + } + + /** @return Collection of affected keys. */ + public Collection<Object> affectedKeys() { + return affectedKeys; + } + + /** + * Waits until events received for all supplied keys. + * + * @param timeout Timeout to wait. + * @param keys Keys to wait for. + * @return {@code True} if wait was successful, {@code false} if wait timed out. + * @throws InterruptedException If thread was interrupted. + */ + public boolean awaitKeys(long timeout, Object... keys) throws InterruptedException { + long start = System.currentTimeMillis(); + + Collection<Object> keysCol = Arrays.asList(keys); + + synchronized (this) { + while (true) { + long now = System.currentTimeMillis(); + + if (affectedKeys.containsAll(keysCol)) + return true; + else if (start + timeout > now) + wait(start + timeout - now); + else + return false; + } + } + } + } + + /** Test store that accumulates values into map. */ + private static class TestStore extends CacheStoreAdapter<Object, Object> { + /** */ + private ConcurrentMap<Object, Object> storeMap = new ConcurrentHashMap8<>(); + + /** */ + private AtomicInteger putCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public Object load(Object key) { + return null; + } + + /** {@inheritDoc} */ + @Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) { + for (Cache.Entry<?, ?> e : entries) + storeMap.put(e.getKey(), e.getValue()); + + putCnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry<?, ?> e) { + storeMap.put(e.getKey(), e.getValue()); + + putCnt.incrementAndGet(); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) { + storeMap.remove(key); + } + + /** @return Stored values map. */ + public ConcurrentMap<Object, Object> storeMap() { + return storeMap; + } + + /** @return Number of calls to put(). */ + public int putCount() { + return putCnt.get(); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java new file mode 100644 index 0000000..16d58c0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverOptimisticTxSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Tests optimistic group lock transactions. + */ +public class GridCacheGroupLockFailoverOptimisticTxSelfTest extends GridCacheGroupLockFailoverSelfTest { + /** {@inheritDoc} */ + @Override protected boolean optimisticTx() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java new file mode 100644 index 0000000..8758b21 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockFailoverSelfTest.java @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import com.google.common.collect.*; +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.spi.failover.*; +import org.apache.ignite.spi.failover.always.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests group lock transaction failover. + */ +public class GridCacheGroupLockFailoverSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Size of the test map. */ + private static final int TEST_MAP_SIZE = 200000; + + /** Cache name. */ + private static final String CACHE_NAME = "partitioned"; + + /** Size of data chunk, sent to a remote node. */ + private static final int DATA_CHUNK_SIZE = 1000; + + /** Number of chunk on which to fail worker node. */ + public static final int FAIL_ON_CHUNK_NO = (TEST_MAP_SIZE / DATA_CHUNK_SIZE) / 3; + + /** */ + private static final int FAILOVER_PUSH_GAP = 30; + + /** Master node name. */ + private static final String MASTER = "master"; + + /** Near enabled flag. */ + private boolean nearEnabled; + + /** Backups count. */ + private int backups; + + /** Filter to include only worker nodes. */ + private static final IgnitePredicate<ClusterNode> workerNodesFilter = new PN() { + @SuppressWarnings("unchecked") + @Override public boolean apply(ClusterNode n) { + return "worker".equals(n.attribute("segment")); + } + }; + + /** + * Result future queue (restrict the queue size + * to 50 in order to prevent in-memory data grid from over loading). + */ + private final BlockingQueue<ComputeTaskFuture<?>> resQueue = new LinkedBlockingQueue<>(10); + + /** + * @return {@code True} if test should use optimistic transactions. + */ + protected boolean optimisticTx() { + return false; + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearEnabledOneBackup() throws Exception { + checkPutAllFailoverGroupLock(true, 3, 1); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearDisabledOneBackup() throws Exception { + checkPutAllFailoverGroupLock(false, 3, 1); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearEnabledTwoBackups() throws Exception { + checkPutAllFailoverGroupLock(true, 5, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearDisabledTwoBackups() throws Exception { + checkPutAllFailoverGroupLock(false, 5, 2); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearEnabledThreeBackups() throws Exception { + checkPutAllFailoverGroupLock(true, 7, 3); + } + + /** + * @throws Exception If failed. + */ + public void testPutAllFailoverGroupLockNearDisabledThreeBackups() throws Exception { + checkPutAllFailoverGroupLock(false, 7, 3); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return super.getTestTimeout() * 5; + } + + /** + * Tests putAll() method along with failover and cache backup. + * + * Checks that the resulting primary cache size is the same as + * expected. + * + * @param near {@code True} for near enabled. + * @param workerCnt Workers count. + * @param shutdownCnt Shutdown count. + * @throws Exception If failed. + */ + public void checkPutAllFailoverGroupLock(boolean near, int workerCnt, int shutdownCnt) throws Exception { + nearEnabled = near; + backups = shutdownCnt; + + Collection<Integer> testKeys = generateTestKeys(); + + Ignite master = startGrid(MASTER); + + List<Ignite> workers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) + workers.add(startGrid("worker" + i)); + + info("Master: " + master.cluster().localNode().id()); + + List<Ignite> runningWorkers = new ArrayList<>(workerCnt); + + for (int i = 1; i <= workerCnt; i++) { + UUID id = workers.get(i - 1).cluster().localNode().id(); + + info(String.format("Worker%d: %s", i, id)); + + runningWorkers.add(workers.get(i - 1)); + } + + try { + // Dummy call to fetch affinity function from remote node + master.cluster().mapKeyToNode(CACHE_NAME, "Dummy"); + + Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); + + int chunkCntr = 0; + + int failoverPushGap = 0; + + for (Integer key : testKeys) { + ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); + + UUID nodeId = mappedNode.id(); + + Collection<Integer> data = dataChunks.get(nodeId); + + if (data == null) { + data = new ArrayList<>(DATA_CHUNK_SIZE); + + dataChunks.put(nodeId, data); + } + + data.add(key); + + if (data.size() == DATA_CHUNK_SIZE) { // time to send data + chunkCntr++; + + info("Pushing data chunk: " + chunkCntr); + + submitDataChunk(master, nodeId, data); + + data = new ArrayList<>(DATA_CHUNK_SIZE); + + dataChunks.put(nodeId, data); + + if (chunkCntr >= FAIL_ON_CHUNK_NO) { + if (workerCnt - runningWorkers.size() < shutdownCnt) { + if (failoverPushGap > 0) + failoverPushGap--; + else { + Ignite victim = runningWorkers.remove(0); + + info("Shutting down node: " + victim.cluster().localNode().id()); + + stopGrid(victim.name()); + + // Fail next node after some jobs have been pushed. + failoverPushGap = FAILOVER_PUSH_GAP; + } + } + } + } + } + + // Submit the rest of data. + for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) + submitDataChunk(master, entry.getKey(), entry.getValue()); + + // Wait for queue to empty. + info("Waiting for empty queue..."); + + long seenSize = resQueue.size(); + + while (true) { + U.sleep(10000); + + if (!resQueue.isEmpty()) { + long size = resQueue.size(); + + if (seenSize == size) { + info(">>> Failed to wait for queue to empty."); + + break; + } + + seenSize = size; + } + else + break; + } + + Collection<Integer> absentKeys = findAbsentKeys(runningWorkers.get(0), testKeys); + + info(">>> Absent keys: " + absentKeys); + + assertTrue(absentKeys.isEmpty()); + + // Actual primary cache size. + int primaryCacheSize = 0; + + for (Ignite g : runningWorkers) { + info(">>>>> " + g.cache(CACHE_NAME).size()); + + primaryCacheSize += g.cache(CACHE_NAME).primarySize(); + } + + assertTrue(TEST_MAP_SIZE <= primaryCacheSize); + } + finally { + stopAllGrids(); + } + } + + /** + * Does remapping. + * @param master Master grid. + * @param keys Keys. + * @throws IgniteCheckedException If failed. + */ + private void remap(final Ignite master, Iterable<Integer> keys) throws IgniteCheckedException { + Map<UUID, Collection<Integer>> dataChunks = new HashMap<>(); + + for (Integer key : keys) { + ClusterNode mappedNode = master.cluster().mapKeyToNode(CACHE_NAME, key); + + UUID nodeId = mappedNode.id(); + + Collection<Integer> data = dataChunks.get(nodeId); + + if (data == null) { + data = new ArrayList<>(DATA_CHUNK_SIZE); + + dataChunks.put(nodeId, data); + } + + data.add(key); + } + + for (Map.Entry<UUID, Collection<Integer>> entry : dataChunks.entrySet()) + submitDataChunk(master, entry.getKey(), entry.getValue()); + } + + /** + * Submits next data chunk as grid task. Blocks if queue is full. + * + * @param master Master node to submit from. + * @param preferredNodeId Node id to execute job on. + * @param dataChunk Data chunk to put in cache. + * @throws IgniteCheckedException If failed. + */ + private void submitDataChunk(final Ignite master, UUID preferredNodeId, final Collection<Integer> dataChunk) + throws IgniteCheckedException { + ClusterGroup prj = master.cluster().forPredicate(workerNodesFilter); + + IgniteCompute comp = master.compute(prj).enableAsync(); + + comp.execute(new GridCacheGroupLockPutTask(preferredNodeId, CACHE_NAME, optimisticTx()), dataChunk); + + ComputeTaskFuture<Void> fut = comp.future(); + + fut.listenAsync(new CI1<IgniteFuture<Void>>() { + @Override public void apply(IgniteFuture<Void> f) { + ComputeTaskFuture taskFut = (ComputeTaskFuture)f; + + boolean fail = false; + + try { + f.get(); //if something went wrong - we'll get exception here + } + catch (IgniteCheckedException ignore) { + info("Put task failed, going to remap keys: " + dataChunk.size()); + + fail = true; + } + finally { + // Remove complete future from queue to allow other jobs to proceed. + resQueue.remove(taskFut); + + try { + if (fail) + remap(master, dataChunk); + } + catch (IgniteCheckedException e) { + info("Failed to remap task [data=" + dataChunk.size() + ", e=" + e + ']'); + } + } + } + }); + + try { + resQueue.put(fut); + + if (fut.isDone()) + resQueue.remove(fut); + } + catch (InterruptedException ignored) { + info(">>>> Failed to wait for future submission: " + fut); + + Thread.currentThread().interrupt(); + } + } + + /** + * Tries to find keys, that are absent in cache. + * + * @param workerNode Worker node. + * @param keys Keys that are suspected to be absent + * @return List of absent keys. If no keys are absent, the list is empty. + * @throws IgniteCheckedException If error occurs. + */ + private Collection<Integer> findAbsentKeys(Ignite workerNode, + Collection<Integer> keys) throws IgniteCheckedException { + + Collection<Integer> ret = new ArrayList<>(keys.size()); + + GridCache<Object, Object> cache = workerNode.cache(CACHE_NAME); + + for (Integer key : keys) { + if (cache.get(key) == null) // Key is absent. + ret.add(key); + } + + return ret; + } + + /** + * Generates a test keys collection. + * + * @return A test keys collection. + */ + private Collection<Integer> generateTestKeys() { + Collection<Integer> ret = new ArrayList<>(TEST_MAP_SIZE); + + for (int i = 0; i < TEST_MAP_SIZE; i++) + ret.add(i); + + return ret; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + cfg.setDeploymentMode(IgniteDeploymentMode.CONTINUOUS); + + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); + + discoverySpi.setAckTimeout(60000); + discoverySpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoverySpi); + + if (gridName.startsWith("master")) { + cfg.setUserAttributes(ImmutableMap.of("segment", "master")); + + GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(true, (IgnitePredicate)workerNodesFilter); + + // For sure. + failoverSpi.setMaximumFailoverAttempts(50); + + cfg.setFailoverSpi(failoverSpi); + } + else if (gridName.startsWith("worker")) { + GridTestFailoverSpi failoverSpi = new GridTestFailoverSpi(false); + + cfg.setFailoverSpi(failoverSpi); + + cfg.setUserAttributes(ImmutableMap.of("segment", "worker")); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + cacheCfg.setName("partitioned"); + cacheCfg.setCacheMode(GridCacheMode.PARTITIONED); + cacheCfg.setStartSize(4500000); + cacheCfg.setBackups(backups); + cacheCfg.setStoreValueBytes(true); + cacheCfg.setDistributionMode(nearEnabled ? NEAR_PARTITIONED : PARTITIONED_ONLY); + cacheCfg.setQueryIndexEnabled(false); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + cfg.setCacheConfiguration(cacheCfg); + } + else + throw new IllegalStateException("Unexpected grid name: " + gridName); + + return cfg; + } + + /** + * Test failover SPI that remembers the job contexts of failed jobs. + */ + private class GridTestFailoverSpi extends AlwaysFailoverSpi { + /** */ + private static final String FAILOVER_NUMBER_ATTR = "failover:number:attr"; + + /** */ + private final boolean master; + + /** */ + private Set<ComputeJobContext> failedOverJobs = new HashSet<>(); + + /** Node filter. */ + private IgnitePredicate<? super ClusterNode>[] filter; + + /** + * @param master Master flag. + * @param filter Filters. + */ + @SafeVarargs + GridTestFailoverSpi(boolean master, IgnitePredicate<? super ClusterNode>... filter) { + this.master = master; + this.filter = filter; + } + + /** {@inheritDoc} */ + @Override public ClusterNode failover(FailoverContext ctx, List<ClusterNode> top) { + List<ClusterNode> cp = null; + if (master) { + failedOverJobs.add(ctx.getJobResult().getJobContext()); + + // Clear failed nodes list - allow to failover on the same node. + ctx.getJobResult().getJobContext().setAttribute(FAILED_NODE_LIST_ATTR, null); + + // Account for maximum number of failover attempts since we clear failed node list. + Integer failoverCnt = ctx.getJobResult().getJobContext().getAttribute(FAILOVER_NUMBER_ATTR); + + if (failoverCnt == null) + ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, 1); + else { + if (failoverCnt >= getMaximumFailoverAttempts()) { + info("Job failover failed because number of maximum failover attempts is exceeded " + + "[failedJob=" + ctx.getJobResult().getJob() + ", maxFailoverAttempts=" + + getMaximumFailoverAttempts() + ']'); + + return null; + } + + ctx.getJobResult().getJobContext().setAttribute(FAILOVER_NUMBER_ATTR, failoverCnt + 1); + } + + cp = new ArrayList<>(top); + + // Keep collection type. + F.retain(cp, false, new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return F.isAll(node, filter); + } + }); + } + + return super.failover(ctx, cp); //use cp to ensure we don't failover on failed node + } + + /** + * @return Job contexts for failed over jobs. + */ + public Set<ComputeJobContext> getFailedOverJobs() { + return failedOverJobs; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java new file mode 100644 index 0000000..f581a0a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockMultiNodeAbstractSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +/** + * Abstract test for multi-node group lock tests. + */ +public abstract class GridCacheGroupLockMultiNodeAbstractSelfTest extends GridCacheGroupLockAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java new file mode 100644 index 0000000..4fa788b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheGroupLockPutTask.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.transactions.IgniteTxConcurrency.*; +import static org.apache.ignite.transactions.IgniteTxIsolation.*; + +/** + * Puts all the passed data into partitioned cache in small chunks. + */ +class GridCacheGroupLockPutTask extends ComputeTaskAdapter<Collection<Integer>, Void> { + /** Preferred node. */ + private final UUID preferredNode; + + /** Cache name. */ + private final String cacheName; + + /** Optimistic transaction flag. */ + private final boolean optimistic; + + /** + * + * @param preferredNode A node that we'd prefer to take from grid. + * @param cacheName A name of the cache to work with. + * @param optimistic Optimistic transaction flag. + */ + GridCacheGroupLockPutTask(UUID preferredNode, String cacheName, boolean optimistic) { + this.preferredNode = preferredNode; + this.cacheName = cacheName; + this.optimistic = optimistic; + } + + /** + * This method is called to map or split grid task into multiple grid jobs. This is the first method that gets called + * when task execution starts. + * + * @param data Task execution argument. Can be {@code null}. This is the same argument as the one passed into {@code + * Grid#execute(...)} methods. + * @param subgrid Nodes available for this task execution. Note that order of nodes is guaranteed to be randomized by + * container. This ensures that every time you simply iterate through grid nodes, the order of nodes + * will be random which over time should result into all nodes being used equally. + * @return Map of grid jobs assigned to subgrid node. Unless {@link org.apache.ignite.compute.ComputeTaskContinuousMapper} is injected into task, if + * {@code null} or empty map is returned, exception will be thrown. + * @throws IgniteCheckedException If mapping could not complete successfully. This exception will be thrown out of {@link + * org.apache.ignite.compute.ComputeTaskFuture#get()} method. + */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable final Collection<Integer> data) throws IgniteCheckedException { + assert !subgrid.isEmpty(); + + // Give preference to wanted node. Otherwise, take the first one. + ClusterNode targetNode = F.find(subgrid, subgrid.get(0), new IgnitePredicate<ClusterNode>() { + @Override public boolean apply(ClusterNode e) { + return preferredNode.equals(e.id()); + } + }); + + return Collections.singletonMap( + new ComputeJobAdapter() { + @IgniteLoggerResource + private IgniteLogger log; + + @IgniteInstanceResource + private Ignite ignite; + + @Override public Object execute() throws IgniteCheckedException { + log.info("Going to put data: " + data.size()); + + GridCache<Object, Object> cache = ignite.cache(cacheName); + + assert cache != null; + + Map<Integer, T2<Integer, Collection<Integer>>> putMap = groupData(data); + + for (Map.Entry<Integer, T2<Integer, Collection<Integer>>> entry : putMap.entrySet()) { + T2<Integer, Collection<Integer>> pair = entry.getValue(); + + Object affKey = pair.get1(); + + // Group lock partition. + try (IgniteTx tx = cache.txStartPartition(cache.affinity().partition(affKey), + optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ, 0, pair.get2().size())) { + for (Integer val : pair.get2()) + cache.put(val, val); + + tx.commit(); + } + } + + log.info("Finished put data: " + data.size()); + + return data; + } + + /** + * Groups values by partitions. + * + * @param data Data to put. + * @return Grouped map. + */ + private Map<Integer, T2<Integer, Collection<Integer>>> groupData(Iterable<Integer> data) { + GridCache<Object, Object> cache = ignite.cache(cacheName); + + Map<Integer, T2<Integer, Collection<Integer>>> res = new HashMap<>(); + + for (Integer val : data) { + int part = cache.affinity().partition(val); + + T2<Integer, Collection<Integer>> tup = res.get(part); + + if (tup == null) { + tup = new T2<Integer, Collection<Integer>>(val, new LinkedList<Integer>()); + + res.put(part, tup); + } + + tup.get2().add(val); + } + + return res; + } + }, + targetNode); + } + + /** {@inheritDoc} */ + @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/968c3cf8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java new file mode 100644 index 0000000..bb7d67f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheIncrementTransformTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.common.*; + +import javax.cache.processor.*; +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*; +import static org.apache.ignite.cache.GridCacheAtomicityMode.*; +import static org.apache.ignite.cache.GridCacheMode.*; +import static org.apache.ignite.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.cache.GridCachePreloadMode.*; +import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Tests cache in-place modification logic with iterative value increment. + */ +public class GridCacheIncrementTransformTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Number of nodes to test on. */ + private static final int GRID_CNT = 4; + + /** Number of increment iterations. */ + private static final int NUM_ITERS = 5000; + + /** Helper for excluding stopped node from iteration logic. */ + private AtomicReferenceArray<Ignite> grids; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cache = new CacheConfiguration(); + + cache.setCacheMode(PARTITIONED); + cache.setAtomicityMode(ATOMIC); + cache.setDistributionMode(PARTITIONED_ONLY); + cache.setAtomicWriteOrderMode(PRIMARY); + cache.setWriteSynchronizationMode(FULL_SYNC); + cache.setBackups(1); + cache.setPreloadMode(SYNC); + + cfg.setCacheConfiguration(cache); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(GRID_CNT); + + grids = new AtomicReferenceArray<>(GRID_CNT); + + for (int i = 0; i < GRID_CNT; i++) + grids.set(i, grid(i)); + + cache(0).put("key", new TestObject(0)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + grids = null; + } + + /** + * @throws Exception If failed. + */ + public void testIncrement() throws Exception { + testIncrement(false); + } + + /** + * @throws Exception If failed. + */ + public void testIncrementRestart() throws Exception { + final AtomicBoolean stop = new AtomicBoolean(); + final AtomicReference<Throwable> error = new AtomicReference<>(); + + IgniteFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + Random rnd = new Random(); + + while (!stop.get()) { + int idx = -1; + + Ignite ignite = null; + + while (ignite == null) { + idx = rnd.nextInt(GRID_CNT); + + ignite = grids.getAndSet(idx, null); + } + + stopGrid(idx); + + assert grids.compareAndSet(idx, null, startGrid(idx)); + } + } + catch (Exception e) { + error.set(e); + } + } + }, 1, "restarter"); + + try { + testIncrement(true); + + assertNull(error.get()); + } + finally { + stop.set(true); + + fut.get(getTestTimeout()); + } + } + + /** + * @param restarts Whether test is running with node restarts. + * @throws Exception If failed. + */ + private void testIncrement(boolean restarts) throws Exception { + Random rnd = new Random(); + + for (int i = 0; i < NUM_ITERS; i++) { + int idx = -1; + + Ignite ignite = null; + + while (ignite == null) { + idx = rnd.nextInt(GRID_CNT); + + ignite = restarts ? grids.getAndSet(idx, null) : grid(idx); + } + + IgniteCache<String, TestObject> cache = ignite.jcache(null); + + assertNotNull(cache); + + TestObject obj = cache.get("key"); + + assertNotNull(obj); + assertEquals(i, obj.val); + + while (true) { + try { + cache.invoke("key", new Processor()); + + break; + } + catch (CachePartialUpdateException ignored) { + // Need to re-check if update actually succeeded. + TestObject updated = cache.get("key"); + + if (updated != null && updated.val == i + 1) + break; + } + } + + if (restarts) + assert grids.compareAndSet(idx, null, ignite); + } + } + + /** */ + private static class TestObject implements Serializable { + /** Value. */ + private int val; + + /** + * @param val Value. + */ + private TestObject(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "TestObject [val=" + val + ']'; + } + } + + /** */ + private static class Processor implements EntryProcessor<String, TestObject, Void>, Serializable { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<String, TestObject> e, Object... args) { + TestObject obj = e.getValue(); + + assert obj != null; + + e.setValue(new TestObject(obj.val + 1)); + + return null; + } + } +}