# ignite-157 wait for 'preparing' transactions in 
'processCheckPreparedTxRequest'


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ceab5f3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ceab5f3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ceab5f3d

Branch: refs/heads/ignite-286
Commit: ceab5f3d30e0d4d65631aed72dbbfe1bd56082bf
Parents: 10c2483
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Apr 24 12:00:42 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Apr 24 14:02:36 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMvccManager.java  |  2 +-
 ...ridCacheOptimisticCheckPreparedTxFuture.java | 59 +++++++++++-
 .../cache/distributed/dht/GridDhtTxLocal.java   |  8 +-
 .../cache/distributed/near/GridNearTxLocal.java |  6 ++
 .../cache/transactions/IgniteInternalTx.java    |  5 ++
 .../cache/transactions/IgniteTxAdapter.java     | 10 +++
 .../cache/transactions/IgniteTxHandler.java     | 48 +++++++++-
 .../cache/transactions/IgniteTxManager.java     | 95 +++++++++++++++++---
 ...xOriginatingNodeFailureAbstractSelfTest.java |  2 +-
 .../testsuites/IgniteCacheRestartTestSuite.java |  4 +-
 10 files changed, 215 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index a569e56..0bb97a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -295,7 +295,7 @@ public class GridCacheMvccManager extends 
GridCacheSharedManagerAdapter {
      * Cancels all client futures.
      */
     public void cancelClientFutures() {
-        IgniteException e = new IgniteException("Operation has been cancelled 
(grid is stopping).");
+        IgniteCheckedException e = new IgniteCheckedException("Operation has 
been cancelled (grid is stopping).");
 
         for (Collection<GridCacheFuture<?>> futures : futs.values()) {
             for (GridCacheFuture<?> future : futures)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 6d3007b..8a14b48 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -26,6 +26,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.*;
 import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
 import org.jetbrains.annotations.*;
@@ -123,14 +124,64 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, 
V> extends GridCompound
         // First check transactions on local node.
         int locTxNum = nodeTransactions(cctx.localNodeId());
 
-        if (locTxNum > 1 && 
!cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum)) {
-            onDone(false);
+        if (locTxNum > 1) {
+            IgniteInternalFuture<Boolean> fut = 
cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum);
 
-            markInitialized();
+            if (fut == null || fut.isDone()) {
+                boolean prepared;
 
-            return;
+                try {
+                    prepared = fut == null ? true : fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Check prepared transaction future failed: " 
+ e, e);
+
+                    prepared = false;
+                }
+
+                if (!prepared) {
+                    onDone(false);
+
+                    markInitialized();
+
+                    return;
+                }
+            }
+            else {
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> 
fut) {
+                        boolean prepared;
+
+                        try {
+                            prepared = fut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Check prepared transaction future 
failed: " + e, e);
+
+                            prepared = false;
+                        }
+
+                        if (!prepared) {
+                            onDone(false);
+
+                            markInitialized();
+                        }
+                        else
+                            proceedPrepare();
+                    }
+                });
+
+                return;
+            }
         }
 
+        proceedPrepare();
+    }
+
+    /**
+     * Process prepare after local check.
+     */
+    private void proceedPrepare() {
         for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
             UUID nodeId = entry.getKey();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 34637a7..07ced0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -603,7 +603,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
             }
         }
         else {
-            // prepFut.complete();
+            prepFut.complete();
 
             prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                 @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> f) {
@@ -685,6 +685,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter 
implements GridCacheMa
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> 
currentPrepareFuture() {
+        return prepFut.get();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocal.class, this, 
"super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 1dc459e..c665354 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1232,6 +1232,12 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> 
currentPrepareFuture() {
+        return prepFut.get();
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this, "mappings", 
mappings.keySet(), "super", super.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 8dcfcb1..30367c5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -579,6 +579,11 @@ public interface IgniteInternalTx extends AutoCloseable, 
GridTimeoutObject {
     public IgniteInternalFuture<IgniteInternalTx> finishFuture();
 
     /**
+     * @return Future for transaction prepare if prepare is in progress.
+     */
+    @Nullable public IgniteInternalFuture<IgniteInternalTx> 
currentPrepareFuture();
+
+    /**
      * @param state Transaction state.
      * @return {@code True} if transition was valid, {@code false} otherwise.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0e38c43..1c02356 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1007,6 +1007,11 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         return fut;
     }
 
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> 
currentPrepareFuture() {
+        return null;
+    }
+
     /**
      *
      * @param state State to set.
@@ -2022,6 +2027,11 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> 
currentPrepareFuture() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean state(TransactionState state) {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 483f8df..b2abe49 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1175,12 +1175,56 @@ public class IgniteTxHandler {
      * @param nodeId Node ID.
      * @param req Request.
      */
-    protected void processCheckPreparedTxRequest(UUID nodeId, 
GridCacheOptimisticCheckPreparedTxRequest req) {
+    protected void processCheckPreparedTxRequest(final UUID nodeId,
+        final GridCacheOptimisticCheckPreparedTxRequest req)
+    {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction requests 
[nodeId=" + nodeId + ", req=" + req + ']');
 
-        boolean prepared = 
ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+        IgniteInternalFuture<Boolean> fut = 
ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
 
+        if (fut == null || fut.isDone()) {
+            boolean prepared;
+
+            try {
+                prepared = fut == null ? true : fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Check prepared transaction future failed [req=" 
+ req + ']', e);
+
+                prepared = false;
+            }
+
+            sendCheckPreparedResponse(nodeId, req, prepared);
+        }
+        else {
+            fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                @Override public void apply(IgniteInternalFuture<Boolean> fut) 
{
+                    boolean prepared;
+
+                    try {
+                        prepared = fut.get();
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Check prepared transaction future failed 
[req=" + req + ']', e);
+
+                        prepared = false;
+                    }
+
+                    sendCheckPreparedResponse(nodeId, req, prepared);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param req Request.
+     * @param prepared {@code True} if all transaction prepared or committed.
+     */
+    private void sendCheckPreparedResponse(UUID nodeId,
+        GridCacheOptimisticCheckPreparedTxRequest req,
+        boolean prepared) {
         GridCacheOptimisticCheckPreparedTxResponse res =
             new GridCacheOptimisticCheckPreparedTxResponse(req.version(), 
req.futureId(), req.miniId(), prepared);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c05e559..d139afd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1763,18 +1763,64 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      *
      * @param nearVer Near version ID.
      * @param txNum Number of transactions.
-     * @return {@code True} if transactions were prepared or committed.
+     * @return Future for flag indicating if transactions were prepared or 
committed or {@code null} for success future.
      */
-    public boolean txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) 
{
-        Collection<GridCacheVersion> processedVers = null;
+    @Nullable public IgniteInternalFuture<Boolean> 
txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) {
+        return txsPreparedOrCommitted(nearVer, txNum, null, null);
+    }
 
-        for (IgniteInternalTx tx : txs()) {
+    /**
+     * @param nearVer Near version ID.
+     * @param txNum Number of transactions.
+     * @param fut Result future.
+     * @param processedVers Processed versions.
+     * @return Future for flag indicating if transactions were prepared or 
committed or {@code null} for success future.
+     */
+    @Nullable private IgniteInternalFuture<Boolean> 
txsPreparedOrCommitted(final GridCacheVersion nearVer,
+        int txNum,
+        @Nullable GridFutureAdapter<Boolean> fut,
+        @Nullable Collection<GridCacheVersion> processedVers)
+    {
+        for (final IgniteInternalTx tx : txs()) {
             if (nearVer.equals(tx.nearXidVersion())) {
                 TransactionState state = tx.state();
 
+                IgniteInternalFuture<IgniteInternalTx> prepFut = 
tx.currentPrepareFuture();
+
+                if (prepFut != null && !prepFut.isDone()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Transaction is preparing (will wait): " + 
tx);
+
+                    final GridFutureAdapter<Boolean> fut0 = fut != null ? fut 
: new GridFutureAdapter<Boolean>();
+
+                    final int txNum0 = txNum;
+
+                    final Collection<GridCacheVersion> processedVers0 = 
processedVers;
+
+                    prepFut.listen(new 
CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                        @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> prepFut) {
+                            if (log.isDebugEnabled())
+                                log.debug("Transaction prepare future 
finished: " + tx);
+
+                            IgniteInternalFuture<Boolean> fut = 
txsPreparedOrCommitted(nearVer,
+                                txNum0,
+                                fut0,
+                                processedVers0);
+
+                            assert fut == fut0;
+                        }
+                    });
+
+                    return fut0;
+                }
+
                 if (state == PREPARED || state == COMMITTING || state == 
COMMITTED) {
-                    if (--txNum == 0)
-                        return true;
+                    if (--txNum == 0) {
+                        if (fut != null)
+                            fut.onDone(true);
+
+                        return fut;
+                    }
                 }
                 else {
                     if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) {
@@ -1783,18 +1829,32 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                         if (log.isDebugEnabled())
                             log.debug("Transaction was not prepared (rolled 
back): " + tx);
 
-                        return false;
+                        if (fut == null)
+                            fut = new GridFutureAdapter<>();
+
+                        fut.onDone(false);
+
+                        return fut;
                     }
                     else {
                         if (tx.state() == COMMITTED) {
-                            if (--txNum == 0)
-                                return true;
+                            if (--txNum == 0) {
+                                if (fut != null)
+                                    fut.onDone(true);
+
+                                return fut;
+                            }
                         }
                         else {
                             if (log.isDebugEnabled())
                                 log.debug("Transaction is not prepared: " + 
tx);
 
-                            return false;
+                            if (fut == null)
+                                fut = new GridFutureAdapter<>();
+
+                            fut.onDone(false);
+
+                            return fut;
                         }
                     }
                 }
@@ -1821,13 +1881,22 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 CommittedVersion commitVer = (CommittedVersion)ver;
 
                 if (commitVer.nearVer.equals(nearVer)) {
-                    if (--txNum == 0)
-                        return true;
+                    if (--txNum == 0) {
+                        if (fut != null)
+                            fut.onDone(true);
+
+                        return fut;
+                    }
                 }
             }
         }
 
-        return false;
+        if (fut == null)
+            fut = new GridFutureAdapter<>();
+
+        fut.onDone(false);
+
+        return fut;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 3173622..ea8c60b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -141,7 +141,7 @@ public abstract class 
IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
         }
 
         info("Starting tx [values=" + map + ", topVer=" +
-            ((IgniteKernal)grid(1)).context().discovery().topologyVersion() + 
']');
+            (grid(1)).context().discovery().topologyVersion() + ']');
 
         if (partial)
             ignoreMessages(grid(1).localNode().id(), ignoreMessageClass());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
index 5682f67..0ced1c8 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java
@@ -46,8 +46,8 @@ public class IgniteCacheRestartTestSuite extends TestSuite {
         // 
suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // 
TODO IGNITE-747
 
         suite.addTestSuite(IgniteCacheAtomicPutAllFailoverSelfTest.class);
-        // suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); TODO 
IGNITE-157
-        // suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class); TODO 
IGNITE-157
+        suite.addTestSuite(IgniteCachePutAllRestartTest.class);
+        suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
 
         return suite;
     }

Reply via email to