# ignite-157 wait for 'preparing' transactions in 'processCheckPreparedTxRequest'
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ceab5f3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ceab5f3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ceab5f3d Branch: refs/heads/ignite-286 Commit: ceab5f3d30e0d4d65631aed72dbbfe1bd56082bf Parents: 10c2483 Author: sboikov <sboi...@gridgain.com> Authored: Fri Apr 24 12:00:42 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Apr 24 14:02:36 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMvccManager.java | 2 +- ...ridCacheOptimisticCheckPreparedTxFuture.java | 59 +++++++++++- .../cache/distributed/dht/GridDhtTxLocal.java | 8 +- .../cache/distributed/near/GridNearTxLocal.java | 6 ++ .../cache/transactions/IgniteInternalTx.java | 5 ++ .../cache/transactions/IgniteTxAdapter.java | 10 +++ .../cache/transactions/IgniteTxHandler.java | 48 +++++++++- .../cache/transactions/IgniteTxManager.java | 95 +++++++++++++++++--- ...xOriginatingNodeFailureAbstractSelfTest.java | 2 +- .../testsuites/IgniteCacheRestartTestSuite.java | 4 +- 10 files changed, 215 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index a569e56..0bb97a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -295,7 +295,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * Cancels all client futures. */ public void cancelClientFutures() { - IgniteException e = new IgniteException("Operation has been cancelled (grid is stopping)."); + IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping)."); for (Collection<GridCacheFuture<?>> futures : futs.values()) { for (GridCacheFuture<?> future : futures) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java index 6d3007b..8a14b48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; @@ -123,14 +124,64 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound // First check transactions on local node. int locTxNum = nodeTransactions(cctx.localNodeId()); - if (locTxNum > 1 && !cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum)) { - onDone(false); + if (locTxNum > 1) { + IgniteInternalFuture<Boolean> fut = cctx.tm().txsPreparedOrCommitted(tx.nearXidVersion(), locTxNum); - markInitialized(); + if (fut == null || fut.isDone()) { + boolean prepared; - return; + try { + prepared = fut == null ? true : fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed: " + e, e); + + prepared = false; + } + + if (!prepared) { + onDone(false); + + markInitialized(); + + return; + } + } + else { + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + boolean prepared; + + try { + prepared = fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed: " + e, e); + + prepared = false; + } + + if (!prepared) { + onDone(false); + + markInitialized(); + } + else + proceedPrepare(); + } + }); + + return; + } } + proceedPrepare(); + } + + /** + * Process prepare after local check. + */ + private void proceedPrepare() { for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) { UUID nodeId = entry.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 34637a7..07ced0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -603,7 +603,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else { - // prepFut.complete(); + prepFut.complete(); prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> f) { @@ -685,6 +685,12 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + return prepFut.get(); + } + + /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocal.class, this, "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 1dc459e..c665354 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1232,6 +1232,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + return prepFut.get(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearTxLocal.class, this, "mappings", mappings.keySet(), "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 8dcfcb1..30367c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -579,6 +579,11 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public IgniteInternalFuture<IgniteInternalTx> finishFuture(); /** + * @return Future for transaction prepare if prepare is in progress. + */ + @Nullable public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture(); + + /** * @param state Transaction state. * @return {@code True} if transition was valid, {@code false} otherwise. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0e38c43..1c02356 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1007,6 +1007,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter return fut; } + /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + return null; + } + /** * * @param state State to set. @@ -2022,6 +2027,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture<IgniteInternalTx> currentPrepareFuture() { + return null; + } + + /** {@inheritDoc} */ @Override public boolean state(TransactionState state) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 483f8df..b2abe49 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1175,12 +1175,56 @@ public class IgniteTxHandler { * @param nodeId Node ID. * @param req Request. */ - protected void processCheckPreparedTxRequest(UUID nodeId, GridCacheOptimisticCheckPreparedTxRequest req) { + protected void processCheckPreparedTxRequest(final UUID nodeId, + final GridCacheOptimisticCheckPreparedTxRequest req) + { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']'); - boolean prepared = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); + IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions()); + if (fut == null || fut.isDone()) { + boolean prepared; + + try { + prepared = fut == null ? true : fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed [req=" + req + ']', e); + + prepared = false; + } + + sendCheckPreparedResponse(nodeId, req, prepared); + } + else { + fut.listen(new CI1<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { + boolean prepared; + + try { + prepared = fut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Check prepared transaction future failed [req=" + req + ']', e); + + prepared = false; + } + + sendCheckPreparedResponse(nodeId, req, prepared); + } + }); + } + } + + /** + * @param nodeId Node ID. + * @param req Request. + * @param prepared {@code True} if all transaction prepared or committed. + */ + private void sendCheckPreparedResponse(UUID nodeId, + GridCacheOptimisticCheckPreparedTxRequest req, + boolean prepared) { GridCacheOptimisticCheckPreparedTxResponse res = new GridCacheOptimisticCheckPreparedTxResponse(req.version(), req.futureId(), req.miniId(), prepared); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c05e559..d139afd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1763,18 +1763,64 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * * @param nearVer Near version ID. * @param txNum Number of transactions. - * @return {@code True} if transactions were prepared or committed. + * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future. */ - public boolean txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) { - Collection<GridCacheVersion> processedVers = null; + @Nullable public IgniteInternalFuture<Boolean> txsPreparedOrCommitted(GridCacheVersion nearVer, int txNum) { + return txsPreparedOrCommitted(nearVer, txNum, null, null); + } - for (IgniteInternalTx tx : txs()) { + /** + * @param nearVer Near version ID. + * @param txNum Number of transactions. + * @param fut Result future. + * @param processedVers Processed versions. + * @return Future for flag indicating if transactions were prepared or committed or {@code null} for success future. + */ + @Nullable private IgniteInternalFuture<Boolean> txsPreparedOrCommitted(final GridCacheVersion nearVer, + int txNum, + @Nullable GridFutureAdapter<Boolean> fut, + @Nullable Collection<GridCacheVersion> processedVers) + { + for (final IgniteInternalTx tx : txs()) { if (nearVer.equals(tx.nearXidVersion())) { TransactionState state = tx.state(); + IgniteInternalFuture<IgniteInternalTx> prepFut = tx.currentPrepareFuture(); + + if (prepFut != null && !prepFut.isDone()) { + if (log.isDebugEnabled()) + log.debug("Transaction is preparing (will wait): " + tx); + + final GridFutureAdapter<Boolean> fut0 = fut != null ? fut : new GridFutureAdapter<Boolean>(); + + final int txNum0 = txNum; + + final Collection<GridCacheVersion> processedVers0 = processedVers; + + prepFut.listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> prepFut) { + if (log.isDebugEnabled()) + log.debug("Transaction prepare future finished: " + tx); + + IgniteInternalFuture<Boolean> fut = txsPreparedOrCommitted(nearVer, + txNum0, + fut0, + processedVers0); + + assert fut == fut0; + } + }); + + return fut0; + } + if (state == PREPARED || state == COMMITTING || state == COMMITTED) { - if (--txNum == 0) - return true; + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); + + return fut; + } } else { if (tx.state(MARKED_ROLLBACK) || tx.state() == UNKNOWN) { @@ -1783,18 +1829,32 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Transaction was not prepared (rolled back): " + tx); - return false; + if (fut == null) + fut = new GridFutureAdapter<>(); + + fut.onDone(false); + + return fut; } else { if (tx.state() == COMMITTED) { - if (--txNum == 0) - return true; + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); + + return fut; + } } else { if (log.isDebugEnabled()) log.debug("Transaction is not prepared: " + tx); - return false; + if (fut == null) + fut = new GridFutureAdapter<>(); + + fut.onDone(false); + + return fut; } } } @@ -1821,13 +1881,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { CommittedVersion commitVer = (CommittedVersion)ver; if (commitVer.nearVer.equals(nearVer)) { - if (--txNum == 0) - return true; + if (--txNum == 0) { + if (fut != null) + fut.onDone(true); + + return fut; + } } } } - return false; + if (fut == null) + fut = new GridFutureAdapter<>(); + + fut.onDone(false); + + return fut; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index 3173622..ea8c60b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -141,7 +141,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri } info("Starting tx [values=" + map + ", topVer=" + - ((IgniteKernal)grid(1)).context().discovery().topologyVersion() + ']'); + (grid(1)).context().discovery().topologyVersion() + ']'); if (partial) ignoreMessages(grid(1).localNode().id(), ignoreMessageClass()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ceab5f3d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java index 5682f67..0ced1c8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite.java @@ -46,8 +46,8 @@ public class IgniteCacheRestartTestSuite extends TestSuite { // suite.addTestSuite(IgniteCacheAtomicReplicatedNodeRestartSelfTest.class); // TODO IGNITE-747 suite.addTestSuite(IgniteCacheAtomicPutAllFailoverSelfTest.class); - // suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); TODO IGNITE-157 - // suite.addTestSuite(IgniteCacheAtomicNodeRestartTest.class); TODO IGNITE-157 + suite.addTestSuite(IgniteCachePutAllRestartTest.class); + suite.addTestSuite(GridCachePutAllFailoverSelfTest.class); return suite; }