Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 1a2ed51a4 -> 29c7fa734
# ignite-901 WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29c7fa73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29c7fa73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29c7fa73 Branch: refs/heads/ignite-901 Commit: 29c7fa7349e01763f657978a322753ffd8f96984 Parents: 1a2ed51 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 6 17:22:30 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 6 17:35:34 2015 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContextImpl.java | 12 +- .../apache/ignite/internal/IgniteKernal.java | 6 +- .../datastreamer/DataStreamProcessor.java | 7 + .../datastreamer/DataStreamerImpl.java | 7 + .../processors/service/GridServiceProxy.java | 3 + ...IgniteClientReconnectDiscoveryStateTest.java | 8 +- .../IgniteClientReconnectFailoverSelfTest.java | 290 ------------------- .../IgniteClientReconnectFailoverTest.java | 290 +++++++++++++++++++ .../IgniteClientReconnectServicesTest.java | 4 +- .../IgniteClientReconnectStreamerTest.java | 1 + .../IgniteClientReconnectTestSuite.java | 2 +- 11 files changed, 334 insertions(+), 296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index a4edefb..4a60e28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -305,6 +305,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** Marshaller context. */ private MarshallerContextImpl marshCtx; + /** */ + private volatile boolean disconnected; + /** * No-arg constructor is required by externalization. */ @@ -915,7 +918,14 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** {@inheritDoc} */ @Override public boolean clientDisconnected() { - return locNode.isClient() && gateway().getState() == DISCONNECTED; + return locNode.isClient() && disconnected; + } + + /** + * @param disconnected Disconnected flag. + */ + void disconnected(boolean disconnected) { + this.disconnected = disconnected; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 5876288..4af69f7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2818,6 +2818,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { ctx.cluster().get().clientReconnectFuture(userFut); + ctx.disconnected(true); + List<GridComponent> comps = ctx.components(); for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { @@ -2850,10 +2852,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected. */ - public void reconnected(boolean clusterRestarted) { + public void reconnected(final boolean clusterRestarted) { Throwable err = null; try { + ctx.disconnected(false); + for (GridComponent comp : ctx.components()) comp.onReconnected(clusterRestarted); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java index 9e53bb5..ee95019 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.internal.util.worker.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.apache.ignite.stream.*; import org.apache.ignite.thread.*; @@ -139,6 +140,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter { log.debug("Stopped data streamer processor."); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + for (DataStreamerImpl<?, ?> ldr : ldrs) + ldr.onDisconnected(reconnectFut); + } + /** * @param cacheName Cache name ({@code null} for default cache). * @return Data loader. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 26b0568..b0be06d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -888,6 +888,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed } /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + + } + + /** * @return {@code true} If the loader is closed. */ boolean isClosed() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index 67ddc6f..556beea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -165,6 +165,9 @@ class GridServiceProxy<T> implements Serializable { catch (RuntimeException | Error e) { throw e; } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } catch (Exception e) { throw new IgniteException(e); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java index 77927a7..7bd3531 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java @@ -79,8 +79,14 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne client.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { info("Disconnected: " + evt); + + IgniteFuture<?> fut = client.cluster().clientReconnectFuture(); + + assertNotNull(fut); + assertFalse(fut.isDone()); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java deleted file mode 100644 index f938733..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.transactions.*; - -import javax.cache.*; -import javax.cache.processor.*; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.EventType.*; - -/** - * - */ -public class IgniteClientReconnectFailoverSelfTest extends IgniteClientReconnectAbstractTest { - /** */ - public final Integer THREADS = 8; - - /** */ - public final Integer RESTART_CNT = 30; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setCacheConfiguration(new CacheConfiguration()); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected int serverCount() { - return 1; - } - - /** */ - private volatile CyclicBarrier barrier; - - /** - * @throws Exception If failed. - */ - public void testCacheOperationReconnectApi() throws Exception { - clientMode = true; - - final Ignite client = startGrid(serverCount()); - - assertNotNull(client.cache(null)); - - Ignite srv = clientRouter(client); - - TestTcpDiscoverySpi srvSpi = spi(srv); - - final AtomicBoolean stop = new AtomicBoolean(false); - - final AtomicLong cntr = new AtomicLong(); - - final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration()); - - final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true); - - final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true); - - final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteCache<Integer, Integer> cache = client.cache(null); - - IgniteCompute compute = client.compute(); - - Set<Integer> keys = new TreeSet<>(); - final Map<Integer, Integer> entries = new TreeMap<>(); - - for (int i = 0; i < 50; i++) { - keys.add(i); - entries.put(i, i); - } - - while (!stop.get()) { - cntr.incrementAndGet(); - - try { - // Start cache operations. - for (int i = 0; i < 10; i++) { - cache.put(i, i); - cache.get(i); - cache.remove(i); - - cache.putAll(entries); - - cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer, Object>() { - @Override public Object process(MutableEntry<Integer, Integer> entry, - Object... arguments) throws EntryProcessorException { - if (ThreadLocalRandom.current().nextBoolean()) - entry.setValue(entry.getValue() * 100); - else - entry.remove(); - - return entry; - } - }); - } - - try (Transaction tx = client.transactions().txStart()) { - for (int i = 0; i < 10; i++) { - cache.put(i, i); - cache.get(i); - } - - tx.commit(); - } - - // Start async cache operations. - IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); - - for (int i = 0; i < 10; i++) { - asyncCache.put(i, i); - - asyncCache.future().get(); - - asyncCache.get(i); - - asyncCache.future().get(); - } - - // Compute. -// for (int i = 0; i < 10; i++) { -// compute.broadcast(new IgniteCallable<Integer>() { -// @IgniteInstanceResource -// private Ignite ignite; -// -// @Override public Integer call() throws Exception { -// return ignite.cache(null).localSize(); -// } -// }); -// -// compute.broadcast(new IgniteRunnable() { -// @Override public void run() { -// // No-op. -// } -// }); -// -// compute.apply(new C1<String, String>() { -// @Override public String apply(String o) { -// return o.toUpperCase(); -// } -// }, Arrays.asList("a", "b", "c")); -// } - - //Data structures. -// for (int i = 0; i < 10; i++) { -// assert atomicLong.incrementAndGet() >= 0; -// -// queue.offer("Test item"); -// -// if (ThreadLocalRandom.current().nextBoolean()) -// for (int j = 0; j < 50; j++) -// queue.poll(); -// -// assert queue.size() <= 1000; -// -// assert sequence.addAndGet(i + 1) >= 0; -// } - } - catch (CacheException | IgniteException e) { - log.info("Operation failed, ignore: " + e); - } - - if (cntr.get() % 100 == 0) - log.info("Iteration: " + cntr); - - if (barrier != null) - try { - barrier.await(); - } - catch (BrokenBarrierException e) { - log.warning("Broken barrier.", e); - - break; - } - } - - return null; - } - }, THREADS, "test-operation-thread-" + client.name()); - - final AtomicBoolean disconnected = new AtomicBoolean(false); - - final AtomicBoolean reconnected = new AtomicBoolean(false); - - client.events().localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event evt) { - if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { - info("Disconnected: " + evt); - - if (!reconnected.get()) - disconnected.set(true); - } - else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { - info("Reconnected: " + evt); - - if (disconnected.get()) - reconnected.set(true); - } - - return true; - } - }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); - - for (int i = 0; i < RESTART_CNT; i++) { - U.sleep(2000); - - log.info("Block reconnect."); - - reconnected.set(false); - - disconnected.set(false); - - log.info("Fail client."); - - srvSpi.failNode(client.cluster().localNode().id(), null); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return disconnected.get(); - } - }, 5000L); - - barrier = new CyclicBarrier(THREADS + 1, new Runnable() { - @Override public void run() { - barrier = null; - } - }); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return reconnected.get(); - } - }, 5000L); - - try { - barrier.await(10, TimeUnit.SECONDS); - } - catch (TimeoutException e) { - log.error("Failed. Operation hangs."); - - for (Ignite ignite : G.allGrids()) - dumpCacheDebugInfo(ignite); - - U.dumpThreads(log); - - CyclicBarrier barrier0 = barrier; - - if (barrier0 != null) - barrier0.reset(); - - stop.set(true); - - fail("Failed to wait for update."); - } - } - - stop.set(true); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java new file mode 100644 index 0000000..e51d68a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.transactions.*; + +import javax.cache.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbstractTest { + /** */ + public final Integer THREADS = 8; + + /** */ + public final Integer RESTART_CNT = 30; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration()); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** */ + private volatile CyclicBarrier barrier; + + /** + * @throws Exception If failed. + */ + public void testCacheOperationReconnectApi() throws Exception { + clientMode = true; + + final Ignite client = startGrid(serverCount()); + + assertNotNull(client.cache(null)); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final AtomicBoolean stop = new AtomicBoolean(false); + + final AtomicLong cntr = new AtomicLong(); + + final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration()); + + final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true); + + final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true); + + final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + IgniteCache<Integer, Integer> cache = client.cache(null); + + IgniteCompute compute = client.compute(); + + Set<Integer> keys = new TreeSet<>(); + final Map<Integer, Integer> entries = new TreeMap<>(); + + for (int i = 0; i < 50; i++) { + keys.add(i); + entries.put(i, i); + } + + while (!stop.get()) { + cntr.incrementAndGet(); + + try { + // Start cache operations. + for (int i = 0; i < 10; i++) { + cache.put(i, i); + cache.get(i); + cache.remove(i); + + cache.putAll(entries); + + cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer, Object>() { + @Override public Object process(MutableEntry<Integer, Integer> entry, + Object... arguments) throws EntryProcessorException { + if (ThreadLocalRandom.current().nextBoolean()) + entry.setValue(entry.getValue() * 100); + else + entry.remove(); + + return entry; + } + }); + } + + try (Transaction tx = client.transactions().txStart()) { + for (int i = 0; i < 10; i++) { + cache.put(i, i); + cache.get(i); + } + + tx.commit(); + } + + // Start async cache operations. + IgniteCache<Integer, Integer> asyncCache = cache.withAsync(); + + for (int i = 0; i < 10; i++) { + asyncCache.put(i, i); + + asyncCache.future().get(); + + asyncCache.get(i); + + asyncCache.future().get(); + } + + // Compute. +// for (int i = 0; i < 10; i++) { +// compute.broadcast(new IgniteCallable<Integer>() { +// @IgniteInstanceResource +// private Ignite ignite; +// +// @Override public Integer call() throws Exception { +// return ignite.cache(null).localSize(); +// } +// }); +// +// compute.broadcast(new IgniteRunnable() { +// @Override public void run() { +// // No-op. +// } +// }); +// +// compute.apply(new C1<String, String>() { +// @Override public String apply(String o) { +// return o.toUpperCase(); +// } +// }, Arrays.asList("a", "b", "c")); +// } + + //Data structures. +// for (int i = 0; i < 10; i++) { +// assert atomicLong.incrementAndGet() >= 0; +// +// queue.offer("Test item"); +// +// if (ThreadLocalRandom.current().nextBoolean()) +// for (int j = 0; j < 50; j++) +// queue.poll(); +// +// assert queue.size() <= 1000; +// +// assert sequence.addAndGet(i + 1) >= 0; +// } + } + catch (CacheException | IgniteException e) { + log.info("Operation failed, ignore: " + e); + } + + if (cntr.get() % 100 == 0) + log.info("Iteration: " + cntr); + + if (barrier != null) + try { + barrier.await(); + } + catch (BrokenBarrierException e) { + log.warning("Broken barrier.", e); + + break; + } + } + + return null; + } + }, THREADS, "test-operation-thread-" + client.name()); + + final AtomicBoolean disconnected = new AtomicBoolean(false); + + final AtomicBoolean reconnected = new AtomicBoolean(false); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + if (!reconnected.get()) + disconnected.set(true); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + if (disconnected.get()) + reconnected.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + for (int i = 0; i < RESTART_CNT; i++) { + U.sleep(2000); + + log.info("Block reconnect."); + + reconnected.set(false); + + disconnected.set(false); + + log.info("Fail client."); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return disconnected.get(); + } + }, 5000L); + + barrier = new CyclicBarrier(THREADS + 1, new Runnable() { + @Override public void run() { + barrier = null; + } + }); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return reconnected.get(); + } + }, 5000L); + + try { + barrier.await(10, TimeUnit.SECONDS); + } + catch (TimeoutException e) { + log.error("Failed. Operation hangs."); + + for (Ignite ignite : G.allGrids()) + dumpCacheDebugInfo(ignite); + + U.dumpThreads(log); + + CyclicBarrier barrier0 = barrier; + + if (barrier0 != null) + barrier0.reset(); + + stop.set(true); + + fail("Failed to wait for update."); + } + } + + stop.set(true); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java index 58715a1..6ccbbe0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; -import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.service.*; import org.apache.ignite.resources.*; import org.apache.ignite.services.*; @@ -129,7 +129,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst BlockTpcCommunicationSpi commSpi = commSpi(srv); - commSpi.blockMsg(GridContinuousMessage.class); + commSpi.blockMsg(GridNearTxPrepareResponse.class); final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() { @Override public Object call() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java index e85c315..50feb86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java @@ -81,6 +81,7 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst return srvCache.localSize() == 50; } }, 2000L); + reconnectClientNode(client, srv, null); for (int i = 0; i < 50; i++) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index fb41f0f..93137bf 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -33,7 +33,6 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectStopTest.class); suite.addTestSuite(IgniteClientReconnectApiBlockTest.class); - suite.addTestSuite(IgniteClientReconnectFailoverSelfTest.class); suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class); suite.addTestSuite(IgniteClientReconnectCacheTest.class); suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class); @@ -42,6 +41,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectCollectionsTest.class); suite.addTestSuite(IgniteClientReconnectServicesTest.class); suite.addTestSuite(IgniteClientReconnectStreamerTest.class); + suite.addTestSuite(IgniteClientReconnectFailoverTest.class); return suite; }