Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 614f848a1 -> 6d6628a97
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6d6628a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6d6628a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6d6628a9 Branch: refs/heads/ignite-901 Commit: 6d6628a97ceec7070d70a419e1282004d67d9c03 Parents: 614f848 Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 15 14:41:35 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 15 14:41:35 2015 +0300 ---------------------------------------------------------------------- .../distributed/dht/GridPartitionedGetFuture.java | 13 ++++++++++--- .../cache/distributed/near/GridNearGetFuture.java | 13 ++++++++++--- .../apache/ignite/spi/discovery/tcp/ClientImpl.java | 15 +++++++++------ .../tcp/messages/TcpDiscoveryClientAckResponse.java | 6 ++++++ .../discovery/tcp/TcpClientDiscoverySpiSelfTest.java | 15 ++++++++------- 5 files changed, 43 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d6628a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 79d5e75..bb3673d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -639,10 +639,17 @@ public class GridPartitionedGetFuture<K, V> extends 
GridCompoundIdentityFuture<M if (timeout.finish()) { cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + try { + fut.get(); - onDone(Collections.<K, V>emptyMap()); + // Remap. + map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridPartitionedGetFuture.this.onDone(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d6628a9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 58f6fe5..0691d39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -764,10 +764,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (timeout.finish()) { cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + try { + fut.get(); - onDone(Collections.<K, V>emptyMap()); + // Remap. 
+ map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridNearGetFuture.this.onDone(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d6628a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 1f92ad7..e23c191 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -818,15 +818,18 @@ class ClientImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) log.debug("Message has been received: " + msg); - if (msg instanceof TcpDiscoveryClientAckResponse) - sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); - spi.stats.onMessageReceived(msg); - if (spi.ensured(msg) && joinLatch.getCount() == 0L) - lastMsgId = msg.id(); + boolean ack = msg instanceof TcpDiscoveryClientAckResponse; + + if (!ack) { + if (spi.ensured(msg) && joinLatch.getCount() == 0L) + lastMsgId = msg.id(); - msgWorker.addMessage(msg); + msgWorker.addMessage(msg); + } + else + sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d6628a9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java index 89cc071..ce3943a 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientAckResponse.java @@ -17,6 +17,7 @@ package org.apache.ignite.spi.discovery.tcp.messages; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import java.util.*; @@ -55,4 +56,9 @@ public class TcpDiscoveryClientAckResponse extends TcpDiscoveryAbstractMessage { @Override public boolean highPriority() { return true; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryClientAckResponse.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6d6628a9/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 92c1f13..63db0c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -529,7 +529,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); - spi.pauseAll(); + spi.pauseAll(false); try { spi.brakeConnection(); @@ -573,7 +573,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { TestTcpDiscoverySpi spi = ((TestTcpDiscoverySpi)ignite.configuration().getDiscoverySpi()); - spi.pauseAll(); + spi.pauseAll(false); try { spi.brakeConnection(); @@ -611,7 +611,7 @@ public class TcpClientDiscoverySpiSelfTest extends 
GridCommonAbstractTest { attachListeners(2, 2); - ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(); + ((TestTcpDiscoverySpi)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll(true); stopGrid("server-2"); @@ -1293,7 +1293,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { disconnectLatch.countDown(); if (changeTop) - clientSpi.pauseAll(); + clientSpi.pauseAll(false); } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { assertEquals(0, disconnectLatch.getCount()); @@ -2118,12 +2118,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { } /** - * + * @param suspend If {@code true} suspends worker threads. */ - public void pauseAll() { + public void pauseAll(boolean suspend) { pauseResumeOperation(true, openSockLock, writeLock); - impl.workerThread().suspend(); + if (suspend) + impl.workerThread().suspend(); } /**