Added sanity test for cross cache transactions, fixed assert in IgniteTxLocalAdapter.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/abafd410 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/abafd410 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/abafd410 Branch: refs/heads/ignite-gg-9615-1 Commit: abafd4100561e5eb651e25e41a85f247ced7a67a Parents: 0399ccd Author: sboikov <sboi...@gridgain.com> Authored: Fri Aug 21 11:18:54 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Aug 21 11:18:54 2015 +0300 ---------------------------------------------------------------------- .../transactions/IgniteTxLocalAdapter.java | 4 +- .../cache/CrossCacheTxRandomOperationsTest.java | 156 ++++++++++++------- .../testsuites/IgniteCacheTestSuite2.java | 1 + 3 files changed, 107 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abafd410/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index b354fed..a32e7b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2216,8 +2216,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter missedForLoad.add(cacheKey); } else { - assert !transform; - assert txEntry.op() != TRANSFORM; + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; if (retval) ret.set(cacheCtx, null, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abafd410/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java index f3159a3..e6db0ef 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java @@ -17,15 +17,18 @@ package org.apache.ignite.internal.processors.cache; +import junit.framework.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.fair.*; import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; @@ -76,7 +79,7 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { @Override protected void beforeTestsStarted() throws Exception { super.beforeTestsStarted(); - startGrids(GRID_CNT); + startGridsMultiThreaded(GRID_CNT); } /** {@inheritDoc} */ @@ -89,36 +92,43 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxOperations() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, false, false); + } + + /** + * @throws Exception If failed. + */ public void testCrossCacheTxOperations() throws Exception { - txOperations(PARTITIONED, FULL_SYNC, false); + txOperations(PARTITIONED, FULL_SYNC, true, false); } /** * @throws Exception If failed. */ public void testCrossCacheTxOperationsPrimarySync() throws Exception { - txOperations(PARTITIONED, PRIMARY_SYNC, false); + txOperations(PARTITIONED, PRIMARY_SYNC, true, false); } /** * @throws Exception If failed. */ - public void testCrossCacheTxOperationsFairAffinity() throws Exception { - txOperations(PARTITIONED, FULL_SYNC, true); + public void _testCrossCacheTxOperationsFairAffinity() throws Exception { + txOperations(PARTITIONED, FULL_SYNC, true, true); } /** * @throws Exception If failed. */ public void testCrossCacheTxOperationsReplicated() throws Exception { - txOperations(REPLICATED, FULL_SYNC, false); + txOperations(REPLICATED, FULL_SYNC, true, false); } /** * @throws Exception If failed. */ public void testCrossCacheTxOperationsReplicatedPrimarySync() throws Exception { - txOperations(REPLICATED, PRIMARY_SYNC, false); + txOperations(REPLICATED, PRIMARY_SYNC, true, false); } /** @@ -150,11 +160,13 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { /** * @param cacheMode Cache mode. * @param writeSync Write synchronization mode. + * @param crossCacheTx If {@code true} uses cross cache transaction. * @param fairAff If {@code true} uses {@link FairAffinityFunction}, otherwise {@link RendezvousAffinityFunction}. * @throws Exception If failed. */ private void txOperations(CacheMode cacheMode, CacheWriteSynchronizationMode writeSync, + boolean crossCacheTx, boolean fairAff) throws Exception { Ignite ignite = ignite(0); @@ -162,11 +174,11 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { ignite.createCache(cacheConfiguration(CACHE1, cacheMode, writeSync, fairAff)); ignite.createCache(cacheConfiguration(CACHE2, cacheMode, writeSync, fairAff)); - txOperations(PESSIMISTIC, REPEATABLE_READ, false); - txOperations(PESSIMISTIC, REPEATABLE_READ, true); + txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, false); + txOperations(PESSIMISTIC, REPEATABLE_READ, crossCacheTx, true); - txOperations(OPTIMISTIC, REPEATABLE_READ, false); - txOperations(OPTIMISTIC, REPEATABLE_READ, true); + txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, false); + txOperations(OPTIMISTIC, REPEATABLE_READ, crossCacheTx, true); } finally { ignite.destroyCache(CACHE1); @@ -177,13 +189,16 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { /** * @param concurrency Transaction concurrency. * @param isolation Transaction isolation. + * @param crossCacheTx If {@code true} uses cross cache transaction. * @param client If {@code true} uses client node. + * @throws Exception If failed. */ private void txOperations(TransactionConcurrency concurrency, TransactionIsolation isolation, - boolean client) { - Map<TestKey, TestValue> expData1 = new HashMap<>(); - Map<TestKey, TestValue> expData2 = new HashMap<>(); + boolean crossCacheTx, + boolean client) throws Exception { + final Map<TestKey, TestValue> expData1 = new HashMap<>(); + final Map<TestKey, TestValue> expData2 = new HashMap<>(); Ignite ignite = client ? ignite(GRID_CNT - 1) : ignite(0); @@ -196,52 +211,81 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { assertNotNull(cache2); assertNotSame(cache1, cache2); - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + try { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long seed = System.currentTimeMillis(); - long seed = System.currentTimeMillis(); + log.info("Test tx operations [concurrency=" + concurrency + + ", isolation=" + isolation + + ", client=" + client + + ", seed=" + seed + ']'); - log.info("Test tx operations [concurrency=" + concurrency + - ", isolation=" + isolation + - ", client=" + client + - ", seed=" + seed + ']'); + IgniteTransactions txs = ignite.transactions(); - IgniteTransactions txs = ignite.transactions(); + final List<TestKey> keys = new ArrayList<>(); - List<TestKey> keys = new ArrayList<>(); + for (int i = 0; i < KEY_RANGE; i++) + keys.add(new TestKey(i)); - for (int i = 0; i < KEY_RANGE; i++) - keys.add(new TestKey(i)); + for (int i = 0; i < 10_000; i++) { + if (i % 100 == 0) + log.info("Iteration: " + i); - for (int i = 0; i < 10_000; i++) { - if (i % 100 == 0) - log.info("Iteration: " + i); + boolean rollback = i % 10 == 0; - boolean rollback = i % 10 == 0; + try (Transaction tx = txs.txStart(concurrency, isolation)) { + cacheOperation(expData1, rnd, cache1, concurrency == OPTIMISTIC, rollback); - try (Transaction tx = txs.txStart(concurrency, isolation)) { - cacheOperation(expData1, rnd, cache1, rollback); - cacheOperation(expData2, rnd, cache2, rollback); + if (crossCacheTx) + cacheOperation(expData2, rnd, cache2, concurrency == OPTIMISTIC, rollback); - if (rollback) - tx.rollback(); - else - tx.commit(); + if (rollback) + tx.rollback(); + else + tx.commit(); + } } - } - List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>(); - List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>(); + final List<IgniteCache<TestKey, TestValue>> caches1 = new ArrayList<>(); + final List<IgniteCache<TestKey, TestValue>> caches2 = new ArrayList<>(); - for (int i = 0; i < GRID_CNT; i++) { - caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1)); - caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2)); - } + for (int i = 0; i < GRID_CNT; i++) { + caches1.add(ignite(i).<TestKey, TestValue>cache(CACHE1)); + caches2.add(ignite(i).<TestKey, TestValue>cache(CACHE2)); + } - checkData(caches1, keys, expData1); - checkData(caches2, keys, expData2); + CacheConfiguration ccfg = cache1.getConfiguration(CacheConfiguration.class); - cache1.removeAll(); - cache2.removeAll(); + if (ccfg.getWriteSynchronizationMode() == FULL_SYNC) { + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + } + else { + boolean pass = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + } + catch (AssertionFailedError e) { + log.info("Data check failed, will retry."); + } + + return true; + } + }, 5000); + + if (!pass) { + checkData(caches1, keys, expData1); + checkData(caches2, keys, expData2); + } + } + } + finally { + cache1.removeAll(); + cache2.removeAll(); + } } /** @@ -265,12 +309,14 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { * @param expData Expected cache data. * @param rnd Random. * @param cache Cache. + * @param optimistic {@code True} if test uses optimistic transaction. * @param willRollback {@code True} if will rollback transaction. */ private void cacheOperation( Map<TestKey, TestValue> expData, ThreadLocalRandom rnd, IgniteCache<TestKey, TestValue> cache, + boolean optimistic, boolean willRollback) { TestKey key = key(rnd); TestValue val = new TestValue(rnd.nextLong()); @@ -290,7 +336,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { TestValue expOld = expData.get(key); - assertEquals(expOld, oldVal); + if (!optimistic) + assertEquals(expOld, oldVal); if (!willRollback) expData.put(key, val); @@ -301,7 +348,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { case 2: { boolean rmv = cache.remove(key); - assertEquals(expData.containsKey(key), rmv); + if (!optimistic) + assertEquals(expData.containsKey(key), rmv); if (!willRollback) expData.remove(key); @@ -314,7 +362,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { TestValue expOld = expData.get(key); - assertEquals(expOld, oldVal); + if (!optimistic) + assertEquals(expOld, oldVal); if (!willRollback) expData.remove(key); @@ -327,7 +376,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { boolean expPut = !expData.containsKey(key); - assertEquals(expPut, put); + if (!optimistic) + assertEquals(expPut, put); if (expPut && !willRollback) expData.put(key, val); @@ -339,7 +389,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { TestValue oldVal = cache.invoke(key, new TestEntryProcessor(val.value())); TestValue expOld = expData.get(key); - assertEquals(expOld, oldVal); + if (!optimistic) + assertEquals(expOld, oldVal); if (!willRollback) expData.put(key, val); @@ -351,7 +402,8 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest { TestValue oldVal = cache.invoke(key, new TestEntryProcessor(null)); TestValue expOld = expData.get(key); - assertEquals(expOld, oldVal); + if (!optimistic) + assertEquals(expOld, oldVal); break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/abafd410/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index cb17501..4926590 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -143,6 +143,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteCacheEntryProcessorNodeJoinTest.class)); suite.addTest(new TestSuite(IgniteAtomicCacheEntryProcessorNodeJoinTest.class)); suite.addTest(new TestSuite(GridCacheNearTxForceKeyTest.class)); + suite.addTest(new TestSuite(CrossCacheTxRandomOperationsTest.class)); return suite; }