ignite-646

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0d982f76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0d982f76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0d982f76

Branch: refs/heads/ignite-286
Commit: 0d982f76671da38e74de80d69522c6f19d67c7ba
Parents: 9a79e70
Author: avinogradov <avinogra...@gridgain.com>
Authored: Thu Apr 23 15:01:03 2015 +0300
Committer: avinogradov <avinogra...@gridgain.com>
Committed: Thu Apr 23 15:01:03 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    | 37 ++++++--
 .../IgniteCacheP2pUnmarshallingErrorTest.java   |  3 +-
 .../IgniteCacheP2pUnmarshallingErrorTxTest.java | 92 ++++++++++++++++++++
 3 files changed, 125 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 957f3c8..b584b17 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.util.*;
@@ -297,9 +298,9 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         }
     }
 
-    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage 
res, GridCacheContext ctx) {
+    private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage 
res, GridCacheSharedContext cctx, GridIoPolicy plc) {
         try {
-            ctx.io().send(nodeId, res, ctx.ioPolicy());
+            cctx.io().send(nodeId, res, plc);
         }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send response to node (is node still 
alive?) [nodeId=" + nodeId +
@@ -311,6 +312,18 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
         GridCacheContext ctx = cctx.cacheContext(msg.cacheId());
 
         switch (msg.directType()) {
+            case 34:{
+                GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg;
+
+                GridDhtTxPrepareResponse res = new 
GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId());
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+            }
+
+            break;
+
             case 38: {
                 GridDhtAtomicUpdateRequest req = 
(GridDhtAtomicUpdateRequest)msg;
 
@@ -318,7 +331,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, ctx);
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
             }
 
             break;
@@ -332,7 +345,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
                 res.onError(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, ctx);
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
             }
 
             break;
@@ -347,11 +360,25 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
                 res.error(req.classError());
 
-                sendResponseOnFailedMessage(nodeId, res, ctx);
+                sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
             }
 
             break;
 
+            case 55: {
+                GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg;
+
+                GridNearTxPrepareResponse res = new 
GridNearTxPrepareResponse(req.version(), req.futureId(),
+                    req.miniId(), req.version(), null, null, null);
+
+                res.error(req.classError());
+
+                sendResponseOnFailedMessage(nodeId, res, cctx, req.policy());
+            }
+
+            break;
+
+
             default:
                 throw new IgniteCheckedException("Failed to send response to 
node. Unsupported direct type [message="
                     + msg + "]");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index 60f2226..7edbff2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.*;
  */
 public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTest {
     /** Allows to change behavior of readExternal method. */
-    private static AtomicInteger readCnt = new AtomicInteger();
+    protected static AtomicInteger readCnt = new AtomicInteger();
 
     /** {@inheritDoc} */
     @Override protected int gridCount() {
@@ -136,7 +136,6 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTes
      * Tests that correct response will be sent to client node in case of 
unmarshalling failed.
      */
     public void testResponseMessageOnUnmarshallingFailed() {
-
         //Checking failed unmarshalling on primary node.
         readCnt.set(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
new file mode 100644
index 0000000..cb6d444
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.transactions.*;
+
+import java.io.*;
+
+/**
+ * Check behavior on exception while unmarshalling key.
+ */
+public class IgniteCacheP2pUnmarshallingErrorTxTest extends 
IgniteCacheP2pUnmarshallingErrorTest {
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /**
+     * Sends put with optimistic lock and handles fail.
+     */
+    protected void failOptimistic() {
+        try (Transaction tx = 
grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, 
TransactionIsolation.REPEATABLE_READ)) {
+
+            jcache(0).put(new TestKey("1"), "");
+
+            tx.commit();
+
+            assert false : "p2p marshalling failed, but error response was not 
sent";
+        }
+        catch (IgniteException e) {
+            assert X.hasCause(e, IOException.class);
+        }
+
+        assert readCnt.get() == 0; //ensure we have read counts as expected.
+    }
+
+    /**
+     * Sends put with pessimistic lock and handles fail.
+     */
+    protected void failPessimictic() {
+        try (Transaction tx = 
grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, 
TransactionIsolation.REPEATABLE_READ)) {
+
+            jcache(0).put(new TestKey("1"), "");
+
+            assert false : "p2p marshalling failed, but error response was not 
sent";
+        }
+        catch (IgniteException e) {
+            assert X.hasCause(e, IOException.class);
+        }
+
+        assert readCnt.get() == 0; //ensure we have read counts as expected.
+    }
+
+    /**
+     * Tests that correct response will be sent to client node in case of 
unmarshalling failed.
+     */
+    public void testResponseMessageOnUnmarshallingFailed() {
+//        //GridNearTxPrepareRequest unmarshalling failed test
+//        readCnt.set(2);
+//
+//        failOptimistic();
+//
+//        //GridDhtTxPrepareRequest unmarshalling failed test
+//        readCnt.set(3);
+//
+//        failOptimistic();
+
+//        readCnt.set(1);
+//
+//        failPessimictic();
+
+    }
+}

Reply via email to