# ignite-157-2 rollback transactions created on lock step
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/80629476 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/80629476 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/80629476 Branch: refs/heads/ignite-757 Commit: 806294762888809c5f780e2ba0ebbfd6bba76aca Parents: 95ce93f Author: sboikov <sboi...@gridgain.com> Authored: Mon Apr 27 13:42:18 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Apr 27 13:42:18 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridDhtLockFuture.java | 18 ++-- .../distributed/dht/GridDhtTxFinishFuture.java | 102 +++++++++++++++++-- .../distributed/dht/GridDhtTxLocalAdapter.java | 23 +++++ 3 files changed, 125 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index dd893ab..5b0275c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -872,10 +872,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo boolean invalidateRdr = e.readerId(n.id()) != null; - req.addDhtKey( - e.key(), - invalidateRdr, - cctx); + req.addDhtKey(e.key(), invalidateRdr, cctx); if (needVal) { // Mark last added key as needed to be preloaded. @@ -893,12 +890,17 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo it.set(addOwned(req, e)); } - add(fut); // Append new future. + if (!F.isEmpty(req.keys())) { + if (tx != null) + tx.addLockTransactionNode(n); - if (log.isDebugEnabled()) - log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']'); + add(fut); // Append new future. - cctx.io().send(n, req, cctx.ioPolicy()); + if (log.isDebugEnabled()) + log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']'); + + cctx.io().send(n, req, cctx.ioPolicy()); + } } catch (IgniteCheckedException e) { // Fail the whole thing. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 1340ba2..7c35fc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -245,21 +245,92 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * Initializes future. */ + @SuppressWarnings("SimplifiableIfStatement") public void finish() { - if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) { - boolean sync = finish(dhtMap, nearMap); + boolean sync; - markInitialized(); + if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) + sync = finish(dhtMap, nearMap); + else if (!commit && !F.isEmpty(tx.lockTransactionNodes())) + sync = rollbackLockTransactions(tx.lockTransactionNodes()); + else + // No backup or near nodes to send commit message to (just complete then). + sync = false; - if (!sync) - onComplete(); - } - else { - markInitialized(); + markInitialized(); - // No backup or near nodes to send commit message to (just complete then). + if (!sync) onComplete(); + } + + /** + * @param nodes Nodes. + * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. + */ + private boolean rollbackLockTransactions(Set<ClusterNode> nodes) { + assert !commit; + assert !F.isEmpty(nodes); + + if (tx.onePhaseCommit()) + return false; + + boolean sync = commit ? tx.syncCommit() : tx.syncRollback(); + + if (tx.explicitLock()) + sync = true; + + boolean res = false; + + for (ClusterNode n : nodes) { + assert !n.isLocal(); + + MiniFuture fut = new MiniFuture(n); + + add(fut); // Append new future. + + GridDhtTxFinishRequest req = new GridDhtTxFinishRequest( + tx.nearNodeId(), + futId, + fut.futureId(), + tx.topologyVersion(), + tx.xidVersion(), + tx.commitVersion(), + tx.threadId(), + tx.isolation(), + commit, + tx.isInvalidate(), + tx.system(), + tx.ioPolicy(), + tx.isSystemInvalidate(), + sync, + sync, + tx.completedBase(), + tx.committedVersions(), + tx.rolledbackVersions(), + tx.pendingVersions(), + tx.size(), + tx.groupLockKey(), + tx.subjectId(), + tx.taskNameHash()); + + try { + cctx.io().send(n, req, tx.ioPolicy()); + + if (sync) + res = true; + else + fut.onDone(); + } + catch (IgniteCheckedException e) { + // Fail the whole thing. + if (e instanceof ClusterTopologyCheckedException) + fut.onResult((ClusterTopologyCheckedException)e); + else + fut.onResult(e); + } } + + return res; } /** @@ -420,6 +491,17 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur @GridToStringInclude private GridDistributedTxMapping nearMapping; + /** */ + @GridToStringInclude + private ClusterNode node; + + /** + * @param node Node. + */ + public MiniFuture(ClusterNode node) { + this.node = node; + } + /** * @param dhtMapping Mapping. * @param nearMapping nearMapping. @@ -442,7 +524,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @return Node ID. */ public ClusterNode node() { - return dhtMapping != null ? dhtMapping.node() : nearMapping.node(); + return node != null ? node : dhtMapping != null ? dhtMapping.node() : nearMapping.node(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index b32f0bb..08fcaf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -73,6 +73,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** Versions of pending locks for entries of this tx. */ private Collection<GridCacheVersion> pendingVers; + /** Nodes where transactions were started on lock step. */ + private Set<ClusterNode> lockTxNodes; + /** * Empty constructor required for {@link Externalizable}. */ @@ -124,6 +127,26 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * @param node Node. + */ + public void addLockTransactionNode(ClusterNode node) { + assert node != null; + assert !node.isLocal(); + + if (lockTxNodes == null) + lockTxNodes = new HashSet<>(); + + lockTxNodes.add(node); + } + + /** + * @return Nodes where transactions were started on lock step. + */ + @Nullable public Set<ClusterNode> lockTransactionNodes() { + return lockTxNodes; + } + + /** * @return Near node id. */ protected abstract UUID nearNodeId();