ignite-646
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a47974c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a47974c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a47974c3 Branch: refs/heads/ignite-286 Commit: a47974c3444da2f1804ca1c0b80cd74f92cdf137 Parents: 7a8e075 Author: avinogradov <avinogra...@gridgain.com> Authored: Tue Apr 21 16:33:13 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Tue Apr 21 16:33:13 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 160 ++++++++++++------- .../distributed/dht/GridDhtCacheAdapter.java | 19 --- .../dht/atomic/GridDhtAtomicCache.java | 31 +--- .../IgniteCacheP2pUnmarshallingErrorTest.java | 46 +++--- 4 files changed, 131 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 56ee65e..0df824f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -24,6 +24,8 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; @@ -32,7 +34,6 @@ import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; -import java.io.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -226,68 +227,66 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { unmarshall(nodeId, cacheMsg); - if (cacheMsg.allowForStartup()) - processMessage(nodeId, cacheMsg, c); + if (cacheMsg.classError() != null) + processFailedMessage(nodeId, cacheMsg); else { - IgniteInternalFuture<?> startFut = startFuture(cacheMsg); - - if (startFut.isDone()) + if (cacheMsg.allowForStartup()) processMessage(nodeId, cacheMsg, c); else { - if (log.isDebugEnabled()) - log.debug("Waiting for start future to complete for message [nodeId=" + nodeId + - ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); - - // Don't hold this thread waiting for preloading to complete. - startFut.listen(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(final IgniteInternalFuture<?> f) { - cctx.kernalContext().closure().runLocalSafe( - new GridPlainRunnable() { - @Override public void run() { - rw.readLock(); - - try { - if (stopping) { - if (log.isDebugEnabled()) - log.debug("Received cache communication message while stopping " + - "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); + IgniteInternalFuture<?> startFut = startFuture(cacheMsg); - return; - } + if (startFut.isDone()) + processMessage(nodeId, cacheMsg, c); + else { + if (log.isDebugEnabled()) + log.debug("Waiting for start future to complete for message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); - f.get(); + // Don't hold this thread waiting for preloading to complete. + startFut.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(final IgniteInternalFuture<?> f) { + cctx.kernalContext().closure().runLocalSafe( + new GridPlainRunnable() { + @Override public void run() { + rw.readLock(); - if (log.isDebugEnabled()) - log.debug("Start future completed for message [nodeId=" + nodeId + - ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); + try { + if (stopping) { + if (log.isDebugEnabled()) + log.debug("Received cache communication message while stopping " + + "(will ignore) [nodeId=" + nodeId + ", msg=" + cacheMsg + ']'); - processMessage(nodeId, cacheMsg, c); - } - catch (IgniteCheckedException e) { - // Log once. - if (startErr.compareAndSet(false, true)) - U.error(log, "Failed to complete preload start future " + - "(will ignore message) " + - "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); - } - finally { - rw.readUnlock(); + return; + } + + f.get(); + + if (log.isDebugEnabled()) + log.debug("Start future completed for message [nodeId=" + nodeId + + ", locId=" + cctx.localNodeId() + ", msg=" + cacheMsg + ']'); + + processMessage(nodeId, cacheMsg, c); + } + catch (IgniteCheckedException e) { + // Log once. + if (startErr.compareAndSet(false, true)) + U.error(log, "Failed to complete preload start future " + + "(will ignore message) " + + "[fut=" + f + ", nodeId=" + nodeId + ", msg=" + cacheMsg + ']', e); + } + finally { + rw.readUnlock(); + } } } - } - ); - } - }); + ); + } + }); + } } } } catch (Throwable e) { -// if (X.hasCause(e, ClassNotFoundException.class)) -// U.error(log, "Failed to process message (note that distributed services " + -// "do not support peer class loading, if you deploy distributed service " + -// "you should have all required classes in CLASSPATH on all nodes in topology) " + -// "[senderId=" + nodeId + ", err=" + X.cause(e, ClassNotFoundException.class).getMessage() + ']'); -// else U.error(log, "Failed to process message [senderId=" + nodeId + ", messageType=" + cacheMsg.getClass() + ']', e); } finally { @@ -298,6 +297,61 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } + private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheContext ctx) { + try { + ctx.io().send(nodeId, res, ctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + + ",res=" + res + ']', e); + } + } + + private void processFailedMessage(UUID nodeId, GridCacheMessage msg) { + GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); + + switch (msg.directType()) { + case 38: { + GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; + + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); + + res.onError(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, ctx); + } + + break; + case 40: { + GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg; + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + nodeId, + req.futureVersion()); + + res.onError(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, ctx); + } + + break; + case 49: { + GridNearGetRequest req = (GridNearGetRequest)msg; + + GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), + req.futureId(), + req.miniId(), + req.version()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, ctx); + } + + break; + } + } + /** * @param cacheMsg Cache message to get start future. * @return Preloader start future. @@ -738,11 +792,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); } catch (IgniteCheckedException e) { -// if (cacheMsg.ignoreClassErrors() && X.hasCause(e, InvalidClassException.class, -// ClassNotFoundException.class, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) - cacheMsg.onClassError(e); -// else -// throw e; + cacheMsg.onClassError(e); } catch (Error e) { if (cacheMsg.ignoreClassErrors() && X.hasCause(e, NoClassDefFoundError.class, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index c6ebe0a..d85bc75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -606,25 +606,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert ctx.affinityNode(); - if (req.classError() != null) { - GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), - req.futureId(), - req.miniId(), - req.version()); - - res.error(req.classError()); - - try { - ctx.io().send(nodeId, res, ctx.ioPolicy()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + - ",req=" + req + ", res=" + res + ']', e); - } - - return; - } - long ttl = req.accessTtl(); final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index ec9e5a6..85f11b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -2329,18 +2329,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.nodeId(ctx.localNodeId()); - if (req.classError() != null) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), - nodeId, - req.futureVersion()); - - res.onError(req.classError()); - - sendNearUpdateReply(nodeId, res); - } - else { - updateAllAsyncInternal(nodeId, req, updateReplyClos); - } + updateAllAsyncInternal(nodeId, req, updateReplyClos); } /** @@ -2376,24 +2365,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Always send update reply. GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); - if (req.classError() != null) { - res.onError(req.classError()); - - try { - ctx.io().send(nodeId, res, ctx.ioPolicy()); - } - catch (ClusterTopologyCheckedException ignored) { - U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - req.nodeId()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId + - ", req=" + req + ']', e); - } - - return; - } - Boolean replicate = ctx.isDrEnabled(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a47974c3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index d21c219..60f2226 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -26,11 +26,11 @@ import java.io.*; import java.util.concurrent.atomic.*; /** - * Check behavior on exception while unmarshalling key + * Check behavior on exception while unmarshalling key. */ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest { - /** Allows to change behavior of readExternal method */ - private static AtomicInteger nodeCnt = new AtomicInteger(); + /** Allows to change behavior of readExternal method. */ + private static AtomicInteger readCnt = new AtomicInteger(); /** {@inheritDoc} */ @Override protected int gridCount() { @@ -67,18 +67,18 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes return cfg; } - /** Test key 1 */ + /** Test key 1. */ public static class TestKey implements Externalizable { - /** Test key 1 */ + /** Test key 1. */ public TestKey(String field) { this.field = field; } - /** Test key 1 */ + /** Test key 1. */ public TestKey() { } - /** field */ + /** field. */ private String field; /** {@inheritDoc} */ @@ -105,29 +105,28 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - if (nodeCnt.decrementAndGet() < 1) { //will throw exception on backup node only + if (readCnt.decrementAndGet() <= 0) { //will throw exception on backup node only throw new IOException("Class can not be unmarshalled"); } } } /** - * Test key 2. - * Unmarshalling always failed. + * Test key 2. Unmarshalling always failed. */ public static class TestKeyAlwaysFailed extends TestKey { - /** Test key 2 */ + /** Test key 2. */ public TestKeyAlwaysFailed(String field) { super(field); } - /** Test key 2 */ + /** Test key 2. */ public TestKeyAlwaysFailed() { } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeCnt.decrementAndGet(); + readCnt.decrementAndGet(); throw new IOException("Class can not be unmarshalled"); //will throw exception on primary node } @@ -138,20 +137,23 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes */ public void testResponseMessageOnUnmarshallingFailed() { - nodeCnt.set(1); + //Checking failed unmarshalling on primary node. + readCnt.set(1); try { - jcache(0).put(new TestKeyAlwaysFailed("1"), ""); + jcache(0).put(new TestKeyAlwaysFailed("1"), ""); //put will fail at primary node. + assert false : "p2p marshalling failed, but error response was not sent"; } catch (CacheException e) { assert X.hasCause(e, IOException.class); } - assert nodeCnt.get() == 0;//put request should not go to backup node in case failed at primary. + assert readCnt.get() == 0; //put request should not be handled by backup node in case failed at primary. try { assert jcache(0).get(new TestKeyAlwaysFailed("1")) == null; + assert false : "p2p marshalling failed, but error response was not sent"; } catch (CacheException e) { @@ -160,20 +162,22 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes assert grid(0).cachex().entrySet().size() == 0; - nodeCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node). + //Checking failed unmarshalling on backup node. + readCnt.set(2); //put request will be unmarshalled twice (at primary and at backup node). try { - jcache(0).put(new TestKey("1"), "");//put will fail at backup node. + jcache(0).put(new TestKey("1"), ""); //put will fail at backup node only. + assert false : "p2p marshalling failed, but error response was not sent"; } catch (CacheException e) { assert X.hasCause(e, IOException.class); } - assert nodeCnt.get() == 0;//put request should go to primary and backup node. + assert readCnt.get() == 0; //put request should be handled by primary and backup node. - // Need to have to exception while unmarshalling getKeyResponse. - nodeCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client). + // Need to have no exception while unmarshalling getKeyResponse. + readCnt.set(3); //get response will me unmarshalled twice (request at primary node and response at client). assert jcache(0).get(new TestKey("1")) == null;