# ignite-157-2 rollback transactions created on lock step

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/80629476
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/80629476
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/80629476

Branch: refs/heads/ignite-424
Commit: 806294762888809c5f780e2ba0ebbfd6bba76aca
Parents: 95ce93f
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Apr 27 13:42:18 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Apr 27 13:42:18 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLockFuture.java      |  18 ++--
 .../distributed/dht/GridDhtTxFinishFuture.java  | 102 +++++++++++++++++--
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  23 +++++
 3 files changed, 125 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index dd893ab..5b0275c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -872,10 +872,7 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
 
                             boolean invalidateRdr = e.readerId(n.id()) != null;
 
-                            req.addDhtKey(
-                                e.key(),
-                                invalidateRdr,
-                                cctx);
+                            req.addDhtKey(e.key(), invalidateRdr, cctx);
 
                             if (needVal) {
                                 // Mark last added key as needed to be 
preloaded.
@@ -893,12 +890,17 @@ public final class GridDhtLockFuture<K, V> extends 
GridCompoundIdentityFuture<Bo
                             it.set(addOwned(req, e));
                         }
 
-                        add(fut); // Append new future.
+                        if (!F.isEmpty(req.keys())) {
+                            if (tx != null)
+                                tx.addLockTransactionNode(n);
 
-                        if (log.isDebugEnabled())
-                            log.debug("Sending DHT lock request to DHT node 
[node=" + n.id() + ", req=" + req + ']');
+                            add(fut); // Append new future.
 
-                        cctx.io().send(n, req, cctx.ioPolicy());
+                            if (log.isDebugEnabled())
+                                log.debug("Sending DHT lock request to DHT 
node [node=" + n.id() + ", req=" + req + ']');
+
+                            cctx.io().send(n, req, cctx.ioPolicy());
+                        }
                     }
                     catch (IgniteCheckedException e) {
                         // Fail the whole thing.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 1340ba2..7c35fc5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -245,21 +245,92 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
     /**
      * Initializes future.
      */
+    @SuppressWarnings("SimplifiableIfStatement")
     public void finish() {
-        if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap)) {
-            boolean sync = finish(dhtMap, nearMap);
+        boolean sync;
 
-            markInitialized();
+        if (!F.isEmpty(dhtMap) || !F.isEmpty(nearMap))
+            sync = finish(dhtMap, nearMap);
+        else if (!commit && !F.isEmpty(tx.lockTransactionNodes()))
+            sync = rollbackLockTransactions(tx.lockTransactionNodes());
+        else
+            // No backup or near nodes to send commit message to (just 
complete then).
+            sync = false;
 
-            if (!sync)
-                onComplete();
-        }
-        else {
-            markInitialized();
+        markInitialized();
 
-            // No backup or near nodes to send commit message to (just 
complete then).
+        if (!sync)
             onComplete();
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @return {@code True} in case there is at least one synchronous {@code 
MiniFuture} to wait for.
+     */
+    private boolean rollbackLockTransactions(Set<ClusterNode> nodes) {
+        assert !commit;
+        assert !F.isEmpty(nodes);
+
+        if (tx.onePhaseCommit())
+            return false;
+
+        boolean sync = commit ? tx.syncCommit() : tx.syncRollback();
+
+        if (tx.explicitLock())
+            sync = true;
+
+        boolean res = false;
+
+        for (ClusterNode n : nodes) {
+            assert !n.isLocal();
+
+            MiniFuture fut = new MiniFuture(n);
+
+            add(fut); // Append new future.
+
+            GridDhtTxFinishRequest req = new GridDhtTxFinishRequest(
+                tx.nearNodeId(),
+                futId,
+                fut.futureId(),
+                tx.topologyVersion(),
+                tx.xidVersion(),
+                tx.commitVersion(),
+                tx.threadId(),
+                tx.isolation(),
+                commit,
+                tx.isInvalidate(),
+                tx.system(),
+                tx.ioPolicy(),
+                tx.isSystemInvalidate(),
+                sync,
+                sync,
+                tx.completedBase(),
+                tx.committedVersions(),
+                tx.rolledbackVersions(),
+                tx.pendingVersions(),
+                tx.size(),
+                tx.groupLockKey(),
+                tx.subjectId(),
+                tx.taskNameHash());
+
+            try {
+                cctx.io().send(n, req, tx.ioPolicy());
+
+                if (sync)
+                    res = true;
+                else
+                    fut.onDone();
+            }
+            catch (IgniteCheckedException e) {
+                // Fail the whole thing.
+                if (e instanceof ClusterTopologyCheckedException)
+                    fut.onResult((ClusterTopologyCheckedException)e);
+                else
+                    fut.onResult(e);
+            }
         }
+
+        return res;
     }
 
     /**
@@ -420,6 +491,17 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
         @GridToStringInclude
         private GridDistributedTxMapping nearMapping;
 
+        /** */
+        @GridToStringInclude
+        private ClusterNode node;
+
+        /**
+         * @param node Node.
+         */
+        public MiniFuture(ClusterNode node) {
+            this.node = node;
+        }
+
         /**
          * @param dhtMapping Mapping.
          * @param nearMapping nearMapping.
@@ -442,7 +524,7 @@ public final class GridDhtTxFinishFuture<K, V> extends 
GridCompoundIdentityFutur
          * @return Node ID.
          */
         public ClusterNode node() {
-            return dhtMapping != null ? dhtMapping.node() : nearMapping.node();
+            return node != null ? node : dhtMapping != null ? 
dhtMapping.node() : nearMapping.node();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/80629476/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index b32f0bb..08fcaf6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -73,6 +73,9 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
     /** Versions of pending locks for entries of this tx. */
     private Collection<GridCacheVersion> pendingVers;
 
+    /** Nodes where transactions were started on lock step. */
+    private Set<ClusterNode> lockTxNodes;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -124,6 +127,26 @@ public abstract class GridDhtTxLocalAdapter extends 
IgniteTxLocalAdapter {
     }
 
     /**
+     * @param node Node.
+     */
+    public void addLockTransactionNode(ClusterNode node) {
+        assert node != null;
+        assert !node.isLocal();
+
+        if (lockTxNodes == null)
+            lockTxNodes = new HashSet<>();
+
+        lockTxNodes.add(node);
+    }
+
+    /**
+     * @return Nodes where transactions were started on lock step.
+     */
+    @Nullable public Set<ClusterNode> lockTransactionNodes() {
+        return lockTxNodes;
+    }
+
+    /**
      * @return Near node id.
      */
     protected abstract UUID nearNodeId();

Reply via email to