Repository: incubator-ignite Updated Branches: refs/heads/ignite-264 a733984d4 -> f9511aff9
IGNITE-264 - Fixing tests WIP. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9511aff Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9511aff Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9511aff Branch: refs/heads/ignite-264 Commit: f9511aff95fd6fecff3da3bc70143d3e74e4aaaf Parents: a733984 Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Thu Aug 13 18:28:00 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Thu Aug 13 18:28:00 2015 -0700 ---------------------------------------------------------------------- .../near/GridNearOptimisticTxPrepareFuture.java | 33 ++-- .../GridNearPessimisticTxPrepareFuture.java | 8 +- .../near/GridNearTxFinishFuture.java | 98 +++++------ .../cache/distributed/near/GridNearTxLocal.java | 43 +---- .../dht/GridCacheTxNodeFailureSelfTest.java | 165 ++++++++++++++++--- 5 files changed, 230 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 95e1847..28069b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -118,8 +118,13 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd */ void onError(@Nullable UUID nodeId, @Nullable Iterable<GridDistributedTxMapping> mappings, Throwable e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { - if (tx.onePhaseCommit()) + if (tx.onePhaseCommit()) { tx.markForBackupCheck(); + + onComplete(); + + return; + } } if (err.compareAndSet(null, e)) { @@ -189,17 +194,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd this.err.compareAndSet(null, err); - if (err == null) - tx.state(PREPARED); - - if (super.onDone(tx, err)) { - // Don't forget to clean up. - cctx.mvcc().removeFuture(this); - - return true; - } - - return false; + return onComplete(); } /** @@ -213,10 +208,20 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd /** * Completeness callback. */ - private void onComplete() { - if (super.onDone(tx, err.get())) + private boolean onComplete() { + Throwable err0 = err.get(); + + if (err0 == null || tx.needCheckBackup()) + tx.state(PREPARED); + + if (super.onDone(tx, err0)) { // Don't forget to clean up. cctx.mvcc().removeFuture(this); + + return true; + } + + return false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 6de46f4..6ac1033 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -242,7 +242,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA err = this.err.get(); - if (err == null) + if (err == null || tx.needCheckBackup()) tx.state(PREPARED); if (super.onDone(tx, err)) { @@ -320,9 +320,13 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA * @param e Error. */ void onNodeLeft(ClusterTopologyCheckedException e) { - if (tx.onePhaseCommit()) + if (tx.onePhaseCommit()) { tx.markForBackupCheck(); + // Do not fail future for one-phase transaction right away. + onDone(tx); + } + onError(e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 1e16982..95f5149 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.util.*; @@ -227,29 +228,46 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** {@inheritDoc} */ - @Override public boolean onDone(IgniteInternalTx tx, Throwable err) { + @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) { if ((initialized() || err != null)) { - if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) { + if (tx.needCheckBackup()) { + assert tx.onePhaseCommit(); + + if (err != null) + err = new TransactionRollbackException("Failed to commit transaction.", err); + + try { + tx.finish(err == null); + } + catch (IgniteCheckedException e) { + if (err != null) + err.addSuppressed(e); + else + err = e; + } + } + + if (tx.onePhaseCommit()) { finishOnePhase(); - this.tx.tmFinish(err == null); + tx.tmFinish(err == null); } Throwable th = this.err.get(); - if (super.onDone(tx, th != null ? th : err)) { + if (super.onDone(tx0, th != null ? th : err)) { if (error() instanceof IgniteTxHeuristicCheckedException) { - AffinityTopologyVersion topVer = this.tx.topologyVersion(); + AffinityTopologyVersion topVer = tx.topologyVersion(); - for (IgniteTxEntry e : this.tx.writeMap().values()) { + for (IgniteTxEntry e : tx.writeMap().values()) { GridCacheContext cacheCtx = e.context(); try { if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) { - GridCacheEntryEx Entry = cacheCtx.cache().peekEx(e.key()); + GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key()); - if (Entry != null) - Entry.invalidate(null, this.tx.xidVersion()); + if (entry != null) + entry.invalidate(null, tx.xidVersion()); } } catch (Throwable t) { @@ -297,13 +315,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * Initializes future. */ void finish() { - if (tx.onePhaseCommit()) { - if (commit) { - if (tx.needCheckBackup()) - checkBackup(); - else if (needFinishOnePhase()) { + if (tx.needCheckBackup()) { + assert tx.onePhaseCommit(); + + checkBackup(); + + // If checkBackup is set, it means that primary node has crashed and we will not need to send + // finish request to it, so we can mark future as initialized. + markInitialized(); + } + + try { + if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) { + if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null)) finish(mappings.values()); + markInitialized(); + + if (!isSync()) { boolean complete = true; for (IgniteInternalFuture<?> f : pending()) @@ -315,40 +344,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu onComplete(); } } - - markInitialized(); - - return; + else + onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx))); } + catch (Error | RuntimeException e) { + onError(e); - if (mappings != null) { - finish(mappings.values()); - - markInitialized(); - - if (!isSync()) { - boolean complete = true; - - for (IgniteInternalFuture<?> f : pending()) - // Mini-future in non-sync mode gets done when message gets sent. - if (isMini(f) && !f.isDone()) - complete = false; - - if (complete) - onComplete(); - } + throw e; } - else { - assert !commit; - - try { - tx.rollback(); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to rollback empty transaction: " + tx, e); - } - - markInitialized(); + catch (IgniteCheckedException e) { + onError(e); } } @@ -641,8 +646,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu void onResult(GridDhtTxFinishResponse res) { assert backup != null; - if (res.checkCommittedError() != null) + if (res.checkCommittedError() != null) { onDone(res.checkCommittedError()); + } else onDone(tx); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9511aff/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index c40ac5e..0421309 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -712,7 +712,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { cctx.mvcc().addFuture(fut); - IgniteInternalFuture<?> prepareFut = prepFut.get(); + final IgniteInternalFuture<?> prepareFut = prepFut.get(); prepareFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> f) { @@ -720,24 +720,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { try { // Make sure that here are no exceptions. - if (!needCheckBackup()) { - f.get(); - - if (finish(true)) - fut0.finish(); - else - fut0.onError(new IgniteCheckedException("Failed to commit transaction: " + - CU.txString(GridNearTxLocal.this))); - } - else { - assert onePhaseCommit(); + prepareFut.get(); - fut0.finish(); - } + fut0.finish(); } catch (Error | RuntimeException e) { commitErr.compareAndSet(null, e); + fut0.onError(e); + throw e; } catch (IgniteCheckedException e) { @@ -779,15 +770,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { log.debug("Got optimistic tx failure [tx=" + this + ", err=" + e + ']'); } - try { - if (finish(false) || state() == UNKNOWN) - fut.finish(); - else - fut.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " + CU.txString(this))); - } - catch (IgniteCheckedException e) { - fut.onError(e); - } + fut.finish(); } else { prepFut.listen(new CI1<IgniteInternalFuture<?>>() { @@ -803,19 +786,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridNearTxFinishFuture fut0 = rollbackFut.get(); - try { - if (finish(false) || state() == UNKNOWN) - fut0.finish(); - else - fut0.onError(new IgniteCheckedException("Failed to gracefully rollback transaction: " + - CU.txString(GridNearTxLocal.this))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to gracefully rollback transaction: " + - CU.txString(GridNearTxLocal.this), e); - - fut0.onError(e); - } + fut0.finish(); } }); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9511aff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index 773ec25..bca3b6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -23,6 +23,10 @@ import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -30,6 +34,7 @@ import org.apache.ignite.testframework.*; import org.apache.ignite.testframework.junits.common.*; import org.apache.ignite.transactions.*; +import javax.cache.*; import java.util.*; import java.util.concurrent.*; @@ -68,34 +73,94 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception { - checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false); + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, true); } /** * @throws Exception If failed. */ public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception { - checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false); + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, true); } /** * @throws Exception If failed. */ public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception { - checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true); + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, true); } /** * @throws Exception If failed. */ public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception { - checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true); + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, true); } /** * @throws Exception If failed. */ - private void checkPrimaryNodeFailureBackupCommit(final TransactionConcurrency conc, boolean backup) throws Exception { + public void testPrimaryNodeFailureBackupRollbackPessimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackOptimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackPessimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackOptimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitImplicit() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, false, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitImplicitOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackImplicit() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, false, false); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupRollbackImplicitOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(null, true, false); + } + + /** + * @throws Exception If failed. + */ + private void checkPrimaryNodeFailureBackupCommit( + final TransactionConcurrency conc, + boolean backup, + final boolean commit + ) throws Exception { startGrids(gridCount()); awaitPartitionMapExchange(); @@ -111,25 +176,79 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { final CountDownLatch commitLatch = new CountDownLatch(1); - if (!backup) { - communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); - communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + if (!commit) { + communication(1).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareRequest.class)); + } + else { + if (!backup) { + communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + } + else + communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); } - else - communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { - try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) { - cache.put(key, key); + if (conc != null) { + try (Transaction tx = ignite.transactions().txStart(conc, REPEATABLE_READ)) { + cache.put(key, key); + + Transaction asyncTx = (Transaction)tx.withAsync(); + + asyncTx.commit(); - Transaction asyncTx = (Transaction)tx.withAsync(); + commitLatch.countDown(); - asyncTx.commit(); + try { + IgniteFuture<Object> fut = asyncTx.future(); + + fut.get(); + + if (!commit) { + error("Transaction has been committed"); + + fail("Transaction has been committed: " + tx); + } + } + catch (TransactionRollbackException e) { + if (commit) { + error(e.getMessage(), e); + + fail("Failed to commit: " + e); + } + else + assertTrue(X.hasCause(e, TransactionRollbackException.class)); + } + } + } + else { + IgniteCache<Object, Object> cache0 = cache.withAsync(); + + cache0.put(key, key); + + Thread.sleep(1000); commitLatch.countDown(); - asyncTx.future().get(); + try { + cache0.future().get(); + + if (!commit) { + error("Transaction has been committed"); + + fail("Transaction has been committed."); + } + } + catch (CacheException e) { + if (commit) { + error(e.getMessage(), e); + + fail("Failed to commit: " + e); + } + else + assertTrue(X.hasCause(e, TransactionRollbackException.class)); + } } return null; @@ -140,8 +259,11 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { stopGrid(1); - // No exception should happen since transaction is committed on the backup node. + // Check that thread successfully finished. fut.get(); + + // Check there are no hanging transactions. + assertEquals(0, ((IgniteEx)ignite).context().cache().context().tm().idMapSize()); } finally { stopAllGrids(); @@ -194,9 +316,14 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { - if (!bannedClasses.contains(msg.getClass())) - super.sendMessage(node, msg); + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { + GridIoMessage ioMsg = (GridIoMessage)msg; + + if (!bannedClasses.contains(ioMsg.message().getClass())) { + super.sendMessage(node, msg, ackClosure); + + U.debug(">>> Sending message: " + msg); + } } } }