Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-286 dc1317da7 -> 7c596ddb1


ignite-646


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4843b661
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4843b661
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4843b661

Branch: refs/heads/ignite-286
Commit: 4843b661a6ae61288aaa1fb483d5f8f68c3729c6
Parents: 5049704
Author: avinogradov <avinogra...@gridgain.com>
Authored: Fri Apr 17 19:49:14 2015 +0300
Committer: avinogradov <avinogra...@gridgain.com>
Committed: Fri Apr 17 19:49:14 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheIoManager.java    |  22 +--
 .../processors/cache/GridCacheMessage.java      |   8 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |  19 ++
 .../dht/atomic/GridDhtAtomicCache.java          |  31 +++-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |   8 +
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   2 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |  18 +-
 .../distributed/near/GridNearGetResponse.java   |   8 +-
 .../IgniteCacheP2pUnmarshallingErrorTest.java   | 182 +++++++++++++++++++
 9 files changed, 273 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 6fefdfd..56ee65e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -282,13 +282,13 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             }
         }
         catch (Throwable e) {
-            if (X.hasCause(e, ClassNotFoundException.class))
-                U.error(log, "Failed to process message (note that distributed 
services " +
-                    "do not support peer class loading, if you deploy 
distributed service " +
-                    "you should have all required classes in CLASSPATH on all 
nodes in topology) " +
-                    "[senderId=" + nodeId + ", err=" + X.cause(e, 
ClassNotFoundException.class).getMessage() + ']');
-            else
-                U.error(log, "Failed to process message [senderId=" + nodeId + 
']', e);
+//            if (X.hasCause(e, ClassNotFoundException.class))
+//                U.error(log, "Failed to process message (note that 
distributed services " +
+//                    "do not support peer class loading, if you deploy 
distributed service " +
+//                    "you should have all required classes in CLASSPATH on 
all nodes in topology) " +
+//                    "[senderId=" + nodeId + ", err=" + X.cause(e, 
ClassNotFoundException.class).getMessage() + ']');
+//            else
+            U.error(log, "Failed to process message [senderId=" + nodeId + ", 
messageType=" + cacheMsg.getClass() + ']', e);
         }
         finally {
             if (depEnabled)
@@ -738,11 +738,11 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader());
         }
         catch (IgniteCheckedException e) {
-            if (cacheMsg.ignoreClassErrors() && X.hasCause(e, 
InvalidClassException.class,
-                    ClassNotFoundException.class, NoClassDefFoundError.class, 
UnsupportedClassVersionError.class))
+//            if (cacheMsg.ignoreClassErrors() && X.hasCause(e, 
InvalidClassException.class,
+//                    ClassNotFoundException.class, 
NoClassDefFoundError.class, UnsupportedClassVersionError.class))
                 cacheMsg.onClassError(e);
-            else
-                throw e;
+//            else
+//                throw e;
         }
         catch (Error e) {
             if (cacheMsg.ignoreClassErrors() && X.hasCause(e, 
NoClassDefFoundError.class,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index fefd582..5432c90 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -60,7 +60,7 @@ public abstract class GridCacheMessage implements Message {
 
     /** */
     @GridDirectTransient
-    private Exception err;
+    private IgniteCheckedException err;
 
     /** */
     @GridDirectTransient
@@ -115,14 +115,14 @@ public abstract class GridCacheMessage implements Message 
{
      *
      * @param err Error.
      */
-    public void onClassError(Exception err) {
+    public void onClassError(IgniteCheckedException err) {
         this.err = err;
     }
 
     /**
-     * @return Error set via {@link #onClassError(Exception)} method.
+     * @return Error set via {@link #onClassError(IgniteCheckedException)} 
method.
      */
-    public Exception classError() {
+    public IgniteCheckedException classError() {
         return err;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 6e6cb04..e5312bf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -606,6 +606,25 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
     protected void processNearGetRequest(final UUID nodeId, final 
GridNearGetRequest req) {
         assert ctx.affinityNode();
 
+        if (req.classError() != null) {
+            GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(),
+                req.futureId(),
+                req.miniId(),
+                req.version());
+
+            res.error(req.classError());
+
+            try {
+                ctx.io().send(nodeId, res, ctx.ioPolicy());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send get response to node (is node 
still alive?) [nodeId=" + nodeId +
+                    ",req=" + req + ", res=" + res + ']', e);
+            }
+
+            return;
+        }
+
         long ttl = req.accessTtl();
 
         final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 6984238..b46cd57 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2329,7 +2329,18 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         req.nodeId(ctx.localNodeId());
 
-        updateAllAsyncInternal(nodeId, req, updateReplyClos);
+        if (req.classError() != null) {
+            GridNearAtomicUpdateResponse res = new 
GridNearAtomicUpdateResponse(ctx.cacheId(),
+                nodeId,
+                req.futureVersion());
+
+            res.onError(req.classError());
+
+            sendNearUpdateReply(nodeId, res);
+        }
+        else {
+            updateAllAsyncInternal(nodeId, req, updateReplyClos);
+        }
     }
 
     /**
@@ -2365,6 +2376,24 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
         // Always send update reply.
         GridDhtAtomicUpdateResponse res = new 
GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
 
+        if (req.classError() != null) {
+            res.onError(req.classError());
+
+            try {
+                ctx.io().send(nodeId, res, ctx.ioPolicy());
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                U.warn(log, "Failed to send DHT atomic update response to node 
because it left grid: " +
+                    req.nodeId());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send DHT atomic update response (did 
node leave grid?) [nodeId=" + nodeId +
+                    ", req=" + req + ']', e);
+            }
+
+            return;
+        }
+
         Boolean replicate = ctx.isDrEnabled();
 
         boolean intercept = req.forceTransformBackups() && 
ctx.config().getInterceptor() != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index d0a7620..c5b5a37 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -88,6 +88,14 @@ public class GridDhtAtomicUpdateResponse extends 
GridCacheMessage implements Gri
     }
 
     /**
+     * Sets update error.
+     * @param err
+     */
+    public void onError(IgniteCheckedException err){
+        this.err = err;
+    }
+
+    /**
      * @return Gets update error.
      */
     public IgniteCheckedException error() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 974a197..159da99 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -335,7 +335,7 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             updateNear(singleReq, res);
 
             if (res.error() != null)
-                onDone(addFailedKeys(res.failedKeys(), res.error()));
+                onDone(res.failedKeys() != null ? 
addFailedKeys(res.failedKeys(), res.error()) : res.error());
             else {
                 if (op == TRANSFORM) {
                     if (ret != null)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 01d5722..773b847 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -137,9 +137,17 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
     }
 
     /**
+     * Sets update error.
+     * @param err
+     */
+    public void onError(IgniteCheckedException err){
+        this.err = err;
+    }
+
+    /**
      * @return Update error, if any.
      */
-    public Throwable error() {
+    public IgniteCheckedException error() {
         return err;
     }
 
@@ -335,10 +343,12 @@ public class GridNearAtomicUpdateResponse extends 
GridCacheMessage implements Gr
      * @param e Error cause.
      */
     public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, 
Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>(keys.size());
+        if (keys != null) {
+            if (failedKeys == null)
+                failedKeys = new ArrayList<>(keys.size());
 
-        failedKeys.addAll(keys);
+            failedKeys.addAll(keys);
+        }
 
         if (err == null)
             err = new IgniteCheckedException("Failed to update keys on primary 
node.");

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 57652bd..73d877a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -65,7 +65,7 @@ public class GridNearGetResponse extends GridCacheMessage 
implements GridCacheDe
 
     /** Error. */
     @GridDirectTransient
-    private Throwable err;
+    private IgniteCheckedException err;
 
     /** Serialized error. */
     private byte[] errBytes;
@@ -152,20 +152,20 @@ public class GridNearGetResponse extends GridCacheMessage 
implements GridCacheDe
      * @return Topology version if this response has invalid partitions.
      */
     @Override public AffinityTopologyVersion topologyVersion() {
-        return topVer;
+        return topVer != null ? topVer : super.topologyVersion();
     }
 
     /**
      * @return Error.
      */
-    public Throwable error() {
+    public IgniteCheckedException error() {
         return err;
     }
 
     /**
      * @param err Error.
      */
-    public void error(Throwable err) {
+    public void error(IgniteCheckedException err) {
         this.err = err;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
new file mode 100644
index 0000000..d21c219
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -0,0 +1,182 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import javax.cache.*;
+import java.io.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Check behavior on exception while unmarshalling key
+ */
+public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTest {
+    /** Allows to change behavior of readExternal method */
+    private static AtomicInteger nodeCnt = new AtomicInteger();
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CacheAtomicWriteOrderMode.PRIMARY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (gridName.endsWith("0"))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** Test key 1 */
+    public static class TestKey implements Externalizable {
+        /** Test key 1 */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** Test key 1 */
+        public TestKey() {
+        }
+
+        /** field */
+        private String field;
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return !(field != null ? !field.equals(key.field) : key.field != 
null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            if (nodeCnt.decrementAndGet() < 1) { //will throw exception on 
backup node only
+                throw new IOException("Class can not be unmarshalled");
+            }
+        }
+    }
+
+    /**
+     * Test key 2.
+     * Unmarshalling always failed.
+     */
+    public static class TestKeyAlwaysFailed extends TestKey {
+        /** Test key 2 */
+        public TestKeyAlwaysFailed(String field) {
+            super(field);
+        }
+
+        /** Test key 2 */
+        public TestKeyAlwaysFailed() {
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            nodeCnt.decrementAndGet();
+            throw new IOException("Class can not be unmarshalled"); //will 
throw exception on primary node
+        }
+
+    }
+
+    /**
+     * Tests that correct response will be sent to client node in case of 
unmarshalling failed.
+     */
+    public void testResponseMessageOnUnmarshallingFailed() {
+
+        nodeCnt.set(1);
+
+        try {
+            jcache(0).put(new TestKeyAlwaysFailed("1"), "");
+            assert false : "p2p marshalling failed, but error response was not 
sent";
+        }
+        catch (CacheException e) {
+            assert X.hasCause(e, IOException.class);
+        }
+
+        assert nodeCnt.get() == 0;//put request should not go to backup node 
in case failed at primary.
+
+        try {
+            assert jcache(0).get(new TestKeyAlwaysFailed("1")) == null;
+            assert false : "p2p marshalling failed, but error response was not 
sent";
+        }
+        catch (CacheException e) {
+            assert X.hasCause(e, IOException.class);
+        }
+
+        assert grid(0).cachex().entrySet().size() == 0;
+
+        nodeCnt.set(2); //put request will be unmarshalled twice (at primary 
and at backup node).
+
+        try {
+            jcache(0).put(new TestKey("1"), "");//put will fail at backup node.
+            assert false : "p2p marshalling failed, but error response was not 
sent";
+        }
+        catch (CacheException e) {
+            assert X.hasCause(e, IOException.class);
+        }
+
+        assert nodeCnt.get() == 0;//put request should go to primary and 
backup node.
+
+        // Need to have to exception while unmarshalling getKeyResponse.
+        nodeCnt.set(3); //get response will me unmarshalled twice (request at 
primary node and response at client).
+
+        assert jcache(0).get(new TestKey("1")) == null;
+
+        assert grid(0).cachex().entrySet().size() == 0;
+    }
+}

Reply via email to