http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java new file mode 100644 index 0000000..017bc16 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxSingleThreadedAbstractTest.java @@ -0,0 +1,129 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; + +import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; +import static org.apache.ignite.transactions.GridCacheTxIsolation.*; + +/** + * Tests for local transactions. + */ +@SuppressWarnings( {"BusyWait"}) +public abstract class IgniteTxSingleThreadedAbstractTest extends IgniteTxAbstractTest { + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedCommit() throws Exception { + checkCommit(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadCommit() throws Exception { + checkCommit(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. 
+ */ + public void testPessimisticSerializableCommit() throws Exception { + checkCommit(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedCommit() throws Exception { + checkCommit(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadCommit() throws Exception { + checkCommit(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableCommit() throws Exception { + checkCommit(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticReadCommittedRollback() throws Exception { + checkRollback(PESSIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticRepeatableReadRollback() throws Exception { + checkRollback(PESSIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testPessimisticSerializableRollback() throws Exception { + checkRollback(PESSIMISTIC, SERIALIZABLE); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticReadCommittedRollback() throws Exception { + checkRollback(OPTIMISTIC, READ_COMMITTED); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticRepeatableReadRollback() throws Exception { + checkRollback(OPTIMISTIC, REPEATABLE_READ); + + finalChecks(); + } + + /** + * @throws IgniteCheckedException If test failed. + */ + public void testOptimisticSerializableRollback() throws Exception { + checkRollback(OPTIMISTIC, SERIALIZABLE); + + finalChecks(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java new file mode 100644 index 0000000..f79aea0 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -0,0 +1,631 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; +import org.gridgain.grid.cache.*; +import org.gridgain.grid.cache.store.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.cache.distributed.near.*; +import org.gridgain.testframework.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; + +/** + * Tests that transaction is invalidated in case of {@link GridCacheTxHeuristicException}. + */ +public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAbstractSelfTest { + /** Store throwing exception. 
*/ + private static TestStore store = new TestStore(); + + /** */ + private static final int PRIMARY = 0; + + /** */ + private static final int BACKUP = 1; + + /** */ + private static final int NOT_PRIMARY_AND_BACKUP = 2; + + /** */ + private static Integer lastKey; + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.getTransactionsConfiguration().setTxSerializableEnabled(true); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration ccfg = super.cacheConfiguration(gridName); + + ccfg.setStore(store); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + lastKey = 0; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + store.forceFail(false); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testPutNear() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimary() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPut(false, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testPutBackup() throws Exception { + checkPut(true, keyForNode(grid(0).localNode(), BACKUP)); + + checkPut(false, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutAll() throws Exception { + checkPutAll(true, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutAll(true, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutAll(false, keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + + /** + * @throws Exception If failed. + */ + public void testRemoveNear() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testRemovePrimary() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkRemove(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveBackup() throws Exception { + checkRemove(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkRemove(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformNear() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testTransformPrimary() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), PRIMARY)); + + checkTransform(true, keyForNode(grid(0).localNode(), PRIMARY)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTransformBackup() throws Exception { + checkTransform(false, keyForNode(grid(0).localNode(), BACKUP)); + + checkTransform(true, keyForNode(grid(0).localNode(), BACKUP)); + } + + /** + * @throws Exception If failed. + */ + public void testPutNearTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), NOT_PRIMARY_AND_BACKUP)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutPrimaryTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), PRIMARY)); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testPutBackupTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + + checkPutTx(false, concurrency, isolation, keyForNode(grid(0).localNode(), BACKUP)); + } + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testPutMultipleKeysTx() throws Exception { + for (GridCacheTxConcurrency concurrency : GridCacheTxConcurrency.values()) { + for (GridCacheTxIsolation isolation : GridCacheTxIsolation.values()) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY), + keyForNode(grid(0).localNode(), PRIMARY)); + + if (gridCount() > 1) { + checkPutTx(true, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + + checkPutTx(false, concurrency, isolation, + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY), + keyForNode(grid(1).localNode(), PRIMARY)); + } + } + } + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @param concurrency Transaction concurrency. + * @param isolation Transaction isolation. + * @throws Exception If failed. + */ + private void checkPutTx(boolean putBefore, GridCacheTxConcurrency concurrency, + GridCacheTxIsolation isolation, final Integer... keys) throws Exception { + assertTrue(keys.length > 0); + + info("Test transaction [concurrency=" + concurrency + ", isolation=" + isolation + ']'); + + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (putBefore) { + store.forceFail(false); + + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 1); + } + + info("Commit."); + + tx.commit(); + } + } + + // Execute get from all nodes to create readers for near cache. 
+ for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + store.forceFail(true); + + try { + info("Start transaction."); + + try (IgniteTx tx = cache.txStart(concurrency, isolation)) { + for (Integer key : keys) { + info("Put " + key); + + cache.put(key, 2); + } + + info("Commit."); + + tx.commit(); + } + + fail("Transaction should fail."); + } + catch (IgniteCheckedException e) { + log.info("Expected exception: " + e); + } + + for (Integer key : keys) + checkValue(key, putBefore); + } + + /** + * @param key Key. + * @throws Exception If failed. + */ + private void checkValue(final Integer key, boolean putBefore) throws Exception { + store.forceFail(false); + + info("Check key: " + key); + + for (int i = 0; i < gridCount(); i++) { + GridKernal grid = (GridKernal) grid(i); + + GridCacheAdapter cache = grid.internalCache(null); + + GridCacheMapEntry entry = cache.map().getEntry(key); + + log.info("Entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, + entry.hasValue()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, + entry.rawGetOrUnmarshal(false)); + } + + if (cache.isNear()) { + entry = ((GridNearCacheAdapter)cache).dht().map().getEntry(key); + + log.info("Dht entry: " + entry); + + if (entry != null) { + assertFalse("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', entry.lockedByAny()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore, + entry.hasValue()); + assertEquals("Unexpected entry for grid [idx=" + i + ", entry=" + entry + ']', putBefore ? 1 : null, + entry.rawGetOrUnmarshal(false)); + } + } + } + + for (int i = 0; i < gridCount(); i++) + assertEquals("Unexpected value for grid " + i, putBefore ? 
1 : null, grid(i).cache(null).get(key)); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkPut(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to put: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).put(key, 2); + + return null; + } + }, GridCacheTxRollbackException.class, null); + + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkTransform(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to transform: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).transform(key, new IgniteClosure<Object, Object>() { + @Override public Object apply(Object o) { + return 2; + } + }); + + return null; + } + }, GridCacheTxRollbackException.class, null); + + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param keys Keys. + * @throws Exception If failed. + */ + private void checkPutAll(boolean putBefore, Integer ... 
keys) throws Exception { + assert keys.length > 1; + + if (putBefore) { + store.forceFail(false); + + Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 1); + + info("Put data: " + m); + + grid(0).cache(null).putAll(m); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) { + for (Integer key : keys) + grid(i).cache(null).get(key); + } + + store.forceFail(true); + + final Map<Integer, Integer> m = new HashMap<>(); + + for (Integer key : keys) + m.put(key, 2); + + info("Going to putAll: " + m); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).putAll(m); + + return null; + } + }, GridCacheTxRollbackException.class, null); + + for (Integer key : m.keySet()) + checkValue(key, putBefore); + } + + /** + * @param putBefore If {@code true} then puts some value before executing failing operation. + * @param key Key. + * @throws Exception If failed. + */ + private void checkRemove(boolean putBefore, final Integer key) throws Exception { + if (putBefore) { + store.forceFail(false); + + info("Put key: " + key); + + grid(0).cache(null).put(key, 1); + } + + // Execute get from all nodes to create readers for near cache. + for (int i = 0; i < gridCount(); i++) + grid(i).cache(null).get(key); + + store.forceFail(true); + + info("Going to remove: " + key); + + GridTestUtils.assertThrows(log, new Callable<Void>() { + @Override public Void call() throws Exception { + grid(0).cache(null).remove(key); + + return null; + } + }, GridCacheTxRollbackException.class, null); + + checkValue(key, putBefore); + } + + /** + * Generates key of a given type for given node. + * + * @param node Node. + * @param type Key type. + * @return Key. 
+ */ + private Integer keyForNode(ClusterNode node, int type) { + GridCache<Integer, Integer> cache = grid(0).cache(null); + + if (cache.configuration().getCacheMode() == LOCAL) + return ++lastKey; + + if (cache.configuration().getCacheMode() == REPLICATED && type == NOT_PRIMARY_AND_BACKUP) + return ++lastKey; + + for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { + switch (type) { + case NOT_PRIMARY_AND_BACKUP: { + if (!cache.affinity().isPrimaryOrBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case PRIMARY: { + if (cache.affinity().isPrimary(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + case BACKUP: { + if (cache.affinity().isBackup(node, key)) { + lastKey = key; + + return key; + } + + break; + } + + default: + fail(); + } + } + + throw new IllegalStateException("Failed to find key."); + } + + /** + * Test store that throws {@link IgniteCheckedException} from write operations when the fail flag is set. + */ + private static class TestStore implements GridCacheStore<Object, Object> { + /** Fail flag. */ + private volatile boolean fail; + + /** + * @param fail Fail flag. + */ + public void forceFail(boolean fail) { + this.fail = fail; + } + + + @Nullable @Override public Object load(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + return null; + } + + @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... 
args) + throws IgniteCheckedException { + if (fail) + throw new IgniteCheckedException("Store exception"); + } + + @Override public void loadAll(@Nullable IgniteTx tx, Collection<?> keys, IgniteBiInClosure<Object, Object> c) + throws IgniteCheckedException { + } + + @Override public void put(@Nullable IgniteTx tx, Object key, Object val) throws IgniteCheckedException { + if (fail) + throw new IgniteCheckedException("Store exception"); + } + + @Override public void putAll(@Nullable IgniteTx tx, Map<?, ?> map) throws IgniteCheckedException { + if (fail) + throw new IgniteCheckedException("Store exception"); + } + + @Override public void remove(@Nullable IgniteTx tx, Object key) throws IgniteCheckedException { + if (fail) + throw new IgniteCheckedException("Store exception"); + } + + @Override public void removeAll(@Nullable IgniteTx tx, Collection<?> keys) throws IgniteCheckedException { + if (fail) + throw new IgniteCheckedException("Store exception"); + } + + @Override public void txEnd(IgniteTx tx, boolean commit) throws IgniteCheckedException { + if (fail && commit) + throw new IgniteCheckedException("Store exception"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java index 1291faf..8026483 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAtomicLongApiSelfTest.java @@ -464,7 +464,7 @@ public class 
GridCacheAtomicLongApiSelfTest extends GridCommonAbstractTest { assertEquals(0, cache.primarySize()); - try (GridCacheTx tx = cache.txStart()) { + try (IgniteTx tx = cache.txStart()) { long newVal = RND.nextLong(); long curAtomicVal = atomic.get(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java index 9d165cc..db7115b 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheSequenceApiSelfAbstractTest.java @@ -220,7 +220,7 @@ public abstract class GridCacheSequenceApiSelfAbstractTest extends GridCommonAbs * @throws Exception If failed. 
*/ public void testGetAndAddInTx() throws Exception { - try (GridCacheTx tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = grid().cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { for (int i = 1; i < MAX_LOOPS_NUM; i++) { for (GridCacheAtomicSequence seq : seqArr) getAndAdd(seq, i); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java index ead25ef..5ed6edd 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedNodeRestartTxSelfTest.java @@ -147,7 +147,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac assert PARTITIONED == grid(i).cache(null).configuration().getCacheMode(); - try (GridCacheTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { Integer val = (Integer) grid(i).cache(null).get(key); assertEquals("Simple check failed for node: " + i, (Integer) i, val); @@ -172,7 +172,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac assert PARTITIONED == grid(i).cache(null).configuration().getCacheMode(); - try (GridCacheTx tx = grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = 
grid(i).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { GridCacheInternalKey key = new GridCacheInternalKeyImpl(name); GridCacheAtomicLongValue atomicVal = ((GridCacheAtomicLongValue) grid(i).cache(null).get(key)); @@ -230,7 +230,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac // Init cache data. - try (GridCacheTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { // Put simple value. grid(0).cache(null).put(key, INIT_GRID_NUM); @@ -253,7 +253,7 @@ public class GridCachePartitionedNodeRestartTxSelfTest extends GridCommonAbstrac // Init cache data. - try (GridCacheTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = grid(0).cache(null).txStart(PESSIMISTIC, REPEATABLE_READ)) { // Put custom data grid(0).cache(null).put(new GridCacheInternalKeyImpl(key), new GridCacheAtomicLongValue(INIT_GRID_NUM)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java index 2cb26c6..84340a9 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java @@ -151,7 +151,7 @@ public class 
GridCachePartitionedQueueCreateMultiNodeSelfTest extends GridCommon info("Partition: " + cache.affinity().partition(1)); - try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { // info("Getting value for key 1"); String s = cache.get(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java index 3467e53..24a33bb 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractDistributedByteArrayValuesSelfTest.java @@ -328,7 +328,7 @@ public abstract class GridCacheAbstractDistributedByteArrayValuesSelfTest extend private void testTransactionMixed0(GridCache<Integer, Object>[] caches, GridCacheTxConcurrency concurrency, Integer key1, byte[] val1, @Nullable Integer key2, @Nullable Object val2) throws Exception { for (GridCache<Integer, Object> cache : caches) { - GridCacheTx tx = cache.txStart(concurrency, REPEATABLE_READ); + IgniteTx tx = cache.txStart(concurrency, REPEATABLE_READ); try { cache.put(key1, val1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java index 45e20ae..5773714 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractJobExecutionTest.java @@ -125,7 +125,7 @@ public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstra @Override public Void applyx(final Integer i) throws IgniteCheckedException { GridCache<String, int[]> cache = this.ignite.cache(null); - try (GridCacheTx tx = cache.txStart(concur, isolation)) { + try (IgniteTx tx = cache.txStart(concur, isolation)) { int[] arr = cache.get("TestKey"); if (arr == null) @@ -159,7 +159,7 @@ public abstract class GridCacheAbstractJobExecutionTest extends GridCommonAbstra // Do within transaction to make sure that lock is acquired // which means that all previous transactions have committed. 
- try (GridCacheTx tx = c.txStart(concur, isolation)) { + try (IgniteTx tx = c.txStart(concur, isolation)) { int[] arr = c.get("TestKey"); assertNotNull(arr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java index 7cd73cc..372639f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java @@ -623,7 +623,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs int c = 0; try { - try (GridCacheTx tx = cache.txStart(txConcurrency(), REPEATABLE_READ)) { + try (IgniteTx tx = cache.txStart(txConcurrency(), REPEATABLE_READ)) { c = txCntr.incrementAndGet(); if (c % logFreq == 0) @@ -772,7 +772,7 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs int c = 0; - try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { c = txCntr.incrementAndGet(); if (c % logFreq == 0) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java index 1c12f51..1c8bf17 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAbstractPrimarySyncSelfTest.java @@ -84,7 +84,7 @@ public abstract class GridCacheAbstractPrimarySyncSelfTest extends GridCommonAbs GridCache<Integer, Integer> cache = grid(j).cache(null); if (cache.entry(i).primary()) { - try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { cache.put(i, i); tx.commit(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java index 155ceee..a05eacc 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheBasicOpAbstractTest.java @@ -252,7 +252,7 @@ public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTes ignite2.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); ignite3.events().localListen(lsnr, EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); - GridCacheTx tx = cache1.txStart(OPTIMISTIC, READ_COMMITTED, 0, 0); + IgniteTx tx = cache1.txStart(OPTIMISTIC, READ_COMMITTED, 0, 0); try { cache1.put("tx1", "val1"); @@ -317,7 +317,7 @@ 
public abstract class GridCacheBasicOpAbstractTest extends GridCommonAbstractTes GridCache<String, String> cache2 = ignite2.cache(null); GridCache<String, String> cache3 = ignite3.cache(null); - GridCacheTx tx = cache1.txStart(); + IgniteTx tx = cache1.txStart(); cache1.put("key", "val"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java index b833381..a89ced2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEntrySetAbstractSelfTest.java @@ -82,7 +82,7 @@ public abstract class GridCacheEntrySetAbstractSelfTest extends GridCacheAbstrac * @throws Exception If failed. 
*/ private void putAndCheckEntrySet(GridCache<Object, Object> cache) throws Exception { - try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { + try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { Integer total = (Integer) cache.get(TX_KEY); if (total == null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java index 9cc8f32..6801ffc 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheEventAbstractTest.java @@ -253,7 +253,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.put(key, val) == null; @@ -285,7 +285,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.put(key, val) == null; @@ -351,7 +351,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.putAsync(key, val).get() == null; @@ -383,7 +383,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - 
GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.putAsync(key, val).get() == null; @@ -443,7 +443,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.putx(key, val); @@ -473,7 +473,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert cache.putx(key, val); @@ -533,7 +533,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe @Override public void run(GridCache<String, Integer> cache) throws IgniteCheckedException { Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); Map.Entry<String, Integer> e = iter.next(); @@ -606,7 +606,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe Iterator<Map.Entry<String, Integer>> iter = pairs(2).entrySet().iterator(); // Optimistic transaction. - GridCacheTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); + IgniteTx tx = cache.txStart(OPTIMISTIC, REPEATABLE_READ); Map.Entry<String, Integer> e = iter.next(); @@ -683,7 +683,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe Integer val = e.getValue(); // Optimistic. 
- GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert !cache.putx(key, val, hasPeekVal); assert cache.putx(key, val, noPeekVal); @@ -718,7 +718,7 @@ public abstract class GridCacheEventAbstractTest extends GridCacheAbstractSelfTe String key = e.getKey(); Integer val = e.getValue(); - GridCacheTx tx = cache.txStart(); + IgniteTx tx = cache.txStart(); assert !cache.putx(key, val, hasPeekVal); assert cache.putx(key, val, noPeekVal); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java index 1febbc4..c128bb3 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheMultithreadedFailoverAbstractTest.java @@ -293,7 +293,7 @@ public class GridCacheMultithreadedFailoverAbstractTest extends GridCommonAbstra } } try { - GridCacheTx tx = atomicityMode() == TRANSACTIONAL ? cache.txStart() : null; + IgniteTx tx = atomicityMode() == TRANSACTIONAL ? 
cache.txStart() : null; try { cache.putAll(putMap); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java index 7d6af24..4238c15 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheNodeFailureAbstractTest.java @@ -157,7 +157,7 @@ public abstract class GridCacheNodeFailureAbstractTest extends GridCommonAbstrac GridCache<Integer, String> cache = cache(idx); - GridCacheTx tx = cache.txStart(concurrency, isolation); + IgniteTx tx = cache.txStart(concurrency, isolation); try { cache.put(KEY, VALUE); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java index bd6cf00..82c0bae 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCachePartitionedReloadAllAbstractSelfTest.java @@ 
-144,17 +144,17 @@ public abstract class GridCachePartitionedReloadAllAbstractSelfTest extends Grid c.apply(e.getKey(), e.getValue()); } - @Override public String load(GridCacheTx tx, Integer key) { + @Override public String load(IgniteTx tx, Integer key) { X.println("Loading on: " + caches.indexOf(g.<Integer, String>cache(null)) + " key=" + key); return map.get(key); } - @Override public void put(GridCacheTx tx, Integer key, @Nullable String val) { + @Override public void put(IgniteTx tx, Integer key, @Nullable String val) { fail("Should not be called within the test."); } - @Override public void remove(GridCacheTx tx, Integer key) { + @Override public void remove(IgniteTx tx, Integer key) { fail("Should not be called within the test."); } }; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java deleted file mode 100644 index b7bea07..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxConsistencyRestartAbstractSelfTest.java +++ /dev/null @@ -1,194 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import 
org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.gridgain.testframework.junits.common.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -import static org.gridgain.grid.cache.GridCacheAtomicityMode.*; -import static org.gridgain.grid.cache.GridCachePreloadMode.*; -import static org.apache.ignite.transactions.GridCacheTxConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.GridCacheTxIsolation.REPEATABLE_READ; -import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; - -/** - * - */ -public abstract class GridCacheTxConsistencyRestartAbstractSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Grid count. */ - private static final int GRID_CNT = 4; - - /** Key range. */ - private static final int RANGE = 100_000; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - cfg.setCacheConfiguration(cacheConfiguration(gridName)); - - return cfg; - } - - /** - * @param gridName Grid name. - * @return Cache configuration. 
- */ - public GridCacheConfiguration cacheConfiguration(String gridName) { - GridCacheConfiguration ccfg = new GridCacheConfiguration(); - - ccfg.setAtomicityMode(TRANSACTIONAL); - ccfg.setCacheMode(cacheMode()); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setDistributionMode(partitionDistributionMode()); - ccfg.setPreloadMode(SYNC); - - if (cacheMode() == GridCacheMode.PARTITIONED) - ccfg.setBackups(1); - - return ccfg; - } - - /** - * @return Cache mode. - */ - protected abstract GridCacheMode cacheMode(); - - /** - * @return Partition distribution mode for PARTITIONED cache. - */ - protected abstract GridCacheDistributionMode partitionDistributionMode(); - - /** - * @throws Exception If failed. - */ - public void testTxConsistency() throws Exception { - startGridsMultiThreaded(GRID_CNT); - - IgniteDataLoader<Object, Object> ldr = grid(0).dataLoader(null); - - for (int i = 0; i < RANGE; i++) { - ldr.addData(i, 0); - - if (i > 0 && i % 1000 == 0) - info("Put keys: " + i); - } - - ldr.close(); - - final AtomicBoolean done = new AtomicBoolean(false); - - Thread restartThread = new Thread() { - @Override public void run() { - Random rnd = new Random(); - - while (!done.get()) { - try { - int idx = rnd.nextInt(GRID_CNT); - - stopGrid(idx); - - startGrid(idx); - } - catch (Exception e) { - e.printStackTrace(); - } - } - } - }; - - restartThread.start(); - - Random rnd = new Random(); - - // Make some iterations with 1-3 keys transactions. 
- for (int i = 0; i < 50_000; i++) { - int idx = i % GRID_CNT; - - if (i > 0 && i % 1000 == 0) - info("Running iteration: " + i); - - try { - GridKernal grid = (GridKernal)grid(idx); - - GridCache<Integer, Integer> cache = grid.cache(null); - - List<Integer> keys = new ArrayList<>(); - - int keyCnt = rnd.nextInt(3); - - for (int k = 0; k < keyCnt; k++) - keys.add(rnd.nextInt(RANGE)); - - Collections.sort(keys); - - try (GridCacheTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { - Map<Integer, Integer> map = cache.getAll(keys); - - for (Map.Entry<Integer, Integer> entry : map.entrySet()) { - assertNotNull("Null value received from cache [key=" + entry.getKey() + "]", entry.getValue()); - - cache.put(entry.getKey(), entry.getValue() + 1); - } - - tx.commit(); - } - } - catch (Exception e) { - info("Failed to update keys: " + e.getMessage()); - } - } - - done.set(true); - - restartThread.join(); - - for (int k = 0; k < RANGE; k++) { - Integer val = null; - - for (int i = 0; i < GRID_CNT; i++) { - GridEx grid = grid(i); - - GridCache<Integer, Integer> cache = grid.cache(null); - - if (cache.affinity().isPrimaryOrBackup(grid.localNode(), k)) { - if (val == null) { - val = cache.peek(k); - - assertNotNull("Failed to peek value for key: " + k, val); - } - else - assertEquals("Failed to find value in cache [primary=" + - cache.affinity().isPrimary(grid.localNode(), k) + ']', - val, cache.peek(k)); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java 
b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java deleted file mode 100644 index 4b15e83..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxOriginatingNodeFailureAbstractSelfTest.java +++ /dev/null @@ -1,294 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.managers.communication.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.gridgain.grid.util.direct.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.testframework.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; - -/** - * Abstract test for originating node failure. - */ -public abstract class GridCacheTxOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest { - /** */ - protected static final int GRID_CNT = 5; - - /** Ignore node ID. */ - private volatile UUID ignoreMsgNodeId; - - /** Ignore message class. */ - private Class<?> ignoreMsgCls; - - /** - * @throws Exception If failed. 
- */ - public void testManyKeysCommit() throws Exception { - Collection<Integer> keys = new ArrayList<>(200); - - for (int i = 0; i < 200; i++) - keys.add(i); - - testTxOriginatingNodeFails(keys, false); - } - - /** - * @throws Exception If failed. - */ - public void testManyKeysRollback() throws Exception { - Collection<Integer> keys = new ArrayList<>(200); - - for (int i = 0; i < 200; i++) - keys.add(i); - - testTxOriginatingNodeFails(keys, true); - } - - /** - * @return Index of node starting transaction. - */ - protected int originatingNode() { - return 0; - } - - /** - * Ignores messages to given node of given type. - * - * @param dstNodeId Destination node ID. - * @param msgCls Message type. - */ - protected void ignoreMessages(UUID dstNodeId, Class<?> msgCls) { - ignoreMsgNodeId = dstNodeId; - ignoreMsgCls = msgCls; - } - - /** - * Gets ignore message class to simulate partial prepare message. - * - * @return Ignore message class. - */ - protected abstract Class<?> ignoreMessageClass(); - - /** - * @param keys Keys to update. - * @param partial Flag indicating whether to simulate partial prepared state. - * @throws Exception If failed. 
- */ - protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean partial) throws Exception { - assertFalse(keys.isEmpty()); - - final Collection<GridKernal> grids = new ArrayList<>(); - - ClusterNode txNode = grid(originatingNode()).localNode(); - - for (int i = 1; i < gridCount(); i++) - grids.add((GridKernal)grid(i)); - - final Map<Integer, String> map = new HashMap<>(); - - final String initVal = "initialValue"; - - for (Integer key : keys) { - grid(originatingNode()).cache(null).put(key, initVal); - - map.put(key, String.valueOf(key)); - } - - Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); - - GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache(); - - info("Node being checked: " + grid(1).localNode().id()); - - for (Integer key : keys) { - Collection<ClusterNode> nodes = new ArrayList<>(); - - nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); - - nodes.remove(txNode); - - nodeMap.put(key, nodes); - } - - info("Starting tx [values=" + map + ", topVer=" + - ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); - - if (partial) - ignoreMessages(grid(1).localNode().id(), ignoreMessageClass()); - - final Ignite txIgniteNode = G.ignite(txNode.id()); - - GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - GridCache<Integer, String> cache = txIgniteNode.cache(null); - - assertNotNull(cache); - - GridCacheTxProxyImpl tx = (GridCacheTxProxyImpl)cache.txStart(); - - GridCacheTxEx txEx = GridTestUtils.getFieldValue(tx, "tx"); - - cache.putAll(map); - - try { - txEx.prepareAsync().get(3, TimeUnit.SECONDS); - } - catch (IgniteFutureTimeoutException ignored) { - info("Failed to wait for prepare future completion: " + partial); - } - - return null; - } - }).get(); - - info("Stopping originating node " + txNode); - - G.stop(G.ignite(txNode.id()).name(), true); - - info("Stopped grid, waiting for transactions to complete."); - - 
boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (GridKernal g : grids) { - GridCacheSharedContext<Object, Object> ctx = g.context().cache().context(); - - int txNum = ctx.tm().idMapSize(); - - if (txNum != 0) - return false; - } - - return true; - } - }, 10000); - - assertTrue(txFinished); - - info("Transactions finished."); - - for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { - final Integer key = e.getKey(); - - final String val = map.get(key); - - assertFalse(e.getValue().isEmpty()); - - for (ClusterNode node : e.getValue()) { - compute(G.ignite(node.id()).cluster().forNode(node)).call(new Callable<Void>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @Override public Void call() throws Exception { - GridCache<Integer, String> cache = ignite.cache(null); - - assertNotNull(cache); - - assertEquals(partial ? initVal : val, cache.peek(key)); - - return null; - } - }); - } - } - - for (Map.Entry<Integer, String> e : map.entrySet()) { - for (Ignite g : G.allGrids()) { - UUID locNodeId = g.cluster().localNode().id(); - - assertEquals("Check failed for node: " + locNodeId, partial ? 
initVal : e.getValue(), - g.cache(null).get(e.getKey())); - } - } - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCommunicationSpi(new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) - throws IgniteSpiException { - if (!F.eq(ignoreMsgNodeId, node.id()) || !ignoredMessage((GridIoMessage)msg)) - super.sendMessage(node, msg); - } - }); - - cfg.getTransactionsConfiguration().setDefaultTxConcurrency(OPTIMISTIC); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { - GridCacheConfiguration cfg = super.cacheConfiguration(gridName); - - cfg.setStore(null); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - /** {@inheritDoc} */ - @Override protected abstract GridCacheMode cacheMode(); - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGridsMultiThreaded(GRID_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - // No-op - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - ignoreMsgCls = null; - ignoreMsgNodeId = null; - } - - /** - * Checks if message should be ignored. - * - * @param msg Message. - * @return {@code True} if message should be ignored. 
- */ - private boolean ignoredMessage(GridIoMessage msg) { - return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3008d2ad/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java deleted file mode 100644 index 4d257f6..0000000 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ /dev/null @@ -1,488 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.kernal.processors.cache.distributed; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.transactions.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.kernal.managers.communication.*; -import org.gridgain.grid.kernal.processors.cache.*; -import org.gridgain.grid.kernal.processors.cache.distributed.near.*; -import org.apache.ignite.spi.communication.tcp.*; -import org.gridgain.grid.util.direct.*; -import 
org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.lang.*; -import org.gridgain.testframework.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.transactions.GridCacheTxConcurrency.*; - -/** - * Abstract test for originating node failure. - */ -public abstract class GridCacheTxPessimisticOriginatingNodeFailureAbstractSelfTest extends GridCacheAbstractSelfTest { - /** */ - protected static final int GRID_CNT = 5; - - /** Ignore node ID. */ - private volatile Collection<UUID> ignoreMsgNodeIds; - - /** Ignore message class. */ - private Collection<Class<?>> ignoreMsgCls; - - /** Failing node ID. */ - private UUID failingNodeId; - - /** - * @throws Exception If failed. - */ - public void testManyKeysCommit() throws Exception { - Collection<Integer> keys = new ArrayList<>(200); - - for (int i = 0; i < 200; i++) - keys.add(i); - - testTxOriginatingNodeFails(keys, false); - } - - /** - * @throws Exception If failed. - */ - public void testManyKeysRollback() throws Exception { - Collection<Integer> keys = new ArrayList<>(200); - - for (int i = 0; i < 200; i++) - keys.add(i); - - testTxOriginatingNodeFails(keys, true); - } - - /** - * @throws Exception If failed. - */ - public void testPrimaryNodeFailureCommit() throws Exception { - checkPrimaryNodeCrash(true); - } - - /** - * @throws Exception If failed. - */ - public void testPrimaryNodeFailureRollback() throws Exception { - checkPrimaryNodeCrash(false); - } - - /** - * @return Index of node starting transaction. - */ - protected int originatingNode() { - return 0; - } - - /** - * Ignores messages to given node of given type. - * - * @param dstNodeIds Destination node IDs. - * @param msgCls Message type. - */ - protected void ignoreMessages(Collection<Class<?>> msgCls, Collection<UUID> dstNodeIds) { - ignoreMsgNodeIds = dstNodeIds; - ignoreMsgCls = msgCls; - } - - /** - * Gets ignore message class to simulate partial prepare message. 
- * - * @return Ignore message class. - */ - protected abstract Collection<Class<?>> ignoreMessageClasses(); - - /** - * @param keys Keys to update. - * @param fullFailure Flag indicating whether to simulate rollback state. - * @throws Exception If failed. - */ - protected void testTxOriginatingNodeFails(Collection<Integer> keys, final boolean fullFailure) throws Exception { - assertFalse(keys.isEmpty()); - - final Collection<GridKernal> grids = new ArrayList<>(); - - ClusterNode txNode = grid(originatingNode()).localNode(); - - for (int i = 1; i < gridCount(); i++) - grids.add((GridKernal)grid(i)); - - failingNodeId = grid(0).localNode().id(); - - final Map<Integer, String> map = new HashMap<>(); - - final String initVal = "initialValue"; - - for (Integer key : keys) { - grid(originatingNode()).cache(null).put(key, initVal); - - map.put(key, String.valueOf(key)); - } - - Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); - - GridCacheAdapter<Integer, String> cache = ((GridKernal)grid(1)).internalCache(); - - info("Node being checked: " + grid(1).localNode().id()); - - for (Integer key : keys) { - Collection<ClusterNode> nodes = new ArrayList<>(); - - nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); - - nodes.remove(txNode); - - nodeMap.put(key, nodes); - } - - info("Starting tx [values=" + map + ", topVer=" + - ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); - - if (fullFailure) - ignoreMessages(ignoreMessageClasses(), allNodeIds()); - else - ignoreMessages(ignoreMessageClasses(), F.asList(grid(1).localNode().id())); - - final GridEx originatingNodeGrid = grid(originatingNode()); - - GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - GridCache<Integer, String> cache = originatingNodeGrid.cache(null); - - assertNotNull(cache); - - GridCacheTx tx = cache.txStart(); - - try { - cache.putAll(map); - - info("Before commitAsync"); - - IgniteFuture<GridCacheTx> fut = 
tx.commitAsync(); - - info("Got future for commitAsync()."); - - fut.get(3, TimeUnit.SECONDS); - } - catch (IgniteFutureTimeoutException ignored) { - info("Failed to wait for commit future completion [fullFailure=" + fullFailure + ']'); - } - - return null; - } - }).get(); - - info(">>> Stopping originating node " + txNode); - - G.stop(grid(originatingNode()).name(), true); - - ignoreMessages(Collections.<Class<?>>emptyList(), Collections.<UUID>emptyList()); - - info(">>> Stopped originating node: " + txNode.id()); - - boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (GridKernal g : grids) { - GridCacheAdapter<?, ?> cache = g.internalCache(); - - GridCacheTxManager txMgr = cache.isNear() ? - ((GridNearCacheAdapter)cache).dht().context().tm() : - cache.context().tm(); - - int txNum = txMgr.idMapSize(); - - if (txNum != 0) - return false; - } - - return true; - } - }, 10000); - - assertTrue(txFinished); - - info("Transactions finished."); - - for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { - final Integer key = e.getKey(); - - final String val = map.get(key); - - assertFalse(e.getValue().isEmpty()); - - for (ClusterNode node : e.getValue()) { - final UUID checkNodeId = node.id(); - - compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @Override public Void call() throws Exception { - GridCache<Integer, String> cache = ignite.cache(null); - - assertNotNull(cache); - - assertEquals("Failed to check entry value on node: " + checkNodeId, - fullFailure ? initVal : val, cache.peek(key)); - - return null; - } - }); - } - } - - for (Map.Entry<Integer, String> e : map.entrySet()) { - for (Ignite g : G.allGrids()) - assertEquals(fullFailure ? initVal : e.getValue(), g.cache(null).get(e.getKey())); - } - } - - /** - * Checks tx data consistency in case when primary node crashes. 
- * - * @param commmit Whether to commit or rollback a transaction. - * @throws Exception If failed. - */ - private void checkPrimaryNodeCrash(final boolean commmit) throws Exception { - Collection<Integer> keys = new ArrayList<>(20); - - for (int i = 0; i < 20; i++) - keys.add(i); - - final Collection<GridKernal> grids = new ArrayList<>(); - - ClusterNode primaryNode = grid(1).localNode(); - - for (int i = 0; i < gridCount(); i++) { - if (i != 1) - grids.add((GridKernal)grid(i)); - } - - failingNodeId = primaryNode.id(); - - final Map<Integer, String> map = new HashMap<>(); - - final String initVal = "initialValue"; - - for (Integer key : keys) { - grid(originatingNode()).cache(null).put(key, initVal); - - map.put(key, String.valueOf(key)); - } - - Map<Integer, Collection<ClusterNode>> nodeMap = new HashMap<>(); - - GridCache<Integer, String> cache = grid(0).cache(null); - - info("Failing node ID: " + grid(1).localNode().id()); - - for (Integer key : keys) { - Collection<ClusterNode> nodes = new ArrayList<>(); - - nodes.addAll(cache.affinity().mapKeyToPrimaryAndBackups(key)); - - nodes.remove(primaryNode); - - nodeMap.put(key, nodes); - } - - info("Starting tx [values=" + map + ", topVer=" + - ((GridKernal)grid(1)).context().discovery().topologyVersion() + ']'); - - assertNotNull(cache); - - try (GridCacheTx tx = cache.txStart()) { - cache.getAll(keys); - - // Should not send any messages. - cache.putAll(map); - - // Fail the node in the middle of transaction. - info(">>> Stopping primary node " + primaryNode); - - G.stop(G.ignite(primaryNode.id()).name(), true); - - info(">>> Stopped originating node, finishing transaction: " + primaryNode.id()); - - if (commmit) - tx.commit(); - else - tx.rollback(); - } - - boolean txFinished = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - for (GridKernal g : grids) { - GridCacheAdapter<?, ?> cache = g.internalCache(); - - GridCacheTxManager txMgr = cache.isNear() ? 
- ((GridNearCacheAdapter)cache).dht().context().tm() : - cache.context().tm(); - - int txNum = txMgr.idMapSize(); - - if (txNum != 0) - return false; - } - - return true; - } - }, 10000); - - assertTrue(txFinished); - - info("Transactions finished."); - - for (Map.Entry<Integer, Collection<ClusterNode>> e : nodeMap.entrySet()) { - final Integer key = e.getKey(); - - final String val = map.get(key); - - assertFalse(e.getValue().isEmpty()); - - for (ClusterNode node : e.getValue()) { - final UUID checkNodeId = node.id(); - - compute(G.ignite(checkNodeId).cluster().forNode(node)).call(new Callable<Void>() { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - @Override public Void call() throws Exception { - GridCache<Integer, String> cache = ignite.cache(null); - - assertNotNull(cache); - - assertEquals("Failed to check entry value on node: " + checkNodeId, - !commmit ? initVal : val, cache.peek(key)); - - return null; - } - }); - } - } - - for (Map.Entry<Integer, String> e : map.entrySet()) { - for (Ignite g : G.allGrids()) - assertEquals(!commmit ? initVal : e.getValue(), g.cache(null).get(e.getKey())); - } - } - - /** - * @return All node IDs. 
- */ - private Collection<UUID> allNodeIds() { - Collection<UUID> nodeIds = new ArrayList<>(gridCount()); - - for (int i = 0; i < gridCount(); i++) - nodeIds.add(grid(i).localNode().id()); - - return nodeIds; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCommunicationSpi(new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, GridTcpCommunicationMessageAdapter msg) - throws IgniteSpiException { - if (getSpiContext().localNode().id().equals(failingNodeId)) { - if (ignoredMessage((GridIoMessage)msg) && ignoreMsgNodeIds != null) { - for (UUID ignored : ignoreMsgNodeIds) { - if (node.id().equals(ignored)) - return; - } - } - } - - super.sendMessage(node, msg); - } - }); - - cfg.getTransactionsConfiguration().setDefaultTxConcurrency(PESSIMISTIC); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { - GridCacheConfiguration cfg = super.cacheConfiguration(gridName); - - cfg.setStore(null); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected int gridCount() { - return GRID_CNT; - } - - /** {@inheritDoc} */ - @Override protected abstract GridCacheMode cacheMode(); - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - // No-op. - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGridsMultiThreaded(GRID_CNT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - // No-op - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - ignoreMsgCls = null; - ignoreMsgNodeIds = null; - } - - /** - * Checks if message should be ignored. - * - * @param msg Message. - * @return {@code True} if message should be ignored. 
*/
    private boolean ignoredMessage(GridIoMessage msg) {
        // Work on a local copy so the null check and the loop use the same reference.
        Collection<Class<?>> filteredTypes = ignoreMsgCls;

        // No filter configured: nothing is ignored.
        if (filteredTypes == null)
            return false;

        // The payload is only looked at while iterating, so an empty filter never touches it.
        for (Class<?> filteredType : filteredTypes) {
            boolean matches = filteredType.isAssignableFrom(msg.message().getClass());

            if (matches)
                return true;
        }

        return false;
    }
}