IGNITE-264 - Fixing tests WIP.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ed5edc14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ed5edc14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ed5edc14

Branch: refs/heads/ignite-264
Commit: ed5edc1424afb0d7177dbe733a28a804ce091fb9
Parents: f4b5d2c
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Aug 11 19:28:55 2015 -0700
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Aug 11 19:28:55 2015 -0700

----------------------------------------------------------------------
 .../near/GridNearTxFinishFuture.java            | 67 +++++++++++---------
 .../cache/transactions/IgniteTxHandler.java     | 47 ++++++++------
 .../dht/GridCacheTxNodeFailureSelfTest.java     | 47 +++++++++++---
 3 files changed, 103 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 568dc0b..94c5150 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -376,36 +376,45 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
 
                 add(mini);
 
-                GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
-                    cctx.localNodeId(),
-                    futureId(),
-                    mini.futureId(),
-                    tx.topologyVersion(),
-                    tx.xidVersion(),
-                    tx.commitVersion(),
-                    tx.threadId(),
-                    tx.isolation(),
-                    true,
-                    false,
-                    tx.system(),
-                    tx.ioPolicy(),
-                    false,
-                    true,
-                    true,
-                    0,
-                    null,
-                    0);
-
-                finishReq.checkCommitted(true);
-
-                try {
-                    cctx.io().send(backup, finishReq, tx.ioPolicy());
+                if (backup.isLocal()) {
+                    if 
(cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion()))
+                        mini.onDone(tx);
+                    else
+                        mini.onDone(new 
IgniteTxRollbackCheckedException("Failed to commit transaction " +
+                        "(transaction has been rolled back on backup node): " 
+ tx.xidVersion()));
                 }
-                catch (ClusterTopologyCheckedException e) {
-                    mini.onResult(e);
-                }
-                catch (IgniteCheckedException e) {
-                    mini.onResult(e);
+                else {
+                    GridDhtTxFinishRequest finishReq = new 
GridDhtTxFinishRequest(
+                        cctx.localNodeId(),
+                        futureId(),
+                        mini.futureId(),
+                        tx.topologyVersion(),
+                        tx.xidVersion(),
+                        tx.commitVersion(),
+                        tx.threadId(),
+                        tx.isolation(),
+                        true,
+                        false,
+                        tx.system(),
+                        tx.ioPolicy(),
+                        false,
+                        true,
+                        true,
+                        0,
+                        null,
+                        0);
+
+                    finishReq.checkCommitted(true);
+
+                    try {
+                        cctx.io().send(backup, finishReq, tx.ioPolicy());
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        mini.onResult(e);
+                    }
+                    catch (IgniteCheckedException e) {
+                        mini.onResult(e);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index c0c8293..9e927a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -834,6 +834,12 @@ public class IgniteTxHandler {
         if (log.isDebugEnabled())
             log.debug("Processing dht tx finish request [nodeId=" + nodeId + 
", req=" + req + ']');
 
+        if (req.checkCommitted()) {
+            sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version()));
+
+            return;
+        }
+
         GridDhtTxRemote dhtTx = ctx.tm().tx(req.version());
         GridNearTxRemote nearTx = ctx.tm().nearTx(req.version());
 
@@ -841,22 +847,7 @@ public class IgniteTxHandler {
         if (nearTx != null && nearTx.local())
             nearTx = null;
 
-        if (req.checkCommitted()) {
-            assert dhtTx == null || dhtTx.onePhaseCommit() : "Invalid 
transaction: " + dhtTx;
-
-            boolean committed = true;
-
-            if (dhtTx == null) {
-                if (ctx.tm().addRolledbackTx(req.version()))
-                    committed = false;
-            }
-
-            sendReply(nodeId, req, committed);
-
-            return;
-        }
-        else
-            finish(nodeId, dhtTx, req);
+        finish(nodeId, dhtTx, req);
 
         if (nearTx != null)
             finish(nodeId, nearTx, req);
@@ -884,15 +875,33 @@ public class IgniteTxHandler {
                 completeFut.listen(new 
CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                     @Override
                     public void apply(IgniteInternalFuture<IgniteInternalTx> 
igniteTxIgniteFuture) {
-                        sendReply(nodeId, req, req.commit());
+                        sendReply(nodeId, req, true);
                     }
                 });
             }
             else
-                sendReply(nodeId, req, req.commit());
+                sendReply(nodeId, req, true);
         }
         else
-            sendReply(nodeId, req, req.commit());
+            sendReply(nodeId, req, true);
+    }
+
+    /**
+     * Checks whether DHT remote transaction with given version has been 
committed. If not, will add version
+     * to rollback version set so that late response will not falsely commit 
this transaction.
+     *
+     * @param writeVer Write version to check.
+     * @return {@code True} if transaction has been committed, {@code false} 
otherwise.
+     */
+    public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) {
+        assert writeVer != null;
+
+        boolean committed = true;
+
+        if (ctx.tm().addRolledbackTx(writeVer))
+            committed = false;
+
+        return committed;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ed5edc14/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 43e2348..773ec25 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -67,35 +67,56 @@ public class GridCacheTxNodeFailureSelfTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testPrimaryNodeFailureBackipCommitPessimistic() throws 
Exception {
-        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC);
+    public void testPrimaryNodeFailureBackupCommitPessimistic() throws 
Exception {
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testPrimaryNodeFailureBackupCommitOptimistic() throws 
Exception {
-        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC);
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, false);
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void checkPrimaryNodeFailureBackupCommit(final 
TransactionConcurrency conc) throws Exception {
+    public void testPrimaryNodeFailureBackupCommitPessimisticOnBackup() throws 
Exception {
+        checkPrimaryNodeFailureBackupCommit(PESSIMISTIC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPrimaryNodeFailureBackupCommitOptimisticOnBackup() throws 
Exception {
+        checkPrimaryNodeFailureBackupCommit(OPTIMISTIC, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkPrimaryNodeFailureBackupCommit(final 
TransactionConcurrency conc, boolean backup) throws Exception {
         startGrids(gridCount());
         awaitPartitionMapExchange();
 
+        for (int i = 0; i < gridCount(); i++)
+            info("Grid " + i + ": " + ignite(i).cluster().localNode().id());
+
         try {
             final Ignite ignite = ignite(0);
 
             final IgniteCache<Object, Object> cache = ignite.cache(null);
 
-            final int key = generateKey(ignite);
+            final int key = generateKey(ignite, backup);
 
             final CountDownLatch commitLatch = new CountDownLatch(1);
 
-            
communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
-            
communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+            if (!backup) {
+                
communication(2).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+                
communication(3).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
+            }
+            else
+                
communication(0).bannedClasses(Collections.<Class>singletonList(GridDhtTxPrepareResponse.class));
 
             IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new 
Callable<Object>() {
                 @Override public Object call() throws Exception {
@@ -140,12 +161,18 @@ public class GridCacheTxNodeFailureSelfTest extends 
GridCommonAbstractTest {
      * @return Generated key that is not primary nor backup for {@code 
ignite(0)} and primary for
      *      {@code ignite(1)}.
      */
-    private int generateKey(Ignite ignite) {
+    private int generateKey(Ignite ignite, boolean backup) {
         Affinity<Object> aff = ignite.affinity(null);
 
         for (int key = 0;;key++) {
-            if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), key))
-                continue;
+            if (backup) {
+                if (!aff.isBackup(ignite(0).cluster().localNode(), key))
+                    continue;
+            }
+            else {
+                if (aff.isPrimaryOrBackup(ignite(0).cluster().localNode(), 
key))
+                    continue;
+            }
 
             if (aff.isPrimary(ignite(1).cluster().localNode(), key))
                 return key;

Reply via email to