Repository: incubator-ignite Updated Branches: refs/heads/ignite-286 dc1317da7 -> 7c596ddb1
ignite-646 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4843b661 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4843b661 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4843b661 Branch: refs/heads/ignite-286 Commit: 4843b661a6ae61288aaa1fb483d5f8f68c3729c6 Parents: 5049704 Author: avinogradov <avinogra...@gridgain.com> Authored: Fri Apr 17 19:49:14 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Fri Apr 17 19:49:14 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 22 +-- .../processors/cache/GridCacheMessage.java | 8 +- .../distributed/dht/GridDhtCacheAdapter.java | 19 ++ .../dht/atomic/GridDhtAtomicCache.java | 31 +++- .../dht/atomic/GridDhtAtomicUpdateResponse.java | 8 + .../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +- .../atomic/GridNearAtomicUpdateResponse.java | 18 +- .../distributed/near/GridNearGetResponse.java | 8 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 182 +++++++++++++++++++ 9 files changed, 273 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 6fefdfd..56ee65e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -282,13 +282,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } catch (Throwable e) { - if (X.hasCause(e, ClassNotFoundException.class)) - U.error(log, "Failed to process message (note that distributed services " + - "do not support peer class loading, if you deploy distributed service " + - "you should have all required classes in CLASSPATH on all nodes in topology) " + - "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']'); - else - U.error(log, "Failed to process message [senderId=" + nodeId + ']', e); +// if (X.hasCause(e, ClassNotFoundException.class)) +// U.error(log, "Failed to process message (note that distributed services " + +// "do not support peer class loading, if you deploy distributed service " + +// "you should have all required classes in CLASSPATH on all nodes in topology) " + +// "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']'); +// else + U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e); } finally { if (depEnabled) @@ -738,11 +738,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); } catch (IgniteCheckedException e) { - if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class, - ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) +// if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class, +// ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) cacheMsg.onClassError(e); - else - throw e; +// else +// throw e; } catch (Error e) { if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index fefd582..5432c90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -60,7 +60,7 @@ public abstract class GridCacheMessage implements Message { /** */ @GridDirectTransient - private Exception err; + private IgniteCheckedException err; /** */ @GridDirectTransient @@ -115,14 +115,14 @@ public abstract class GridCacheMessage implements Message { * * @param err Error. */ - public void onClassError(Exception err) { + public void onClassError(IgniteCheckedException err) { this.err = err; } /** - * @return Error set via {@link #onClassError(Exception)} method. + * @return Error set via {@link #onClassError(IgniteCheckedException)} method. */ - public Exception classError() { + public IgniteCheckedException classError() { return err; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 6e6cb04..e5312bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -606,6 +606,25 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert ctx.affinityNode(); + if (req.classError() != null) { + GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), + req.futureId(), + req.miniId(), + req.version()); + + res.error(req.classError()); + + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + + ",req=" + req + ", res=" + res + ']', e); + } + + return; + } + long ttl = req.accessTtl(); final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6984238..b46cd57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2329,7 +2329,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.nodeId(ctx.localNodeId()); - updateAllAsyncInternal(nodeId, req, updateReplyClos); + if (req.classError() != null) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureVersion()); + + res.onError(req.classError()); + + sendNearUpdateReply(nodeId, res); + } + else { + updateAllAsyncInternal(nodeId, req, updateReplyClos); + } } /** @@ -2365,6 +2376,24 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Always send update reply. GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); + if (req.classError() != null) { + res.onError(req.classError()); + + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (ClusterTopologyCheckedException ignored) { + U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + + req.nodeId()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId + + ", req=" + req + ']', e); + } + + return; + } + Boolean replicate = ctx.isDrEnabled(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index d0a7620..c5b5a37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -88,6 +88,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri } /** + * Sets update error. + * @param err + */ + public void onError(IgniteCheckedException err){ + this.err = err; + } + + /** * @return Gets update error. */ public IgniteCheckedException error() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 974a197..159da99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -335,7 +335,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updateNear(singleReq, res); if (res.error() != null) - onDone(addFailedKeys(res.failedKeys(), res.error())); + onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error()); else { if (op == TRANSFORM) { if (ret != null) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 01d5722..773b847 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -137,9 +137,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr } /** + * Sets update error. + * @param err + */ + public void onError(IgniteCheckedException err){ + this.err = err; + } + + /** * @return Update error, if any. */ - public Throwable error() { + public IgniteCheckedException error() { return err; } @@ -335,10 +343,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr * @param e Error cause. */ public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) { - if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); + if (keys != null) { + if (failedKeys == null) + failedKeys = new ArrayList<>(keys.size()); - failedKeys.addAll(keys); + failedKeys.addAll(keys); + } if (err == null) err = new IgniteCheckedException("Failed to update keys on primary node."); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index 57652bd..73d877a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -65,7 +65,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe /** Error. */ @GridDirectTransient - private Throwable err; + private IgniteCheckedException err; /** Serialized error. */ private byte[] errBytes; @@ -152,20 +152,20 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe * @return Topology version if this response has invalid partitions. */ @Override public AffinityTopologyVersion topologyVersion() { - return topVer; + return topVer != null ? topVer : super.topologyVersion(); } /** * @return Error. */ - public Throwable error() { + public IgniteCheckedException error() { return err; } /** * @param err Error. */ - public void error(Throwable err) { + public void error(IgniteCheckedException err) { this.err = err; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4843b661/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java new file mode 100644 index 0000000..d21c219 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.util.typedef.*; + +import javax.cache.*; +import java.io.*; +import java.util.concurrent.atomic.*; + +/** + * Check behavior on exception while unmarshalling key + */ +public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest { + /** Allows to change behavior of readExternal method */ + private static AtomicInteger nodeCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 3; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode atomicWriteOrderMode() { + return CacheAtomicWriteOrderMode.PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.endsWith("0")) + cfg.setClientMode(true); + + return cfg; + } + + /** Test key 1 */ + public static class TestKey implements Externalizable { + /** Test key 1 */ + public TestKey(String field) { + this.field = field; + } + + /** Test key 1 */ + public TestKey() { + } + + /** field */ + private String field; + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey key = (TestKey)o; + + return !(field != null ? !field.equals(key.field) : key.field != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return field != null ? field.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (nodeCnt.decrementAndGet() < 1) { //will throw exception on backup node only + throw new IOException("Class can not be unmarshalled"); + } + } + } + + /** + * Test key 2. + * Unmarshalling always failed. + */ + public static class TestKeyAlwaysFailed extends TestKey { + /** Test key 2 */ + public TestKeyAlwaysFailed(String field) { + super(field); + } + + /** Test key 2 */ + public TestKeyAlwaysFailed() { + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeCnt.decrementAndGet(); + throw new IOException("Class can not be unmarshalled"); //will throw exception on primary node + } + + } + + /** + * Tests that correct response will be sent to client node in case of unmarshalling failed. + */ + public void testResponseMessageOnUnmarshallingFailed() { + + nodeCnt.set(1); + + try { + jcache(0).put(new TestKeyAlwaysFailed("1"), ""); + assert false : "p2p marshalling failed, but error response was not sent"; + } + catch (CacheException e) { + assert X.hasCause(e, IOException.class); + } + + assert nodeCnt.get() == 0;//put request should not go to backup node in case failed at primary. + + try { + assert jcache(0).get(new TestKeyAlwaysFailed("1")) == null; + assert false : "p2p marshalling failed, but error response was not sent"; + } + catch (CacheException e) { + assert X.hasCause(e, IOException.class); + } + + assert grid(0).cachex().entrySet().size() == 0; + + nodeCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node). + + try { + jcache(0).put(new TestKey("1"), "");//put will fail at backup node. + assert false : "p2p marshalling failed, but error response was not sent"; + } + catch (CacheException e) { + assert X.hasCause(e, IOException.class); + } + + assert nodeCnt.get() == 0;//put request should go to primary and backup node. + + // Need to have to exception while unmarshalling getKeyResponse. + nodeCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client). + + assert jcache(0).get(new TestKey("1")) == null; + + assert grid(0).cachex().entrySet().size() == 0; + } +}