Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-831 838bccb3e -> 877f1d958


# ignite-157 wait for 'preparing' transactions in 'processCheckPreparedTxRequest'


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7b5f91a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7b5f91a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7b5f91a2

Branch: refs/heads/ignite-831
Commit: 7b5f91a2d7723a7c4f4c87bbdc8b2b5772253651
Parents: 62d8053
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Apr 24 15:08:04 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Apr 24 15:43:52 2015 +0300

----------------------------------------------------------------------
 .../cache/transactions/IgniteTxManager.java     | 22 +++--
 ...xOriginatingNodeFailureAbstractSelfTest.java | 91 +++++++++++++++++++-
 ...itionedTxOriginatingNodeFailureSelfTest.java | 86 ++++++++++++++++--
 3 files changed, 184 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d139afd..e4cf28b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -2054,7 +2054,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     log.debug("Processing node failed event [locNodeId=" + 
cctx.localNodeId() +
                         ", failedNodeId=" + evtNodeId + ']');
 
-                for (IgniteInternalTx tx : txs()) {
+                for (final IgniteInternalTx tx : txs()) {
                     if ((tx.near() && !tx.local()) || (tx.storeUsed() && 
tx.masterNodeIds().contains(evtNodeId))) {
                         // Invalidate transactions.
                         salvageTx(tx, false, RECOVERY_FINISH);
@@ -2062,12 +2062,24 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     else {
                         // Check prepare only if originating node ID failed. 
Otherwise parent node will finish this tx.
                         if (tx.originatingNodeId().equals(evtNodeId)) {
-                            if (tx.state() == PREPARED)
+                            if (tx.optimistic() && tx.state() == PREPARED)
                                 commitIfPrepared(tx);
                             else {
-                                if (tx.setRollbackOnly())
-                                    tx.rollbackAsync();
-                                // If we could not mark tx as rollback, it 
means that transaction is being committed.
+                                IgniteInternalFuture<IgniteInternalTx> prepFut 
= tx.currentPrepareFuture();
+
+                                if (prepFut != null) {
+                                    prepFut.listen(new 
CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                                        @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                                            if (tx.setRollbackOnly())
+                                                tx.rollbackAsync();
+                                        }
+                                    });
+                                }
+                                else {
+                                    // If we could not mark tx as rollback, it 
means that transaction is being committed.
+                                    if (tx.setRollbackOnly())
+                                        tx.rollbackAsync();
+                                }
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index ea8c60b..865f1f6 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -37,7 +37,9 @@ import org.apache.ignite.testframework.*;
 import java.util.*;
 import java.util.concurrent.*;
 
+import static org.apache.ignite.cache.CachePeekMode.*;
 import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
 
 /**
  * Abstract test for originating node failure.
@@ -67,6 +69,18 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
     /**
      * @throws Exception If failed.
      */
+    public void testPessimisticManyKeysCommit() throws Exception {
+        Collection<Integer> keys = new ArrayList<>(200);
+
+        for (int i = 0; i < 200; i++)
+            keys.add(i);
+
+        testPessimisticTxOriginatingNodeFails(keys);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testManyKeysRollback() throws Exception {
         Collection<Integer> keys = new ArrayList<>(200);
 
@@ -103,6 +117,75 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
 
     /**
      * @param keys Keys to update.
+     * @throws Exception If failed.
+     */
+    protected void testPessimisticTxOriginatingNodeFails(Collection<Integer> 
keys) throws Exception {
+        final Map<Integer, String> map = new HashMap<>();
+
+        final String initVal = "initialValue";
+
+        for (Integer key : keys) {
+            grid(originatingNode()).cache(null).put(key, initVal);
+
+            map.put(key, String.valueOf(key));
+        }
+
+        ClusterNode txNode = grid(originatingNode()).localNode();
+
+        final Ignite txIgniteNode = G.ignite(txNode.id());
+
+        info("Starting pessimistic tx " +
+            "[values=" + map + ", topVer=" + 
(grid(1)).context().discovery().topologyVersion() + ']');
+
+        GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteCache<Integer, String> cache = txIgniteNode.cache(null);
+
+                assertNotNull(cache);
+
+                TransactionProxyImpl tx =
+                    
(TransactionProxyImpl)txIgniteNode.transactions().txStart(PESSIMISTIC, 
REPEATABLE_READ);
+
+                IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
+
+                assertTrue(txEx.pessimistic());
+
+                cache.putAll(map);
+
+                return null;
+            }
+        }).get();
+
+        info("Stopping originating node " + txNode);
+
+        G.stop(G.ignite(txNode.id()).name(), true);
+
+        info("Stopped grid, waiting for transactions to complete.");
+
+        boolean txFinished = GridTestUtils.waitForCondition(new 
GridAbsPredicate() {
+            @Override public boolean apply() {
+                for (Ignite ignite : G.allGrids()) {
+                    IgniteKernal g = (IgniteKernal)ignite;
+
+                    GridCacheSharedContext<Object, Object> ctx = 
g.context().cache().context();
+
+                    int txNum = ctx.tm().idMapSize();
+
+                    if (txNum != 0)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 10_000);
+
+        assertTrue(txFinished);
+
+        info("Transactions finished.");
+    }
+
+    /**
+     * @param keys Keys to update.
      * @param partial Flag indicating whether to simulate partial prepared 
state.
      * @throws Exception If failed.
      */
@@ -140,8 +223,8 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
             nodeMap.put(key, nodes);
         }
 
-        info("Starting tx [values=" + map + ", topVer=" +
-            (grid(1)).context().discovery().topologyVersion() + ']');
+        info("Starting optimistic tx " +
+            "[values=" + map + ", topVer=" + 
(grid(1)).context().discovery().topologyVersion() + ']');
 
         if (partial)
             ignoreMessages(grid(1).localNode().id(), ignoreMessageClass());
@@ -158,6 +241,8 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
 
                 IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
 
+                assertTrue(txEx.optimistic());
+
                 cache.putAll(map);
 
                 try {
@@ -214,7 +299,7 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
 
                         assertNotNull(cache);
 
-                        assertEquals(partial ? initVal : val, 
cache.localPeek(key, CachePeekMode.ONHEAP));
+                        assertEquals(partial ? initVal : val, 
cache.localPeek(key, ONHEAP));
 
                         return null;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7b5f91a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
index d4e84a5..fc1c403 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
@@ -20,13 +20,13 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht;
 import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 
 import java.util.*;
 
+import static org.apache.ignite.cache.CacheMode.*;
+
 /**
  * Tests transaction consistency when originating node fails.
  */
@@ -37,7 +37,7 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
     /** {@inheritDoc} */
     @Override protected CacheMode cacheMode() {
-        return CacheMode.PARTITIONED;
+        return PARTITIONED;
     }
 
     /** {@inheritDoc} */
@@ -58,6 +58,21 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
      * @throws Exception If failed.
      */
     public void testTxFromPrimary() throws Exception {
+        txFromPrimary(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTxFromPrimary() throws Exception {
+        txFromPrimary(false);
+    }
+
+    /**
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void txFromPrimary(boolean optimistic) throws Exception {
         ClusterNode txNode = grid(originatingNode()).localNode();
 
         Integer key = null;
@@ -72,13 +87,31 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         assertNotNull(key);
 
-        testTxOriginatingNodeFails(Collections.singleton(key), false);
+        if (optimistic)
+            testTxOriginatingNodeFails(Collections.singleton(key), false);
+        else
+            testPessimisticTxOriginatingNodeFails(Collections.singleton(key));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTxFromBackup() throws Exception {
+        txFromBackup(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTxFromBackup() throws Exception {
+        txFromBackup(false);
+    }
+
+    /**
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void txFromBackup(boolean optimistic) throws Exception {
         ClusterNode txNode = grid(originatingNode()).localNode();
 
         Integer key = null;
@@ -93,13 +126,31 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         assertNotNull(key);
 
-        testTxOriginatingNodeFails(Collections.singleton(key), false);
+        if (optimistic)
+            testTxOriginatingNodeFails(Collections.singleton(key), false);
+        else
+            testPessimisticTxOriginatingNodeFails(Collections.singleton(key));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTxFromNotColocated() throws Exception {
+        txFromNotColocated(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTxFromNotColocated() throws Exception {
+        txFromNotColocated(false);
+    }
+
+    /**
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void txFromNotColocated(boolean optimistic) throws Exception {
         ClusterNode txNode = grid(originatingNode()).localNode();
 
         Integer key = null;
@@ -115,13 +166,31 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         assertNotNull(key);
 
-        testTxOriginatingNodeFails(Collections.singleton(key), false);
+        if (optimistic)
+            testTxOriginatingNodeFails(Collections.singleton(key), false);
+        else
+            testPessimisticTxOriginatingNodeFails(Collections.singleton(key));
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testTxAllNodes() throws Exception {
+        txAllNodes(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPessimisticTxAllNodes() throws Exception {
+        txAllNodes(false);
+    }
+
+    /**
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void txAllNodes(boolean optimistic) throws Exception {
         List<ClusterNode> allNodes = new ArrayList<>(GRID_CNT);
 
         for (int i = 0; i < GRID_CNT; i++)
@@ -145,6 +214,9 @@ public class 
GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         assertEquals(GRID_CNT, keys.size());
 
-        testTxOriginatingNodeFails(keys, false);
+        if (optimistic)
+            testTxOriginatingNodeFails(keys, false);
+        else
+            testPessimisticTxOriginatingNodeFails(keys);
     }
 }

Reply via email to