Fixed tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b056a739 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b056a739 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b056a739 Branch: refs/heads/ignite-1108 Commit: b056a739ebf001d3c63ebc8afa52dba520055221 Parents: d0a6f55 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Mon Aug 3 18:26:26 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Mon Aug 3 18:26:26 2015 +0300 ---------------------------------------------------------------------- .../integration/ClientAbstractMultiNodeSelfTest.java | 4 ++-- .../managers/communication/GridIoManager.java | 2 +- .../internal/GridJobMasterLeaveAwareSelfTest.java | 10 ++++++---- .../internal/IgniteClientReconnectAbstractTest.java | 5 +++-- .../internal/IgniteClientReconnectCacheTest.java | 5 +++-- .../GridDeploymentMessageCountSelfTest.java | 5 +++-- .../cache/CacheStopAndDestroySelfTest.java | 8 ++++---- .../cache/GridCacheAtomicMessageCountSelfTest.java | 6 ++++-- .../GridCacheReplicatedSynchronousCommitTest.java | 5 +++-- .../cache/IgniteCacheAbstractStopBusySelfTest.java | 6 ++++-- .../cache/IgniteCacheNearLockValueSelfTest.java | 6 ++++-- ...IgniteDynamicCacheStartNoExchangeTimeoutTest.java | 4 ++-- .../cache/IgniteTxReentryAbstractSelfTest.java | 5 +++-- .../IgniteCacheClientNodeChangingTopologyTest.java | 6 ++++-- .../IgniteCacheClientNodePartitionsExchangeTest.java | 4 ++-- ...niteTxOriginatingNodeFailureAbstractSelfTest.java | 6 +++--- ...misticOriginatingNodeFailureAbstractSelfTest.java | 6 +++--- .../dht/GridCacheDhtPreloadMessageCountTest.java | 5 +++-- ...eCachePrimaryNodeFailureRecoveryAbstractTest.java | 6 ++++-- ...dCacheAtomicInvalidPartitionHandlingSelfTest.java | 5 +++-- .../near/IgniteCacheNearTxRollbackTest.java | 6 ++++-- .../GridCacheReplicatedInvalidateSelfTest.java | 6 ++++-- .../apache/ignite/util/TestTcpCommunicationSpi.java | 6 ++++-- .../CacheScanPartitionQueryFallbackSelfTest.java | 15 +++++++++------ 24 files changed, 85 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index cf1b735..66f266a 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@ -456,11 +456,11 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract @SuppressWarnings("unchecked") private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { checkSyncFlags((GridIoMessage)msg); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 53ccdfe..85e8421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -1039,7 +1039,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * <p> * How to use it: * <ol> - * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean)} + * <li>Replace {@link #send(ClusterNode, Object, int, Message, byte, boolean, long, boolean, IgniteInClosure)} * with this method.</li> * <li>Start all grids for your test, then set {@link #TURBO_DEBUG_MODE} to {@code true}.</li> * <li>Perform test operations on the topology. No network will be there.</li> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java index 55fc2f7..355c795 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java @@ -730,9 +730,9 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { private CountDownLatch waitLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { - sendMessage0(node, msg); + sendMessage0(node, msg, ackClosure); } /** @@ -741,9 +741,11 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { * * @param node Destination node. * @param msg Message to be sent. + * @param ackClosure Ack closure. * @throws org.apache.ignite.spi.IgniteSpiException If failed. */ - private void sendMessage0(ClusterNode node, Message msg) throws IgniteSpiException { + private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { GridIoMessage msg0 = (GridIoMessage)msg; @@ -762,7 +764,7 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { } if (!block) - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 7247d54..38522af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -315,7 +315,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra private IgniteLogger log; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { Class msgCls0 = msgCls; if (collectStart.get() && msg instanceof GridIoMessage) @@ -328,7 +329,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra return; } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index be3234d..073f6f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -1138,7 +1138,8 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -1156,7 +1157,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentMessageCountSelfTest.java index 9df66b3..728be6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentMessageCountSelfTest.java @@ -23,6 +23,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -169,12 +170,12 @@ public class GridDeploymentMessageCountSelfTest extends GridCommonAbstractTest { private AtomicInteger msgCnt = new AtomicInteger(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { if (isDeploymentMessage((GridIoMessage)msg)) msgCnt.incrementAndGet(); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java index 20284a8..803789e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java @@ -18,13 +18,12 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -121,8 +120,9 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest { public static UUID nodeFilter; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { - super.sendMessage(node, msg); + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { + super.sendMessage(node, msg, ackClosure); if (nodeFilter != null && node.id().equals(nodeFilter) && http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index db4061a..f2be20a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.processors.cache; +import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -193,14 +195,14 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest private Map<Class<?>, AtomicInteger> cntMap = new HashMap<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { AtomicInteger cntr = cntMap.get(((GridIoMessage)msg).message().getClass()); if (cntr != null) cntr.incrementAndGet(); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java index 6c04ed8..2352352 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheReplicatedSynchronousCommitTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -185,7 +186,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract } /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object obj = ((GridIoMessage)msg).message(); @@ -196,7 +197,7 @@ public class GridCacheReplicatedSynchronousCommitTest extends GridCommonAbstract return; } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java index 9c30f23..16e9c74 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -343,7 +344,8 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst */ private class TestTpcCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (suspended.get()) { assert bannedMsg.get() != null; @@ -355,7 +357,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java index c500bbf..d3eef68 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -122,7 +123,8 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { GridIoMessage ioMsg = (GridIoMessage)msg; @@ -130,7 +132,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest { reqs.add((GridNearLockRequest)ioMsg.message()); } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java index 5011e5f..ec48bd1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartNoExchangeTimeoutTest.java @@ -453,14 +453,14 @@ public class IgniteDynamicCacheStartNoExchangeTimeoutTest extends GridCommonAbst */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object msg0 = ((GridIoMessage)msg).message(); if (msg0 instanceof GridDhtPartitionsSingleRequest) // Sent in case of exchange timeout. fail("Unexpected message: " + msg0); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java index de62955..9614613 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxReentryAbstractSelfTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.*; import org.apache.ignite.internal.processors.cache.distributed.dht.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -136,11 +137,11 @@ public abstract class IgniteTxReentryAbstractSelfTest extends GridCommonAbstract private AtomicInteger dhtLocks = new AtomicInteger(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { countMsg((GridIoMessage)msg); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 6782ff4..58c5f95 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; @@ -1701,7 +1702,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac private List<Object> recordedMsgs = new ArrayList<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -1722,7 +1724,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java index 5a51a1b..822b807 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodePartitionsExchangeTest.java @@ -609,8 +609,8 @@ public class IgniteCacheClientNodePartitionsExchangeTest extends GridCommonAbstr private IgniteLogger log; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) { - super.sendMessage(node, msg); + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) { + super.sendMessage(node, msg, ackClosure); Object msg0 = ((GridIoMessage)msg).message(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java index d664aa8..cf17094 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java @@ -239,10 +239,10 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setCommunicationSpi(new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, Message msg) - throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { if (!F.eq(ignoreMsgNodeId, node.id()) || !ignoredMessage((GridIoMessage)msg)) - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java index 13fadc4..da5b14b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java @@ -432,8 +432,8 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setCommunicationSpi(new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, Message msg) - throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { if (getSpiContext().localNode().id().equals(failingNodeId)) { if (ignoredMessage((GridIoMessage)msg) && ignoreMsgNodeIds != null) { for (UUID ignored : ignoreMsgNodeIds) { @@ -443,7 +443,7 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java index 0650436..f8c346b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheDhtPreloadMessageCountTest.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -141,11 +142,11 @@ public class GridCacheDhtPreloadMessageCountTest extends GridCommonAbstractTest private Collection<GridDhtPartitionsSingleMessage> sentMsgs = new ConcurrentLinkedQueue<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { recordMessage((GridIoMessage)msg); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java index f996877..7504645 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; @@ -488,7 +489,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends private List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>(); /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -505,7 +507,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index d3bcf67..34ae647 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -365,7 +366,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA */ private static class DelayCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { try { if (delayMessage((GridIoMessage)msg)) @@ -375,7 +376,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA throw new IgniteSpiException(e); } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearTxRollbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearTxRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearTxRollbackTest.java index 8df7fb1..82ca6d7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearTxRollbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheNearTxRollbackTest.java @@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -119,7 +120,8 @@ public class IgniteCacheNearTxRollbackTest extends IgniteCacheAbstractTest { private volatile boolean sndFail; /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + throws IgniteSpiException { if (msg instanceof GridIoMessage) { Object msg0 = ((GridIoMessage)msg).message(); @@ -127,7 +129,7 @@ public class IgniteCacheNearTxRollbackTest extends IgniteCacheAbstractTest { throw new IgniteSpiException("Test error"); } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java index 7a2e8b3..759e9d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedInvalidateSelfTest.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.communication.*; import org.apache.ignite.internal.processors.clock.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -215,7 +216,8 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes } /** {@inheritDoc} */ - @Override public void sendMessage(ClusterNode destNode, Message msg) + @Override public void sendMessage(ClusterNode destNode, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object msg0 = ((GridIoMessage)msg).message(); @@ -231,7 +233,7 @@ public class GridCacheReplicatedInvalidateSelfTest extends GridCommonAbstractTes } } - super.sendMessage(destNode, msg); + super.sendMessage(destNode, msg, ackClosure); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/core/src/test/java/org/apache/ignite/util/TestTcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/TestTcpCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/util/TestTcpCommunicationSpi.java index 6e4e50b..5b618a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/TestTcpCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/util/TestTcpCommunicationSpi.java @@ -20,6 +20,7 @@ package org.apache.ignite.util; import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.lang.*; import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.tcp.*; @@ -35,14 +36,15 @@ public class TestTcpCommunicationSpi extends TcpCommunicationSpi { private Class ignoreMsg; /** {@inheritDoc} */ - @Override public void sendMessage(final ClusterNode node, final Message msg) throws IgniteSpiException { + @Override public void sendMessage(final ClusterNode node, final Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { if (stopped) return; if (ignoreMsg != null && ((GridIoMessage)msg).message().getClass().equals(ignoreMsg)) return; - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b056a739/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java index b7f5fa8..84ceafd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java @@ -345,13 +345,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public TcpCommunicationSpi create() { return new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object origMsg = ((GridIoMessage)msg).message(); if (origMsg instanceof GridCacheQueryRequest) fail(); //should use local node - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } }; } @@ -364,13 +365,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public TcpCommunicationSpi create() { return new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object origMsg = ((GridIoMessage)msg).message(); if (origMsg instanceof GridCacheQueryRequest) assertEquals(expNodeId, node.id()); - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } }; } @@ -383,7 +385,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT /** {@inheritDoc} */ @Override public TcpCommunicationSpi create() { return new TcpCommunicationSpi() { - @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException { + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException { Object origMsg = ((GridIoMessage)msg).message(); if (origMsg instanceof GridCacheQueryRequest) { @@ -400,7 +403,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT } } - super.sendMessage(node, msg); + super.sendMessage(node, msg, ackClosure); } }; }