Repository: incubator-ignite Updated Branches: refs/heads/ignite-460 7f1146f1f -> be1c801ee
ignite-646 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0d982f76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0d982f76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0d982f76 Branch: refs/heads/ignite-460 Commit: 0d982f76671da38e74de80d69522c6f19d67c7ba Parents: 9a79e70 Author: avinogradov <avinogra...@gridgain.com> Authored: Thu Apr 23 15:01:03 2015 +0300 Committer: avinogradov <avinogra...@gridgain.com> Committed: Thu Apr 23 15:01:03 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 37 ++++++-- .../IgniteCacheP2pUnmarshallingErrorTest.java | 3 +- .../IgniteCacheP2pUnmarshallingErrorTxTest.java | 92 ++++++++++++++++++++ 3 files changed, 125 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 957f3c8..b584b17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.util.*; @@ -297,9 +298,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } } - private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheContext ctx) { + private void sendResponseOnFailedMessage(UUID nodeId, GridCacheMessage res, GridCacheSharedContext cctx, GridIoPolicy plc) { try { - ctx.io().send(nodeId, res, ctx.ioPolicy()); + cctx.io().send(nodeId, res, plc); } catch (IgniteCheckedException e) { U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + @@ -311,6 +312,18 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridCacheContext ctx = cctx.cacheContext(msg.cacheId()); switch (msg.directType()) { + case 34:{ + GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg; + + GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse(req.version(), req.futureId(), req.miniId()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); + } + + break; + case 38: { GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg; @@ -318,7 +331,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.onError(req.classError()); - sendResponseOnFailedMessage(nodeId, res, ctx); + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } break; @@ -332,7 +345,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.onError(req.classError()); - sendResponseOnFailedMessage(nodeId, res, ctx); + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } break; @@ -347,11 +360,25 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(req.classError()); - sendResponseOnFailedMessage(nodeId, res, ctx); + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); } break; + case 55: { + GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg; + + GridNearTxPrepareResponse res = new GridNearTxPrepareResponse(req.version(), req.futureId(), + req.miniId(), req.version(), null, null, null); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, req.policy()); + } + + break; + + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index 60f2226..7edbff2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -30,7 +30,7 @@ import java.util.concurrent.atomic.*; */ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest { /** Allows to change behavior of readExternal method. */ - private static AtomicInteger readCnt = new AtomicInteger(); + protected static AtomicInteger readCnt = new AtomicInteger(); /** {@inheritDoc} */ @Override protected int gridCount() { @@ -136,7 +136,6 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes * Tests that correct response will be sent to client node in case of unmarshalling failed. */ public void testResponseMessageOnUnmarshallingFailed() { - //Checking failed unmarshalling on primary node. readCnt.set(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0d982f76/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java new file mode 100644 index 0000000..cb6d444 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTxTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.transactions.*; + +import java.io.*; + +/** + * Check behavior on exception while unmarshalling key. + */ +public class IgniteCacheP2pUnmarshallingErrorTxTest extends IgniteCacheP2pUnmarshallingErrorTest { + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** + * Sends put with optimistic lock and handles fail. + */ + protected void failOptimistic() { + try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + + jcache(0).put(new TestKey("1"), ""); + + tx.commit(); + + assert false : "p2p marshalling failed, but error response was not sent"; + } + catch (IgniteException e) { + assert X.hasCause(e, IOException.class); + } + + assert readCnt.get() == 0; //ensure we have read counts as expected. + } + + /** + * Sends put with pessimistic lock and handles fail. + */ + protected void failPessimictic() { + try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + + jcache(0).put(new TestKey("1"), ""); + + assert false : "p2p marshalling failed, but error response was not sent"; + } + catch (IgniteException e) { + assert X.hasCause(e, IOException.class); + } + + assert readCnt.get() == 0; //ensure we have read counts as expected. + } + + /** + * Tests that correct response will be sent to client node in case of unmarshalling failed. + */ + public void testResponseMessageOnUnmarshallingFailed() { +// //GridNearTxPrepareRequest unmarshalling failed test +// readCnt.set(2); +// +// failOptimistic(); +// +// //GridDhtTxPrepareRequest unmarshalling failed test +// readCnt.set(3); +// +// failOptimistic(); + +// readCnt.set(1); +// +// failPessimictic(); + + } +}