ignite-646
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d24064a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d24064a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d24064a1 Branch: refs/heads/ignite-286 Commit: d24064a124558014300b945432942abc2e4998df Parents: 300cc75 Author: avinogradov <avinogra...@gridgain.com> Authored: Thu Apr 23 19:11:36 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Thu Apr 23 19:11:36 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 41 +++++++++++++++--- .../IgniteCacheP2pUnmarshallingErrorTest.java | 3 ++ .../IgniteCacheP2pUnmarshallingErrorTxTest.java | 45 ++++++++++++++++++-- 3 files changed, 78 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d24064a1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index b584b17..affdaf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -315,7 +315,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 34:{ GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg; - GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId()); + GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse( + req.version(), + req.futureId(), + req.miniId()); res.error(req.classError()); @@ -327,7 +330,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 38: { GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; - GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion()); + GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse( + ctx.cacheId(), + req.futureVersion()); res.onError(req.classError()); @@ -339,7 +344,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 40: { GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg; - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + ctx.cacheId(), nodeId, req.futureVersion()); @@ -353,7 +359,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 49: { GridNearGetRequest req = (GridNearGetRequest)msg; - GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), + GridNearGetResponse res = new GridNearGetResponse( + ctx.cacheId(), req.futureId(), req.miniId(), req.version()); @@ -365,11 +372,32 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 51: { + GridNearLockRequest req = (GridNearLockRequest)msg; + + GridNearLockResponse res = new GridNearLockResponse( + ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + 0, + req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + case 55: { GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg; - GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(req.version(), req.futureId(), - req.miniId(), req.version(), null, null, null); + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.version(), + req.futureId(), + req.miniId(), + req.version(), + null, null, null); res.error(req.classError()); @@ -378,7 +406,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; - default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d24064a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index 1f622bd..a50f07c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -32,6 +32,9 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes /** Allows to change behavior of readExternal method. */ protected static AtomicInteger readCnt = new AtomicInteger(); + /** iterable key */ + protected static int key = 0; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d24064a1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java index ed3749f..1a42407 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.transactions.*; @@ -34,13 +35,17 @@ public class IgniteCacheP2pUnmarshallingErrorTxTest extends IgniteCacheP2pUnmars return CacheAtomicityMode.TRANSACTIONAL; } + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + /** * Sends put with optimistic lock and handles fail. */ protected void failOptimistic() { try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) { - jcache(0).put(new TestKey("1"), ""); + jcache(0).put(new TestKey(String.valueOf(++key)), ""); tx.commit(); @@ -59,7 +64,26 @@ public class IgniteCacheP2pUnmarshallingErrorTxTest extends IgniteCacheP2pUnmars protected void failPessimictic() { try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { - jcache(0).put(new TestKey("1"), ""); + jcache(0).put(new TestKey(String.valueOf(++key)), ""); + + assert false : "p2p marshalling failed, but error response was not sent"; + } +// catch (IgniteException e) { +// assert X.hasCause(e, IOException.class); +// } + + assert readCnt.get() == 0; //ensure we have read count as expected. + } + + /** + * Sends put with pessimistic lock and handles fail. + */ + protected void failPessimicticOnCommit() { + try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + + jcache(0).put(new TestKey(String.valueOf(++key)), ""); + + tx.commit(); assert false : "p2p marshalling failed, but error response was not sent"; } @@ -84,9 +108,22 @@ public class IgniteCacheP2pUnmarshallingErrorTxTest extends IgniteCacheP2pUnmars // // failOptimistic(); - readCnt.set(100); +// //GridNearLockRequest unmarshalling failed test +// readCnt.set(2); +// +// failPessimictic(); + + //? unmarshalling failed test + readCnt.set(1000); + try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + + TestKey tstKey = new TestKey(String.valueOf(++key)); + jcache(0).put(tstKey, ""); + jcache(0).lock(tstKey).lock(); + } + + - failPessimictic(); } }