IGNITE-264 - Process check committed finish response properly.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e9fd4f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e9fd4f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e9fd4f7e

Branch: refs/heads/ignite-264
Commit: e9fd4f7e36df7a1406af99040fd7b4a776bee2fa
Parents: 30fef4a
Author: Alexey Goncharuk <agoncha...@gridgain.com>
Authored: Tue Feb 24 17:32:33 2015 -0800
Committer: Alexey Goncharuk <agoncha...@gridgain.com>
Committed: Tue Feb 24 17:32:33 2015 -0800

----------------------------------------------------------------------
 .../dht/GridDhtTxFinishResponse.java            | 24 ++++++++++++
 .../near/GridNearTxFinishFuture.java            | 39 ++++++++++++++++----
 .../cache/transactions/IgniteTxHandler.java     | 37 +++++++++++++++----
 3 files changed, 85 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index 03b8227..1627334 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -46,6 +46,9 @@ public class GridDhtTxFinishResponse<K, V> extends 
GridDistributedTxFinishRespon
     /** Serialized error. */
     private byte[] errBytes;
 
+    /** Flag indicating if this is a check-committed response. */
+    private boolean checkCommitted;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -80,6 +83,27 @@ public class GridDhtTxFinishResponse<K, V> extends 
GridDistributedTxFinishRespon
         return err;
     }
 
+    /**
+     * @param err Error for check committed backup requests.
+     */
+    public void error(Throwable err) {
+        this.err = err;
+    }
+
+    /**
+     * @return Check committed flag.
+     */
+    public boolean checkCommitted() {
+        return checkCommitted;
+    }
+
+    /**
+     * @param checkCommitted Check committed flag.
+     */
+    public void checkCommitted(boolean checkCommitted) {
+        this.checkCommitted = checkCommitted;
+    }
+
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) 
throws IgniteCheckedException {
         super.prepareMarshal(ctx);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 0152b39..9b29ee3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -213,6 +213,25 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
             }
     }
 
+    /**
+     * @param nodeId Sender.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridDhtTxFinishResponse<K, V> res) {
+        if (!isDone())
+            for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
+                if (isMini(fut)) {
+                    MiniFuture f = (MiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId())) {
+                        assert f.node().id().equals(nodeId);
+
+                        f.onResult(res);
+                    }
+                }
+            }
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(IgniteInternalTx tx, Throwable err) {
         if ((initialized() || err != null)) {
@@ -348,7 +367,13 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
             if (!F.isEmpty(backups)) {
                 assert backups.size() == 1;
 
-                UUID backup = F.first(backups);
+                UUID backupId = F.first(backups);
+
+                ClusterNode backup = ctx.discovery().node(backupId);
+
+                // Nothing to do if backup has left the grid.
+                if (backup == null)
+                    return;
 
                 MiniFuture mini = new MiniFuture(backup);
 
@@ -512,7 +537,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
         private GridDistributedTxMapping<K, V> m;
 
         /** Backup check flag. */
-        private UUID backupId;
+        private ClusterNode backup;
 
         /**
          * Empty constructor required for {@link Externalizable}.
@@ -531,12 +556,12 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
         }
 
         /**
-         * @param backupId Backup ID to check.
+         * @param backup Backup to check.
          */
-        MiniFuture(UUID backupId) {
+        MiniFuture(ClusterNode backup) {
             super(cctx.kernalContext());
 
-            this.backupId = backupId;
+            this.backup = backup;
         }
 
         /**
@@ -586,7 +611,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
          * @param res Result callback.
          */
         void onResult(GridNearTxFinishResponse<K, V> res) {
-            assert backupId == null;
+            assert backup == null;
 
             if (res.error() != null)
                 onDone(res.error());
@@ -598,7 +623,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
          * @param res Response.
          */
         void onResult(GridDhtTxFinishResponse<K, V> res) {
-            assert backupId != null;
+            assert backup != null;
 
             if (res.error() != null)
                 onDone(res.error());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e9fd4f7e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 880d6a5..0633964 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -409,17 +409,32 @@ public class IgniteTxHandler<K, V> {
         assert nodeId != null;
         assert res != null;
 
-        GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, 
V>)ctx.mvcc().<IgniteInternalTx>future(res.xid(),
-            res.futureId());
+        if (res.checkCommitted()) {
+            GridNearTxFinishFuture<K, V> fut = (GridNearTxFinishFuture<K, 
V>)ctx.mvcc().<IgniteInternalTx>future(
+                res.xid(), res.futureId());
 
-        if (fut == null) {
-            if (log.isDebugEnabled())
-                log.debug("Received response for unknown future (will ignore): 
" + res);
+            if (fut == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received response for unknown future (will 
ignore): " + res);
 
-            return;
+                return;
+            }
+
+            fut.onResult(nodeId, res);
         }
+        else {
+            GridDhtTxFinishFuture<K, V> fut = (GridDhtTxFinishFuture<K, 
V>)ctx.mvcc().<IgniteInternalTx>future(
+                res.xid(), res.futureId());
 
-        fut.onResult(nodeId, res);
+            if (fut == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Received response for unknown future (will 
ignore): " + res);
+
+                return;
+            }
+
+            fut.onResult(nodeId, res);
+        }
     }
 
     /**
@@ -854,9 +869,15 @@ public class IgniteTxHandler<K, V> {
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest<K, V> req, 
boolean committed) {
         if (req.replyRequired()) {
-            GridCacheMessage<K, V> res = new 
GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
+            GridDhtTxFinishResponse<K, V> res = new 
GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
 
+            if (req.checkCommitted()) {
+                res.checkCommitted(true);
 
+                if (!committed)
+                    res.error(new IgniteTxRollbackCheckedException("Failed to 
commit transaction (transaction has been " +
+                        "rolled back on backup node): " + req.version()));
+            }
 
             try {
                 ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : 
SYSTEM_POOL);

Reply via email to