http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java new file mode 100644 index 0000000..58c3333 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java @@ -0,0 +1,484 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.affinity.*; +import org.gridgain.grid.kernal.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + +/** + * Tests for local transactions. + */ +@SuppressWarnings( {"BusyWait"}) +abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Execution count. */ + private static final AtomicInteger cntr = new AtomicInteger(); + + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Start grid by default. + */ + protected IgniteTxAbstractTest() { + super(false /*start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + + c.setDiscoverySpi(disco); + + return c; + } + + /** + * @return Grid count. + */ + protected abstract int gridCount(); + + /** + * @return Key count. + */ + protected abstract int keyCount(); + + /** + * @return Maximum key value. + */ + protected abstract int maxKeyValue(); + + /** + * @return Thread iterations. + */ + protected abstract int iterations(); + + /** + * @return True if in-test logging is enabled. + */ + protected abstract boolean isTestDebug(); + + /** + * @return {@code True} if memory stats should be printed. + */ + protected abstract boolean printMemoryStats(); + + /** {@inheritDoc} */ + private void debug(String msg) { + if (isTestDebug()) + info(msg); + } + + /** + * @throws Exception If failed. + */ + @Override protected void beforeTestsStarted() throws Exception { + for (int i = 0; i < gridCount(); i++) + startGrid(i); + } + + /** + * @throws Exception If failed. + */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @param i Grid index. + * @return Cache. + */ + @SuppressWarnings("unchecked") + @Override protected GridCache<Integer, String> cache(int i) { + return grid(i).cache(null); + } + + /** + * @return Keys. + */ + protected Iterable<Integer> getKeys() { + List<Integer> keys = new ArrayList<>(keyCount()); + + for (int i = 0; i < keyCount(); i++) + keys.add(RAND.nextInt(maxKeyValue()) + 1); + + Collections.sort(keys); + + return Collections.unmodifiableList(keys); + } + + /** + * @return Random cache operation. + */ + protected OP getOp() { + switch (RAND.nextInt(3)) { + case 0: { return OP.READ; } + case 1: { return OP.WRITE; } + case 2: { return OP.REMOVE; } + + // Should never be reached. + default: { assert false; return null; } + } + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkCommit(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation) throws Exception { + int gridIdx = RAND.nextInt(gridCount()); + + Ignite ignite = grid(gridIdx); + + if (isTestDebug()) + debug("Checking commit on grid: " + ignite.cluster().localNode().id()); + + for (int i = 0; i < iterations(); i++) { + GridCache<Integer, String> cache = cache(gridIdx); + + IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); + + try { + int prevKey = -1; + + for (Integer key : getKeys()) { + // Make sure we have the same locking order for all concurrent transactions. + assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey; + + if (isTestDebug()) { + GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + + int part = aff.partition(key); + + debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + + U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + + String val = Integer.toString(key); + + switch (getOp()) { + case READ: { + if (isTestDebug()) + debug("Reading key [key=" + key + ", i=" + i + ']'); + + val = cache.get(key); + + if (isTestDebug()) + debug("Read value for key [key=" + key + ", val=" + val + ']'); + + break; + } + + case WRITE: { + if (isTestDebug()) + debug("Writing key and value [key=" + key + ", val=" + val + ", i=" + i + ']'); + + cache.put(key, val); + + break; + } + + case REMOVE: { + if (isTestDebug()) + debug("Removing key [key=" + key + ", i=" + i + ']'); + + cache.remove(key); + + break; + } + + default: { assert false; } + } + } + + tx.commit(); + + if (isTestDebug()) + debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); + } + catch (GridCacheTxOptimisticException e) { + if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) { + error("Received invalid optimistic failure.", e); + + throw e; + } + + if (isTestDebug()) + info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() + + ", tx=" + tx.xid() + ']'); + + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + error("Failed to rollback optimistic failure: " + tx, ex); + + throw ex; + } + } + catch (Exception e) { + error("Transaction failed (will rollback): " + tx, e); + + tx.rollback(); + + throw e; + } + catch (Error e) { + error("Error when executing transaction (will rollback): " + tx, e); + + tx.rollback(); + + throw e; + } + finally { + IgniteTx t = cache.tx(); + + assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) + + ", t=" + t + (t != tx ? "tx=" + tx : "tx=''") + ']'; + } + } + + if (printMemoryStats()) { + if (cntr.getAndIncrement() % 100 == 0) + // Print transaction memory stats. + ((GridKernal)grid(gridIdx)).internalCache().context().tm().printMemoryStats(); + } + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws IgniteCheckedException If check failed. + */ + protected void checkRollback(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation) + throws Exception { + checkRollback(new ConcurrentHashMap<Integer, String>(), concurrency, isolation); + } + + /** + * @param map Map to check. + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws IgniteCheckedException If check failed. + */ + protected void checkRollback(ConcurrentMap<Integer, String> map, GridCacheTxConcurrency concurrency, + GridCacheTxIsolation isolation) throws Exception { + int gridIdx = RAND.nextInt(gridCount()); + + Ignite ignite = grid(gridIdx); + + if (isTestDebug()) + debug("Checking commit on grid: " + ignite.cluster().localNode().id()); + + for (int i = 0; i < iterations(); i++) { + GridCache<Integer, String> cache = cache(gridIdx); + + IgniteTx tx = cache.txStart(concurrency, isolation, 0, 0); + + try { + for (Integer key : getKeys()) { + if (isTestDebug()) { + GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + + int part = aff.partition(key); + + debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + + U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + } + + String val = Integer.toString(key); + + switch (getOp()) { + case READ: { + debug("Reading key: " + key); + + checkMap(map, key, cache.get(key)); + + break; + } + + case WRITE: { + debug("Writing key and value [key=" + key + ", val=" + val + ']'); + + checkMap(map, key, cache.put(key, val)); + + break; + } + + case REMOVE: { + debug("Removing key: " + key); + + checkMap(map, key, cache.remove(key)); + + break; + } + + default: { assert false; } + } + } + + tx.rollback(); + + debug("Rolled back transaction: " + tx); + } + catch (GridCacheTxOptimisticException e) { + tx.rollback(); + + log.warning("Rolled back transaction due to optimistic exception [tx=" + tx + ", e=" + e + ']'); + + throw e; + } + catch (Exception e) { + tx.rollback(); + + error("Rolled back transaction due to exception [tx=" + tx + ", e=" + e + ']'); + + throw e; + } + finally { + IgniteTx t1 = cache.tx(); + + debug("t1=" + t1); + + assert t1 == null : "Thread should not have transaction upon completion ['t==tx'=" + (t1 == tx) + + ", t=" + t1 + ']'; + } + } + } + + /** + * @param map Map to check against. + * @param key Key. + * @param val Value. + */ + private void checkMap(ConcurrentMap<Integer, String> map, Integer key, String val) { + if (val != null) { + String v = map.putIfAbsent(key, val); + + assert v == null || v.equals(val); + } + } + + /** + * Checks integrity of all caches after tests. + * + * @throws IgniteCheckedException If check failed. + */ + @SuppressWarnings({"ErrorNotRethrown"}) + protected void finalChecks() throws Exception { + for (int i = 1; i <= maxKeyValue(); i++) { + for (int k = 0; k < 3; k++) { + try { + GridCacheEntry<Integer, String> e1 = null; + + String v1 = null; + + for (int j = 0; j < gridCount(); j++) { + GridCache<Integer, String> cache = cache(j); + + IgniteTx tx = cache.tx(); + + assertNull("Transaction is not completed: " + tx, tx); + + if (j == 0) { + e1 = cache.entry(i); + + v1 = e1.get(); + } + else { + GridCacheEntry<Integer, String> e2 = cache.entry(i); + + String v2 = e2.get(); + + if (!F.eq(v2, v1)) { + v1 = e1.get(); + v2 = e2.get(); + } + + assert F.eq(v2, v1) : + "Invalid cached value [key=" + i + ", v1=" + v1 + ", v2=" + v2 + ", e1=" + e1 + + ", e2=" + e2 + ", grid=" + j + ']'; + } + } + + break; + } + catch (AssertionError e) { + if (k == 2) + throw e; + else + // Wait for transactions to complete. + Thread.sleep(500); + } + } + } + + for (int i = 1; i <= maxKeyValue(); i++) { + for (int k = 0; k < 3; k++) { + try { + for (int j = 0; j < gridCount(); j++) { + GridCacheProjection<Integer, String> cache = cache(j); + + cache.removeAll(); + +// assert cache.keySet().isEmpty() : "Cache is not empty: " + cache.entrySet(); + } + + break; + } + catch (AssertionError e) { + if (k == 2) + throw e; + else + // Wait for transactions to complete. + Thread.sleep(500); + } + } + } + } + + /** + * Cache operation. + */ + protected enum OP { + /** Cache read. */ + READ, + + /** Cache write. */ + WRITE, + + /** Cache remove. */ + REMOVE + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java new file mode 100644 index 0000000..9548cd0 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxConcurrentGetAbstractTest.java @@ -0,0 +1,134 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + +/** + * Checks multithreaded put/get cache operations on one node. + */ +public abstract class IgniteTxConcurrentGetAbstractTest extends GridCommonAbstractTest { + /** Debug flag. */ + private static final boolean DEBUG = false; + + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int THREAD_NUM = 20; + + /** + * Default constructor. + * + */ + protected IgniteTxConcurrentGetAbstractTest() { + super(true /** Start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** + * @param g Grid. + * @return Near cache. + */ + GridNearCacheAdapter<String, Integer> near(Ignite g) { + return (GridNearCacheAdapter<String, Integer>)((GridKernal)g).<String, Integer>internalCache(); + } + + /** + * @param g Grid. + * @return DHT cache. + */ + GridDhtCacheAdapter<String, Integer> dht(Ignite g) { + return near(g).dht(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutGet() throws Exception { + // Random key. + final String key = UUID.randomUUID().toString(); + + final Ignite ignite = grid(); + + ignite.cache(null).put(key, "val"); + + GridCacheEntryEx<String,Integer> dhtEntry = dht(ignite).peekEx(key); + + if (DEBUG) + info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", entry=" + dhtEntry + ']'); + + String val = txGet(ignite, key); + + assertNotNull(val); + + info("Starting threads: " + THREAD_NUM); + + multithreaded(new Callable<String>() { + @Override public String call() throws Exception { + return txGet(ignite, key); + } + }, THREAD_NUM, "getter-thread"); + } + + /** + * @param ignite Grid. + * @param key Key. + * @return Value. + * @throws Exception If failed. + */ + private String txGet(Ignite ignite, String key) throws Exception { + try (IgniteTx tx = ignite.cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + GridCacheEntryEx<String, Integer> dhtEntry = dht(ignite).peekEx(key); + + if (DEBUG) + info("DHT entry [hash=" + System.identityHashCode(dhtEntry) + ", xid=" + tx.xid() + + ", entry=" + dhtEntry + ']'); + + String val = ignite.<String, String>cache(null).get(key); + + assertNotNull(val); + assertEquals("val", val); + + tx.commit(); + + return val; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java new file mode 100644 index 0000000..7b39975 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -0,0 +1,631 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.indexing.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * Tests that transaction is invalidated in case of {@link GridCacheTxHeuristicException}. + */ +public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstractSelfTest { + /** Index SPI throwing exception. */ + private static TestIndexingSpi idxSpi = new TestIndexingSpi(); + + /** */ + private static final int PRIMARY = 0; + + /** */ + private static final int BACKUP = 1; + + /** */ + private static final int NOT_PRIMARY_AND_BACKUP = 2; + + /** */ + private static Integer lastKey; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setIndexingSpi(idxSpi); + + cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setQueryIndexEnabled(true); + ccfg.setStore(null); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + lastKey = 0; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + idxSpi.forceFail(false); + } + + /** + * @throws Exception If failed. + */ + public void testPutNear() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimary() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPut(false, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackup() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutAll() throws Exception { + checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveNear() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testRemovePrimary() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveBackup() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformNear() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformPrimary() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformBackup() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutNearTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimaryTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutMultipleKeysTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + } + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkPutTx(boolean putBefore, GridCacheTxConcurrency concurrency, + GridCacheTxIsolation isolation, final Integer... keys) throws Exception { + assertTrue(keys.length > 0); + + info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (putBefore) { + idxSpi.forceFail(false); + + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 1); + } + + info("Commit."); + + tx.commit(); + } + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + idxSpi.forceFail(true); + + try { + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 2); + } + + info("Commit."); + + tx.commit(); + } + + fail("Transaction should fail."); + } + catch (GridCacheTxHeuristicException e) { + log.info("Expected exception: " + e); + } + + for (Integer key : keys) + checkEmpty(key); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkEmpty(final Integer key) throws Exception { + idxSpi.forceFail(false); + + info("Check key: " + key); + + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal) grid(i); + + GridCacheAdapter cache = grid.internalCache(null); + + GridCacheMapEntry entry = cache.map().getEntry(key); + + log.info("Entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); + } + + if (cache.isNear()) { + entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); + + log.info("Dht entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertFalse("Unexpected entry for grid [i=" + i + ", entry=" + entry + ']', entry.hasValue()); + } + } + } + + for (int i = 0; i < gridCount(); i++) + assertEquals("Unexpected value for grid " + i, null, grid(i).cache(null).get(key)); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkPut(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to put: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).put(key, 2); + + return null; + } + }, GridCacheTxHeuristicException.class, null); + + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to transform: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() { + @Override public Object apply(Object o) { + return 2; + } + }); + + return null; + } + }, GridCacheTxHeuristicException.class, null); + + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @throws Exception If failed. + */ + private void checkPutAll(boolean putBefore, Integer ... keys) throws Exception { + assert keys.length > 1; + + if (putBefore) { + idxSpi.forceFail(false); + + Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 1); + + info("Put data: " + m); + + grid(0).cache(null).putAll(m); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + idxSpi.forceFail(true); + + final Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 2); + + info("Going to putAll: " + m); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).putAll(m); + + return null; + } + }, GridCacheTxHeuristicException.class, null); + + for (Integer key : m.keySet()) + checkEmpty(key); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkRemove(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + idxSpi.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + idxSpi.forceFail(true); + + info("Going to remove: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).remove(key); + + return null; + } + }, GridCacheTxHeuristicException.class, null); + + checkEmpty(key); + } + + /** + * Generates key of a given type for given node. + * + * @param node Node. + * @param type Key type. + * @return Key. + */ + private Integer keyForNode(ClusterNode node, int type) { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (cache.configuration().getCacheMode() == LOCAL) + return ++lastKey; + + if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP) + return ++lastKey; + + for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { + switch (type) { + case NOT_PRIMARY_AND_BACKUP: { + if (!cache.affinity().isPrimaryOrBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case PRIMARY: { + if (cache.affinity().isPrimary(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case BACKUP: { + if (cache.affinity().isBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + default: + fail(); + } + } + + throw new IllegalStateException("Failed to find key."); + } + + /** + * Indexing SPI that can fail on demand. + */ + private static class TestIndexingSpi extends IgniteSpiAdapter implements GridIndexingSpi { + /** Fail flag. */ + private volatile boolean fail; + + /** + * @param fail Fail flag. + */ + public void forceFail(boolean fail) { + this.fail = fail; + } + + /** {@inheritDoc} */ + @Override public Iterator<?> query(@Nullable String spaceName, Collection<Object> params, @Nullable GridIndexingQueryFilter filters) throws IgniteSpiException { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public void store(@Nullable String spaceName, Object key, Object val, long expirationTime) + throws IgniteSpiException { + if (fail) { + fail = false; + + throw new IgniteSpiException("Test exception."); + } + } + + /** {@inheritDoc} */ + @Override public void remove(@Nullable String spaceName, Object k) + throws IgniteSpiException { + if (fail) { + fail = false; + + throw new IgniteSpiException("Test exception."); + } + } + + /** {@inheritDoc} */ + @Override public void onSwap(@Nullable String spaceName, Object key) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onUnswap(@Nullable String spaceName, Object key, Object val) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + // No-op. + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java new file mode 100644 index 0000000..4822742 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java @@ -0,0 +1,918 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.affinity.*; +import org.gridgain.grid.cache.query.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.testframework.junits.common.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + + +/** + * Checks basic multi-node transactional operations. + */ +@SuppressWarnings({"PointlessBooleanExpression", "ConstantConditions", "PointlessArithmeticExpression"}) +public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTest { + /** Debug flag. */ + private static final boolean DEBUG = false; + + /** */ + protected static final int GRID_CNT = 4; + + /** */ + protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + protected static final int RETRIES = 300; + + /** Log frequency. */ + private static final int LOG_FREQ = RETRIES < 100 || DEBUG ? 1 : RETRIES / 5; + + /** Counter key. */ + private static final String CNTR_KEY = "CNTR_KEY"; + + /** Removed counter key. */ + private static final String RMVD_CNTR_KEY = "RMVD_CNTR_KEY"; + + /** */ + protected static final AtomicInteger cntr = new AtomicInteger(); + + /** */ + protected static final AtomicInteger cntrRmvd = new AtomicInteger(); + + /** Number of backups for partitioned tests. */ + protected int backups = 2; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + backups = 0; + + cntr.set(0); + } + + /** + * + * @param ignite Grid + * @param key Key. + * @return Primary node id. + */ + @SuppressWarnings("unchecked") + private static UUID primaryId(Ignite ignite, Object key) { + GridCacheAffinity aff = ignite.cache(null).cache().affinity(); + + Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(key)); + + ClusterNode first = F.first(affNodes); + + assert first != null; + + return first.id(); + } + + /** + * @param nodeId Node ID. + * @param key Key. + * @return DHT entry. + */ + @Nullable private static GridCacheEntryEx<Object, Integer> dhtEntry(UUID nodeId, Object key) { + Ignite g = G.ignite(nodeId); + + GridDhtCacheAdapter<Object, Integer> dht = + ((GridKernal)g).<Object, Integer>internalCache().context().near().dht(); + + return dht.peekEx(key); + } + + /** + * @param nodeId Node ID. + * @param key Key. + * @return Near entry. + */ + @Nullable private static GridCacheEntryEx<Object, Integer> nearEntry(UUID nodeId, Object key) { + Ignite g = G.ignite(nodeId); + + GridNearCacheAdapter<Object, Integer> near = ((GridKernal)g).<Object, Integer>internalCache().context().near(); + + return near.peekEx(key); + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @param itemKey Item key. + * @param retry Retry count. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onItemNear(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException { + GridCache<String, Integer> cache = ignite.cache(null); + + UUID locId = ignite.cluster().localNode().id(); + UUID itemPrimaryId = primaryId(ignite, itemKey); + UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY); + + boolean isCntrPrimary = cntrPrimaryId.equals(locId); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (DEBUG) + info("Before near get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() + + ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId + + ", nearEntry=" + nearEntry(locId, CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']'); + + Integer cntr = cache.get(CNTR_KEY); + + int newVal = cntr + 1; + + if (putCntr) { + if (DEBUG) + info("Before near put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']'); + + cache.putx(CNTR_KEY, newVal); + } + + if (DEBUG) + info("Before near put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + ", new=" + newVal + + ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + cache.putx(itemKey, newVal); + + if (DEBUG) + info("After near put item [retry=" + retry + ", key=" + itemKey + ", old=" + cntr + ", new=" + newVal + + ", nearEntry=" + nearEntry(locId, itemKey) + ", dhtEntry" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + tx.commit(); + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @param itemKey Item key. + * @param retry Retry count. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onItemPrimary(boolean putCntr, Ignite ignite, String itemKey, int retry) throws IgniteCheckedException { + GridCache<String, Integer> cache = ignite.cache(null); + + UUID locId = ignite.cluster().localNode().id(); + UUID itemPrimaryId = primaryId(ignite, itemKey); + UUID cntrPrimaryId = primaryId(ignite, CNTR_KEY); + + boolean isCntrPrimary = cntrPrimaryId.equals(locId); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (DEBUG) + info("Before item primary get [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() + + ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId + + ", nearEntry=" + nearEntry(locId, CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']'); + + Integer cntr = cache.get(CNTR_KEY); + + int newVal = cntr + 1; + + if (putCntr) { + if (DEBUG) + info("Before item primary put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, CNTR_KEY) : "") + ']'); + + cache.putx(CNTR_KEY, newVal); + } + + if (DEBUG) + info("Before item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + cache.putx(itemKey, cntr); + + if (DEBUG) + info("After item primary put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + tx.commit(); + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @param retry Retry count. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onRemoveItemQueried(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException { + GridCache<String, Integer> cache = ignite.cache(null); + + UUID locId = ignite.cluster().localNode().id(); + UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY); + + boolean isCntrPrimary = cntrPrimaryId.equals(locId); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (DEBUG) + ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() + + ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId + + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'); + + Integer cntr = cache.get(RMVD_CNTR_KEY); + + assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'; + + int newVal = cntr - 1; + + if (putCntr) { + if (DEBUG) + ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'); + + cache.putx(RMVD_CNTR_KEY, newVal); + } + + while (true) { + GridCacheQuery<Map.Entry<String, Integer>> qry = + cache.queries().createSqlQuery(Integer.class, "_key != 'RMVD_CNTR_KEY' and _val >= 0"); + + if (DEBUG) + ignite.log().info("Before executing query [retry=" + retry + ", locId=" + locId + + ", txId=" + tx.xid() + ']'); + + Map.Entry<String, Integer> entry = qry.execute().next(); + + if (entry == null) { + ignite.log().info("*** Queue is empty."); + + return; + } + + String itemKey = entry.getKey(); + + UUID itemPrimaryId = primaryId(ignite, itemKey); + + // Lock the item key. + if (cache.get(itemKey) != null) { + if (DEBUG) + ignite.log().info("Before item remove [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + assert cache.removex(itemKey) : "Failed to remove key [locId=" + locId + + ", primaryId=" + itemPrimaryId + ", key=" + itemKey + ']'; + + if (DEBUG) + info("After item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + break; + } + else + cache.removex(itemKey); + } + + tx.commit(); + } + catch (Error e) { + ignite.log().error("Error in test.", e); + + throw e; + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @param retry Retry count. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private void onRemoveItemSimple(boolean putCntr, Ignite ignite, int retry) throws IgniteCheckedException { + GridCache<String, Integer> cache = ignite.cache(null); + + UUID locId = ignite.cluster().localNode().id(); + UUID cntrPrimaryId = primaryId(ignite, RMVD_CNTR_KEY); + + boolean isCntrPrimary = cntrPrimaryId.equals(locId); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + if (DEBUG) + ignite.log().info("Before item lock [retry=" + retry + ", xid=" + tx.xid() + ", node=" + ignite.name() + + ", isCntrPrimary=" + isCntrPrimary + ", nearId=" + locId + + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'); + + Integer cntr = cache.get(RMVD_CNTR_KEY); + + assert cntr != null : "Received null counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'; + + String itemKey = Integer.toString(cntrRmvd.getAndIncrement()); + + Integer val = cache.get(itemKey); + + assert val != null : "Received null val [retry=" + retry + ", cacheSize=" + cache.size() + ']'; + + UUID itemPrimaryId = primaryId(ignite, itemKey); + + int newVal = cntr - 1; + + if (putCntr) { + if (DEBUG) + ignite.log().info("Before item put counter [retry=" + retry + ", isCntrPrimary=" + isCntrPrimary + + ", cur=" + cntr + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, RMVD_CNTR_KEY) + + (isCntrPrimary ? ", dhtEntry=" + dhtEntry(locId, RMVD_CNTR_KEY) : "") + ']'); + + cache.putx(RMVD_CNTR_KEY, newVal); + } + + if (DEBUG) + ignite.log().info("Before item remove item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + assertTrue(cache.removex(itemKey)); + + if (DEBUG) + info("After item put item [retry=" + retry + ", key=" + itemKey + ", cur=" + cntr + + ", new=" + newVal + ", nearEntry=" + nearEntry(locId, itemKey) + + ", dhtEntry=" + dhtEntry(itemPrimaryId, itemKey) + ']'); + + tx.commit(); + } + catch (Error e) { + ignite.log().error("Error in test.", e); + + throw e; + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @throws IgniteCheckedException If failed. + */ + private void retries(Ignite ignite, boolean putCntr) throws IgniteCheckedException { + UUID nodeId = ignite.cluster().localNode().id(); + + for (int i = 0; i < RETRIES; i++) { + int cnt = cntr.getAndIncrement(); + + if (DEBUG) + ignite.log().info("***"); + if (DEBUG || cnt % LOG_FREQ == 0) + ignite.log().info("*** Iteration #" + i + " ***"); + if (DEBUG) + ignite.log().info("***"); + + String itemKey = nodeId + "-#" + i; + + if (nodeId.equals(primaryId(ignite, itemKey))) + onItemPrimary(putCntr, ignite, itemKey, i); + else + onItemNear(putCntr, ignite, itemKey, i); + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @throws IgniteCheckedException If failed. + */ + private void removeRetriesQueried(Ignite ignite, boolean putCntr) throws IgniteCheckedException { + for (int i = 0; i < RETRIES; i++) { + if (DEBUG) + ignite.log().info("***"); + + if (DEBUG || cntrRmvd.getAndIncrement() % LOG_FREQ == 0) + ignite.log().info("*** Iteration #" + i + " ***"); + + if (DEBUG) + ignite.log().info("***"); + + onRemoveItemQueried(putCntr, ignite, i); + + if (i % 50 == 0) + ((GridKernal) ignite).internalCache().context().tm().printMemoryStats(); + } + } + + /** + * + * @param putCntr Put counter to cache. + * @param ignite Grid. + * @throws IgniteCheckedException If failed. + */ + private void removeRetriesSimple(Ignite ignite, boolean putCntr) throws IgniteCheckedException { + for (int i = 0; i < RETRIES; i++) { + if (DEBUG) + ignite.log().info("***"); + + if (cntrRmvd.get() % LOG_FREQ == 0 || DEBUG) + ignite.log().info("*** Iteration #" + i + " ***"); + + if (DEBUG) + ignite.log().info("***"); + + onRemoveItemSimple(putCntr, ignite, i); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutOneEntryInTx() throws Exception { +// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName()); + + startGrids(GRID_CNT); + + try { + grid(0).cache(null).put(CNTR_KEY, 0); + + grid(0).compute().call(new PutOneEntryInTxJob()); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutTwoEntriesInTx() throws Exception { +// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName()); + + startGrids(GRID_CNT); + + try { + grid(0).cache(null).put(CNTR_KEY, 0); + + grid(0).compute().call(new PutTwoEntriesInTxJob()); + + printCounter(); + + assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY)); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutOneEntryInTxMultiThreaded() throws Exception { +// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName()); + + startGrids(GRID_CNT); + + Collection<Thread> threads = new LinkedList<>(); + + try { + // Initialize. + grid(0).cache(null).put(CNTR_KEY, 0); + + for (int i = 0; i < GRID_CNT; i++) { + final int gridId = i; + + threads.add(new Thread("thread-#" + i) { + @Override public void run() { + try { + retries(grid(gridId), false); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + for (Thread th : threads) + th.start(); + + for (Thread th : threads) + th.join(); + + printCounter(); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPutTwoEntryInTxMultiThreaded() throws Exception { +// resetLog4j(Level.INFO, true, GridCacheTxManager.class.getName()); + + startGrids(GRID_CNT); + + Collection<Thread> threads = new LinkedList<>(); + + try { + grid(0).cache(null).put(CNTR_KEY, 0); + + for (int i = 0; i < GRID_CNT; i++) { + final int gridId = i; + + threads.add(new Thread() { + @Override public void run() { + try { + retries(grid(gridId), true); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + for (Thread th : threads) + th.start(); + + for (Thread th : threads) + th.join(); + + printCounter(); + + assertEquals(GRID_CNT * RETRIES, grid(0).cache(null).get(CNTR_KEY)); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testRemoveInTxQueried() throws Exception { + //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName()); + + startGrids(GRID_CNT); + + try { + GridCache<String, Integer> cache = grid(0).cache(null); + + cache.put(RMVD_CNTR_KEY, 0); + + for (int i = 0; i < GRID_CNT * RETRIES; i++) + cache.put(String.valueOf(i), i); + + for (int i = 0; i < RETRIES; i++) + for (int j = 0; j < GRID_CNT; j++) + assertEquals(i, grid(j).cache(null).get(String.valueOf(i))); + + GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0"); + + Collection<Map.Entry<String, Integer>> entries = qry.execute().get(); + + assertFalse(entries.isEmpty()); + + cntrRmvd.set(0); + + grid(0).compute().call(new RemoveInTxJobQueried()); + + for (int i = 0; i < GRID_CNT * RETRIES; i++) + for (int ii = 0; ii < GRID_CNT; ii++) + assertEquals(null, grid(ii).cache(null).get(Integer.toString(i))); + + assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY)); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testRemoveInTxSimple() throws Exception { + startGrids(GRID_CNT); + + try { + GridCache<String, Integer> cache = grid(0).cache(null); + + cache.put(RMVD_CNTR_KEY, 0); + + for (int i = 0; i < GRID_CNT * RETRIES; i++) + cache.put(Integer.toString(i), i); + + for (int i = 0; i < RETRIES; i++) + for (int j = 0; j < GRID_CNT; j++) + assertEquals(i, grid(j).cache(null).get(Integer.toString(i))); + + GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, " _val >= 0"); + + Collection<Map.Entry<String, Integer>> entries = qry.execute().get(); + + assertFalse(entries.isEmpty()); + + cntrRmvd.set(0); + + grid(0).compute().call(new RemoveInTxJobSimple()); + + // Check using cache. + for (int i = 0; i < GRID_CNT * RETRIES; i++) + for (int ii = 0; ii < GRID_CNT; ii++) + assertEquals(null, grid(ii).cache(null).get(Integer.toString(i))); + + // Check using query. + entries = qry.execute().get(); + + assertTrue(entries.isEmpty()); + + assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY)); + } + finally { + stopAllGrids(); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testRemoveInTxQueriedMultiThreaded() throws Exception { + //resetLog4j(Level.INFO, true, GridCacheTxManager.class.getPackage().getName()); + + backups = 1; + + try { + startGrids(GRID_CNT); + + GridCache<String, Integer> cache = grid(0).cache(null); + + // Store counter. + cache.put(RMVD_CNTR_KEY, 0); + + // Store values. + for (int i = 1; i <= GRID_CNT * RETRIES; i++) + cache.put(String.valueOf(i), i); + + for (int j = 0; j < GRID_CNT; j++) + assertEquals(0, grid(j).cache(null).get(RMVD_CNTR_KEY)); + + for (int i = 1; i <= RETRIES; i++) + for (int j = 0; j < GRID_CNT; j++) + assertEquals(i, grid(j).cache(null).get(String.valueOf(i))); + + GridCacheQuery<Map.Entry<String, Integer>> qry = cache.queries().createSqlQuery(Integer.class, "_val >= 0"); + + // Load all results. + qry.keepAll(true); + qry.includeBackups(false); + + // NOTE: for replicated cache includeBackups(false) is not enough since + // all nodes are considered primary, so we have to deduplicate result set. + if (cache.configuration().getCacheMode() == REPLICATED) + qry.enableDedup(true); + + List<Map.Entry<String, Integer>> entries = + new ArrayList<>(qry.execute().get()); + + Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { + @Override public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { + return o1.getValue().compareTo(o2.getValue()); + } + }); + + info("Queried entries: " + entries); + + int val = 0; + + for (Map.Entry<String, Integer> e : entries) { + assertEquals(val, e.getValue().intValue()); + + val++; + } + + assertFalse(entries.isEmpty()); + + cntrRmvd.set(0); + + Collection<Thread> threads = new LinkedList<>(); + + for (int i = 0; i < GRID_CNT; i++) { + final int gridId = i; + + threads.add(new Thread() { + @Override public void run() { + try { + removeRetriesQueried(grid(gridId), true); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + for (Thread th : threads) + th.start(); + + for (Thread th : threads) + th.join(); + + for (int i = 0; i < GRID_CNT * RETRIES; i++) + for (int ii = 0; ii < GRID_CNT; ii++) + assertEquals("Got invalid value from cache [gridIdx=" + ii + ", key=" + i + ']', + null, grid(ii).cache(null).get(Integer.toString(i))); + + assertEquals(-GRID_CNT * RETRIES, grid(0).cache(null).peek(RMVD_CNTR_KEY)); + } + finally { + stopAllGrids(); + } + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void printCounter() throws IgniteCheckedException { + info("***"); + info("*** Peeked counter: " + grid(0).cache(null).peek(CNTR_KEY)); + info("*** Got counter: " + grid(0).cache(null).get(CNTR_KEY)); + info("***"); + } + + /** + * Test job putting data to queue. + */ + protected class PutTwoEntriesInTxJob implements IgniteCallable<Integer> { + /** */ + @GridToStringExclude + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Integer call() throws IgniteCheckedException { + assertNotNull(ignite); + + ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + retries(ignite, true); + + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PutTwoEntriesInTxJob.class, this); + } + } + + /** + * Test job putting data to cache. + */ + protected class PutOneEntryInTxJob implements IgniteCallable<Integer> { + /** */ + @GridToStringExclude + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Integer call() throws IgniteCheckedException { + assertNotNull(ignite); + + ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + retries(ignite, false); + + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PutOneEntryInTxJob.class, this); + } + } + + /** + * Test job removing data from cache using query. + */ + protected class RemoveInTxJobQueried implements IgniteCallable<Integer> { + /** */ + @GridToStringExclude + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Integer call() throws IgniteCheckedException { + assertNotNull(ignite); + + ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + removeRetriesQueried(ignite, true); + + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RemoveInTxJobQueried.class, this); + } + } + + /** + * Test job removing data from cache. + */ + protected class RemoveInTxJobSimple implements IgniteCallable<Integer> { + /** */ + @GridToStringExclude + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public Integer call() throws IgniteCheckedException { + assertNotNull(ignite); + + ignite.log().info("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + removeRetriesSimple(ignite, true); + + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RemoveInTxJobSimple.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java new file mode 100644 index 0000000..efb7a26 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiThreadedAbstractTest.java @@ -0,0 +1,275 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + +/** + * Tests for local transactions. + */ +@SuppressWarnings( {"BusyWait"}) +public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstractTest { + /** + * @return Thread count. + */ + protected abstract int threadCount(); + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkCommitMultithreaded(final GridCacheTxConcurrency concurrency, + final GridCacheTxIsolation isolation) throws Exception { + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Thread t = Thread.currentThread(); + + t.setName(t.getName() + "-id-" + t.getId()); + + info("Starting commit thread: " + Thread.currentThread().getName()); + + try { + checkCommit(concurrency, isolation); + } + finally { + info("Finished commit thread: " + Thread.currentThread().getName()); + } + + return null; + } + }, threadCount(), concurrency + "-" + isolation); + } + + /** + * @param concurrency Concurrency. + * @param isolation Isolation. + * @throws Exception If check failed. + */ + protected void checkRollbackMultithreaded(final GridCacheTxConcurrency concurrency, + final GridCacheTxIsolation isolation) throws Exception { + final ConcurrentMap<Integer, String> map = new ConcurrentHashMap<>(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Nullable @Override public Object call() throws Exception { + Thread t = Thread.currentThread(); + + t.setName(t.getName() + "-id-" + t.getId()); + + info("Starting rollback thread: " + Thread.currentThread().getName()); + + try { + checkRollback(map, concurrency, isolation); + + return null; + } + finally { + info("Finished rollback thread: " + Thread.currentThread().getName()); + } + } + }, threadCount(), concurrency + "-" + isolation); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableCommitMultithreaded() throws Exception { + checkCommitMultithreaded(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableCommitMultithreaded() throws Exception { + checkCommitMultithreaded(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableRollbackMultithreaded() throws Exception { + checkRollbackMultithreaded(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws Exception If failed. + */ + // TODO: GG-8063, enabled when fixed. + public void _testOptimisticSerializableConsistency() throws Exception { + final GridCache<Integer, Long> cache = grid(0).cache(null); + + final int THREADS = 2; + + final int ITERATIONS = 100; + + final int key = 0; + + cache.put(key, 0L); + + List<IgniteFuture<Collection<Long>>> futs = new ArrayList<>(THREADS); + + for (int i = 0; i < THREADS; i++) { + futs.add(GridTestUtils.runAsync(new Callable<Collection<Long>>() { + @Override public Collection<Long> call() throws Exception { + Collection<Long> res = new ArrayList<>(); + + for (int i = 0; i < ITERATIONS; i++) { + while (true) { + try (IgniteTx tx = cache.txStart(OPTIMISTIC, SERIALIZABLE)) { + long val = cache.get(key); + + cache.put(key, val + 1); + + tx.commit(); + + assertTrue(res.add(val + 1)); + + break; + } + catch(GridCacheTxOptimisticException e) { + log.info("Got error, will retry: " + e); + } + } + } + + return res; + } + })); + } + + List<Collection<Long>> cols = new ArrayList<>(THREADS); + + for (IgniteFuture<Collection<Long>> fut : futs) { + Collection<Long> col = fut.get(); + + assertEquals(ITERATIONS, col.size()); + + cols.add(col); + } + + Set<Long> duplicates = new HashSet<>(); + + for (Collection<Long> col1 : cols) { + for (Long val1 : col1) { + for (Collection<Long> col2 : cols) { + if (col1 == col2) + continue; + + for (Long val2 : col2) { + if (val1.equals(val2)) { + duplicates.add(val2); + + break; + } + } + } + } + } + + assertTrue("Found duplicated values: " + duplicates, duplicates.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java new file mode 100644 index 0000000..8decc56 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxReentryAbstractSelfTest.java @@ -0,0 +1,169 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.kernal.managers.communication.*; +import org.gridgain.grid.kernal.processors.cache.distributed.*; +import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.apache.ignite.spi.communication.tcp.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.direct.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.concurrent.atomic.*; + +import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; +import static org.gridgain.grid.cache.GridCacheDistributionMode.*; +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + +/** + * Tests reentry in pessimistic repeatable read tx. + */ +public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** @return Cache mode. */ + protected abstract GridCacheMode cacheMode(); + + /** @return Near enabled. */ + protected abstract boolean nearEnabled(); + + /** @return Grid count. */ + protected abstract int gridCount(); + + /** @return Test key. */ + protected abstract int testKey(); + + /** @return Expected number of near lock requests. */ + protected abstract int expectedNearLockRequests(); + + /** @return Expected number of near lock requests. */ + protected abstract int expectedDhtLockRequests(); + + /** @return Expected number of near lock requests. */ + protected abstract int expectedDistributedLockRequests(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setCommunicationSpi(new CountingCommunicationSpi()); + cfg.setDiscoverySpi(discoSpi); + + GridCacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(cacheMode()); + cacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setAtomicityMode(TRANSACTIONAL); + cacheCfg.setDistributionMode(nearEnabled() ? NEAR_PARTITIONED : PARTITIONED_ONLY); + + cfg.setCacheConfiguration(cacheCfg); + + return cfg; + } + + /** @throws Exception If failed. */ + public void testLockReentry() throws Exception { + startGrids(gridCount()); + + try { + GridCache<Object, Object> cache = grid(0).cache(null); + + // Find test key. + int key = testKey(); + + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + // One near lock request. + cache.get(key); + + // No more requests. + cache.remove(key); + + tx.commit(); + } + + CountingCommunicationSpi commSpi = (CountingCommunicationSpi)grid(0).configuration().getCommunicationSpi(); + + assertEquals(expectedNearLockRequests(), commSpi.nearLocks()); + assertEquals(expectedDhtLockRequests(), commSpi.dhtLocks()); + assertEquals(expectedDistributedLockRequests(), commSpi.distributedLocks()); + } + finally { + stopAllGrids(); + } + } + + /** Counting communication SPI. */ + protected static class CountingCommunicationSpi extends TcpCommunicationSpi { + /** Distributed lock requests. */ + private AtomicInteger distLocks = new AtomicInteger(); + + /** Near lock requests. */ + private AtomicInteger nearLocks = new AtomicInteger(); + + /** Dht locks. */ + private AtomicInteger dhtLocks = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) + throws IgniteSpiException { + countMsg((GridIoMessage)msg); + + super.sendMessage(node, msg); + } + + /** + * Unmarshals the message and increments counters. + * + * @param msg Message to check. + */ + private void countMsg(GridIoMessage msg) { + Object origMsg = msg.message(); + + if (origMsg instanceof GridDistributedLockRequest) { + distLocks.incrementAndGet(); + + if (origMsg instanceof GridNearLockRequest) + nearLocks.incrementAndGet(); + else if (origMsg instanceof GridDhtLockRequest) + dhtLocks.incrementAndGet(); + } + } + + /** @return Number of recorded distributed locks. */ + public int distributedLocks() { + return distLocks.get(); + } + + /** @return Number of recorded distributed locks. */ + public int nearLocks() { + return nearLocks.get(); + } + + /** @return Number of recorded distributed locks. */ + public int dhtLocks() { + return dhtLocks.get(); + } + } +}