IGNITE-264 - Fixing tests WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed5edc14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed5edc14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed5edc14 Branch: refs/heads/ignite-264 Commit: ed5edc1424afb0d7177dbe733a28a804ce091fb9 Parents: f4b5d2c Author: Alexey Goncharuk <agoncha...@gridgain.com> Authored: Tue Aug 11 19:28:55 2015 -0700 Committer: Alexey Goncharuk <agoncha...@gridgain.com> Committed: Tue Aug 11 19:28:55 2015 -0700 ---------------------------------------------------------------------- .../near/GridNearTxFinishFuture.java | 67 +++++++++++--------- .../cache/transactions/IgniteTxHandler.java | 47 ++++++++------ .../dht/GridCacheTxNodeFailureSelfTest.java | 47 +++++++++++--- 3 files changed, 103 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 568dc0b..94c5150 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -376,36 +376,45 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu add(mini); - GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest( - cctx.localNodeId(), - futureId(), - mini.futureId(), - tx.topologyVersion(), - tx.xidVersion(), - tx.commitVersion(), - tx.threadId(), - tx.isolation(), - true, - false, - tx.system(), - tx.ioPolicy(), - false, - true, - true, - 0, - null, - 0); - - finishReq.checkCommitted(true); - - try { - cctx.io().send(backup, finishReq, tx.ioPolicy()); + if (backup.isLocal()) { + if (cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion())) + mini.onDone(tx); + else + mini.onDone(new IgniteTxRollbackCheckedException("Failed to commit transaction " + + "(transaction has been rolled back on backup node): " + tx.xidVersion())); } - catch (ClusterTopologyCheckedException e) { - mini.onResult(e); - } - catch (IgniteCheckedException e) { - mini.onResult(e); + else { + GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest( + cctx.localNodeId(), + futureId(), + mini.futureId(), + tx.topologyVersion(), + tx.xidVersion(), + tx.commitVersion(), + tx.threadId(), + tx.isolation(), + true, + false, + tx.system(), + tx.ioPolicy(), + false, + true, + true, + 0, + null, + 0); + + finishReq.checkCommitted(true); + + try { + cctx.io().send(backup, finishReq, tx.ioPolicy()); + } + catch (ClusterTopologyCheckedException e) { + mini.onResult(e); + } + catch (IgniteCheckedException e) { + mini.onResult(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c0c8293..9e927a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -834,6 +834,12 @@ public class IgniteTxHandler { if (log.isDebugEnabled()) log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); + if (req.checkCommitted()) { + sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version())); + + return; + } + GridDhtTxRemote dhtTx = ctx.tm().tx(req.version()); GridNearTxRemote nearTx = ctx.tm().nearTx(req.version()); @@ -841,22 +847,7 @@ public class IgniteTxHandler { if (nearTx != null && nearTx.local()) nearTx = null; - if (req.checkCommitted()) { - assert dhtTx == null || dhtTx.onePhaseCommit() : "Invalid transaction: " + dhtTx; - - boolean committed = true; - - if (dhtTx == null) { - if (ctx.tm().addRolledbackTx(req.version())) - committed = false; - } - - sendReply(nodeId, req, committed); - - return; - } - else - finish(nodeId, dhtTx, req); + finish(nodeId, dhtTx, req); if (nearTx != null) finish(nodeId, nearTx, req); @@ -884,15 +875,33 @@ public class IgniteTxHandler { completeFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> igniteTxIgniteFuture) { - sendReply(nodeId, req, req.commit()); + sendReply(nodeId, req, true); } }); } else - sendReply(nodeId, req, req.commit()); + sendReply(nodeId, req, true); } else - sendReply(nodeId, req, req.commit()); + sendReply(nodeId, req, true); + } + + /** + * Checks whether DHT remote transaction with given version has been committed. If not, will add version + * to rollback version set so that late response will not falsely commit this transaction. + * + * @param writeVer Write version to check. + * @return {@code True} if transaction has been committed, {@code false} otherwise. + */ + public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) { + assert writeVer != null; + + boolean committed = true; + + if (ctx.tm().addRolledbackTx(writeVer)) + committed = false; + + return committed; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index 43e2348..773ec25 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -67,35 +67,56 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testPrimaryNodeFailureBackipCommitPessimistic() throws Exception { - checkPrimaryNodeFailureBackupCommit(PESSIMISTIC); + public void testPrimaryNodeFailureBackupCommitPessimistic() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false); } /** * @throws Exception If failed. */ public void testPrimaryNodeFailureBackupCommitOptimistic() throws Exception { - checkPrimaryNodeFailureBackupCommit(OPTIMISTIC); + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false); } /** * @throws Exception If failed. */ - private void checkPrimaryNodeFailureBackupCommit(final TransactionConcurrency conc) throws Exception { + public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true); + } + + /** + * @throws Exception If failed. + */ + public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws Exception { + checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true); + } + + /** + * @throws Exception If failed. + */ + private void checkPrimaryNodeFailureBackupCommit(final TransactionConcurrency conc, boolean backup) throws Exception { startGrids(gridCount()); awaitPartitionMapExchange(); + for (int i = 0; i < gridCount(); i++) + info("Grid " + i + ": " + ignite(i).cluster().localNode().id()); + try { final Ignite ignite = ignite(0); final IgniteCache<Object, Object> cache = ignite.cache(null); - final int key = generateKey(ignite); + final int key = generateKey(ignite, backup); final CountDownLatch commitLatch = new CountDownLatch(1); - communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); - communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + if (!backup) { + communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); + } + else + communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class)); IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { @@ -140,12 +161,18 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for * {@code ignite(1)}. */ - private int generateKey(Ignite ignite) { + private int generateKey(Ignite ignite, boolean backup) { Affinity<Object> aff = ignite.affinity(null); for (int key = 0;;key++) { - if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key)) - continue; + if (backup) { + if (!aff.isBackup(ignite(0).cluster().localNode(), key)) + continue; + } + else { + if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key)) + continue; + } if (aff.isPrimary(ignite(1).cluster().localNode(), key)) return key;