Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 f5f3efd16 -> ff3d61fcc
# ignite-901 client reconnect WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ff3d61fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ff3d61fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ff3d61fc Branch: refs/heads/ignite-901 Commit: ff3d61fcc6ac310c247e2a5c91d91495f3f66a04 Parents: f5f3efd Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 2 13:18:25 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 2 17:38:41 2015 +0300 ---------------------------------------------------------------------- .../IgniteClientDisconnectedException.java | 33 ++ .../apache/ignite/internal/GridComponent.java | 1 + ...gniteClientDisconnectedCheckedException.java | 32 ++ .../IgniteDisconnectedCheckedException.java | 32 -- .../apache/ignite/internal/IgniteKernal.java | 65 ---- .../processors/cache/GridCacheMvccManager.java | 2 +- .../GridCachePartitionExchangeManager.java | 6 +- .../continuous/GridContinuousHandler.java | 9 +- .../continuous/GridContinuousProcessor.java | 23 +- .../datastructures/DataStructuresProcessor.java | 7 +- .../GridCacheCountDownLatchImpl.java | 49 ++- .../service/GridServiceProcessor.java | 8 +- .../processors/service/GridServiceProxy.java | 10 +- .../ignite/internal/util/IgniteUtils.java | 6 + .../spi/discovery/DiscoverySpiDataExchange.java | 3 +- .../IgniteClientReconnectAbstractTest.java | 19 +- .../IgniteClientReconnectAtomicsTest.java | 315 +++++++++++++++++ .../IgniteClientReconnectCacheTest.java | 9 +- .../IgniteClientReconnectCollectionsTest.java | 132 +++++++ .../IgniteClientReconnectComputeTest.java | 35 ++ ...eClientReconnectContinuousProcessorTest.java | 350 +++++++++++++++++++ .../IgniteClientReconnectTestSuite.java | 4 + 22 files changed, 1010 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java new file mode 100644 index 0000000..9500ac2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteClientDisconnectedException.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import org.jetbrains.annotations.*; + +/** + * + */ +public class IgniteClientDisconnectedException extends IgniteException { + /** + * @param msg Error message. + * @param cause Optional nested exception (can be {@code null}). + */ + public IgniteClientDisconnectedException(String msg, @Nullable Throwable cause) { + super(msg, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index 5b3b0c3..7f0f1b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -87,6 +87,7 @@ public interface GridComponent { /** * Receives discovery data object from remote nodes (called * on new node during discovery process). + * * @param joiningNodeId Joining node ID. * @param rmtNodeId Remote node ID for which data is provided. * @param data Discovery data object or {@code null} if nothing was http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java new file mode 100644 index 0000000..2e999f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteClientDisconnectedCheckedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; + +/** + * + */ +public class IgniteClientDisconnectedCheckedException extends IgniteCheckedException { + /** + * @param msg Message. + */ + public IgniteClientDisconnectedCheckedException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java deleted file mode 100644 index 0684356..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal; - -import org.apache.ignite.*; - -/** - * - */ -public class IgniteDisconnectedCheckedException extends IgniteCheckedException { - /** - * @param msg Message. - */ - public IgniteDisconnectedCheckedException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 821a1f5..c04f327 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -2816,71 +2816,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } - private void stopOnDisconnect() { - GridCacheProcessor cacheProcessor = ctx.cache(); - - List<GridComponent> comps = ctx.components(); - - // Callback component in reverse order while kernal is still functional - // if called in the same thread, at least. - for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { - GridComponent comp = it.previous(); - - try { - if (!skipDaemon(comp) && (!(comp instanceof GridManager))) - comp.onKernalStop(true); - } - catch (Throwable e) { - errOnStop = true; - - U.error(log, "Failed to pre-stop processor: " + comp, e); - - if (e instanceof Error) - throw e; - } - } - - if (cacheProcessor != null) - cacheProcessor.cancelUserOperations(); - - for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { - GridComponent comp = it.previous(); - - try { - if (!skipDaemon(comp) && (!(comp instanceof GridManager))) { - comp.stop(true); - - if (log.isDebugEnabled()) - log.debug("Component stopped: " + comp); - } - } - catch (Throwable e) { - errOnStop = true; - - U.error(log, "Failed to stop component (ignoring): " + comp, e); - - if (e instanceof Error) - throw (Error)e; - } - } - - ctx.marshallerContext().onDisconnected(); - } - - private void restart() throws IgniteCheckedException { - List<PluginProvider> plugins = U.allPluginProviders(); - - startProcessor(new ClusterProcessor(ctx)); - - GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx); - - rsrcProc.setSpringContext(rsrcCtx); - - scheduler = new IgniteSchedulerImpl(ctx); - - startProcessor(rsrcProc); - } - /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index e2d22dd..9c89041 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -299,7 +299,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public void cancelClientFutures(boolean stop) { IgniteCheckedException e = stop ? new IgniteCheckedException("Operation has been cancelled (node is stopping).") : - new IgniteCheckedException("Operation has been cancelled (node disconnected)."); + new IgniteClientDisconnectedCheckedException("Operation has been cancelled (node disconnected)."); for (Collection<GridCacheFuture<?>> futures : futs.values()) { for (GridCacheFuture<?> future : futures) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f0c9b3b..3f06e8e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -286,7 +286,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } - catch (IgniteDisconnectedCheckedException e) { + catch (IgniteClientDisconnectedCheckedException e) { log.info("Disconnected while waiting for initial partition map exchange: " + e); break; @@ -331,7 +331,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); IgniteCheckedException err = disconnected ? - new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) : + new IgniteClientDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) : new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName()); // Finish all exchange futures. @@ -1119,7 +1119,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana catch (IgniteInterruptedCheckedException e) { throw e; } - catch (IgniteDisconnectedCheckedException e) { + catch (IgniteClientDisconnectedCheckedException e) { return; } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index ce9b7c0..79020da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -33,7 +33,14 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * Listener registration status. */ public enum RegisterStatus { - REGISTERED, NOT_REGISTERED, DELAYED + /** */ + REGISTERED, + + /** */ + NOT_REGISTERED, + + /** */ + DELAYED } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index dd04bf4..7508acd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -318,11 +318,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) { - if (!nodeId.equals(ctx.localNodeId())) { + if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos); - // Collect listeners information (will be sent to - // joining node during discovery process). + // Collect listeners information (will be sent to joining node during discovery process). for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) { UUID routineId = e.getKey(); LocalRoutineInfo info = e.getValue(); @@ -333,8 +332,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return data; } - else - return null; + + return null; } /** {@inheritDoc} */ @@ -377,6 +376,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * Callback invoked when cache is started. * * @param ctx Cache context. + * @throws IgniteCheckedException If failed. */ public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException { for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) { @@ -630,6 +630,16 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } + /** {@inheritDoc} */ + @Override public void onDisconnected() throws IgniteCheckedException { + for (UUID rmtId : rmtInfos.keySet()) + unregisterRemote(rmtId); + + rmtInfos.clear(); + + clientInfos.clear(); + } + /** * @param nodeId Node ID. * @param routineId Routine ID. @@ -637,6 +647,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param toSnd Notification object to send. * @param orderedTopic Topic for ordered notifications. * If {@code null}, non-ordered message will be sent. + * @param msg If {@code true} then sent data is collection of messages. * @throws IgniteCheckedException In case of error. */ private void sendNotification(UUID nodeId, @@ -1221,6 +1232,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @GridToStringInclude private Collection<DiscoveryDataItem> items; + /** */ private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos; /** @@ -1232,6 +1244,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * @param nodeId Node ID. + * @param clientInfos Client information. */ DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index e6335b6..d4f67fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -1001,8 +1001,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsView.put(key, val); } - latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(), - val.autoDelete(), key, cntDownLatchView, dsCacheCtx); + latch = new GridCacheCountDownLatchImpl(name, val.initialCount(), + val.autoDelete(), + key, + cntDownLatchView, + dsCacheCtx); dsMap.put(key, latch); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 33547d9..6a0f5af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -66,9 +66,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** Cache context. */ private GridCacheContext ctx; - /** Current count. */ - private int cnt; - /** Initial count. */ private int initCnt; @@ -95,7 +92,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc * Constructor. * * @param name Latch name. - * @param cnt Current count. * @param initCnt Initial count. * @param autoDel Auto delete flag. * @param key Latch key. @@ -103,7 +99,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc * @param ctx Cache context. */ public GridCacheCountDownLatchImpl(String name, - int cnt, int initCnt, boolean autoDel, GridCacheInternalKey key, @@ -111,14 +106,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc GridCacheContext ctx) { assert name != null; - assert cnt >= 0; assert initCnt >= 0; assert key != null; assert latchView != null; assert ctx != null; this.name = name; - this.cnt = cnt; this.initCnt = initCnt; this.autoDel = autoDel; this.key = key; @@ -135,7 +128,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public int count() { - return cnt; + try { + return CU.outTx(new GetCountCallable(), ctx); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } } /** {@inheritDoc} */ @@ -211,8 +209,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** {@inheritDoc} */ @Override public boolean onRemoved() { - assert cnt == 0; - return rmvd = true; } @@ -235,8 +231,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc @Override public void onUpdate(int cnt) { assert cnt >= 0; - this.cnt = cnt; - while (internalLatch != null && internalLatch.getCount() > cnt) internalLatch.countDown(); } @@ -257,9 +251,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc if (log.isDebugEnabled()) log.debug("Failed to find count down latch with given name: " + name); - assert cnt == 0; - - return new CountDownLatch(cnt); + return new CountDownLatch(0); } tx.commit(); @@ -342,6 +334,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc /** * */ + private class GetCountCallable implements Callable<Integer> { + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + Integer val; + + try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) { + GridCacheCountDownLatchValue latchVal = latchView.get(key); + + if (latchVal == null) + throw new IgniteCheckedException("Failed to find count down latch with given name: " + name); + + val = latchVal.get(); + + tx.rollback(); + } + + return val; + } + } + + /** + * + */ private class CountDownCallable implements Callable<Integer> { /** Value to count down on (if 0 then latch is counted down to 0). */ private final int val; @@ -364,9 +379,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc if (log.isDebugEnabled()) log.debug("Failed to find count down latch with given name: " + name); - assert cnt == 0; - - return cnt; + return 0; } int retVal; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index bb451c7..89b2a31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { cache = ctx.cache().utilityCache(); - ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); + if (!ctx.clientNode()) + ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY); try { if (ctx.deploy().enabled()) @@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter { busyLock.block(); - ctx.event().removeLocalEventListener(topLsnr); + if (!ctx.clientNode()) + ctx.event().removeLocalEventListener(topLsnr); if (cfgQryId != null) cache.context().continuousQueries().cancelInternalQuery(cfgQryId); @@ -1164,7 +1166,7 @@ public class GridServiceProcessor extends GridProcessorAdapter { } } catch (IgniteCheckedException ex) { - log.error("Failed to clean up zombie assignments for service: " + name, ex); + U.error(log, "Failed to clean up zombie assignments for service: " + name, ex); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java index 8e13bc4..67ddc6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java @@ -68,9 +68,15 @@ class GridServiceProxy<T> implements Serializable { * @param name Service name. * @param svc Service type class. * @param sticky Whether multi-node request should be done. + * @param ctx Context. */ - @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class<? super T> svc, - boolean sticky, GridKernalContext ctx) { + @SuppressWarnings("unchecked") + GridServiceProxy(ClusterGroup prj, + String name, + Class<? super T> svc, + boolean sticky, + GridKernalContext ctx) + { this.prj = prj; this.ctx = ctx; hasLocNode = hasLocalNode(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f457d6c..7516f79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -626,6 +626,12 @@ public abstract class IgniteUtils { } }); + m.put(IgniteClientDisconnectedCheckedException.class, new C1<IgniteCheckedException, IgniteException>() { + @Override public IgniteException apply(IgniteCheckedException e) { + return new IgniteClientDisconnectedException(e.getMessage(), e); + } + }); + return m; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java index 46d6716..038ea59 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java @@ -39,7 +39,8 @@ public interface DiscoverySpiDataExchange { /** * Notifies discovery manager about data received from remote node. * - * @param joiningNodeId Remote node ID. + * @param joiningNodeId ID of new node that joins topology. + * @param nodeId ID of the node provided data. * @param data Collection of discovery data objects from different components. */ public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index 0512074..23b8a15 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -65,6 +65,13 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra protected abstract int serverCount(); /** + * @return Number of client nodes started before tests. + */ + protected int clientCount() { + return 0; + } + + /** * @param ignite Node. * @return Discovery SPI. */ @@ -79,7 +86,17 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra int srvs = serverCount(); if (srvs > 0) - startGrids(srvs); + startGridsMultiThreaded(srvs); + + int clients = clientCount(); + + if (clients > 0) { + clientMode = true; + + startGridsMultiThreaded(srvs, clients); + + clientMode = false; + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java new file mode 100644 index 0000000..bbb7eef --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * TODO IGNITE-901: test AtomicReference, AtomicStamped, usage after remove, test API block, fail current call on disconnect. + */ +public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true); + + assertEquals(0L, clientAtomicLong.getAndAdd(1)); + + IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLong", 0, false); + + assertEquals(1L, srvAtomicLong.getAndAdd(1)); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(2L, srvAtomicLong.getAndAdd(1)); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(3L, clientAtomicLong.getAndAdd(1)); + + assertEquals(4L, srvAtomicLong.getAndAdd(1)); + + assertEquals(5L, clientAtomicLong.getAndAdd(1)); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicLongReconnectRemoved() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); + + assertEquals(0L, clientAtomicLong.getAndAdd(1)); + + IgniteAtomicLong srvAtomicLong = srv.atomicLong("atomicLongRmv", 0, false); + + assertEquals(1L, srvAtomicLong.getAndAdd(1)); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvAtomicLong.close(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + clientAtomicLong.getAndAdd(1); + + return null; + } + }, IgniteException.class, null); + } + + /** + * @throws Exception If failed. + */ + public void testLatchReconnect1() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true); + + assertEquals(3, clientLatch.count()); + + IgniteCountDownLatch srvLatch = srv.countDownLatch("latch1", 3, false, false); + + assertEquals(3, srvLatch.count()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvLatch.countDown(); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertEquals(2, srvLatch.count()); + assertEquals(2, clientLatch.count()); + + srvLatch.countDown(); + + assertEquals(1, srvLatch.count()); + assertEquals(1, clientLatch.count()); + + clientLatch.countDown(); + + assertEquals(0, srvLatch.count()); + assertEquals(0, clientLatch.count()); + + assertTrue(srvLatch.await(1000)); + assertTrue(clientLatch.await(1000)); + } + + /** + * @throws Exception If failed. + */ + public void _testLatchReconnect2() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final IgniteCountDownLatch clientLatch = client.countDownLatch("latch2", 1, false, true); + + IgniteCountDownLatch srvLatch = srv.countDownLatch("latch2", 1, false, false); + + assertFalse(clientLatch.await(100)); + + IgniteInternalFuture<Boolean> waitFut = GridTestUtils.runAsync(new Callable<Boolean>() { + @Override public Boolean call() throws Exception { + return clientLatch.await(60_000, MILLISECONDS); + } + }); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + srvLatch.countDown(); + + assertNotDone(waitFut); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertTrue(waitFut.get(5000)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 5687010..452f808 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -129,8 +129,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertEquals(1, reconnectLatch.getCount()); blockPutRef.set(GridTestUtils.runAsync(new Callable() { - @Override - public Object call() throws Exception { + @Override public Object call() throws Exception { log.info("Start put."); cache.put(2, 2); @@ -218,8 +217,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final CountDownLatch reconnectLatch = new CountDownLatch(1); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { info("Reconnected: " + evt); @@ -305,8 +303,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac }, new int[]{EVT_CLIENT_NODE_RECONNECTED}); IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() { - @Override - public Ignite call() throws Exception { + @Override public Ignite call() throws Exception { try { return startGrid(SRV_CNT); } catch (Throwable e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java new file mode 100644 index 0000000..fcb74cd --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; + +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.events.EventType.*; + +/** + * TODO IGNITE-901: test for queue, check set/queue usage after remove, test API block, fail current call on disconnect. + */ +public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testSetReconnect() throws Exception { + CollectionConfiguration colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(TRANSACTIONAL); + + setReconnect(colCfg); + + colCfg = new CollectionConfiguration(); + + colCfg.setCacheMode(PARTITIONED); + colCfg.setAtomicityMode(ATOMIC); + + setReconnect(colCfg); + } + + /** + * @param colCfg Collection configuration. + * @throws Exception If failed. + */ + private void setReconnect(CollectionConfiguration colCfg) throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final String setName = "set-" + colCfg.getAtomicityMode(); + + IgniteSet<String> clientSet = client.set(setName, colCfg); + + IgniteSet<String> srvSet = srv.set(setName, null); + + assertTrue(clientSet.add("1")); + + assertFalse(srvSet.add("1")); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + final TestTcpDiscoverySpi clientSpi = spi(client); + + log.info("Block reconnect."); + + clientSpi.writeLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + info("Disconnected: " + evt); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(disconnectLatch.await(5000, MILLISECONDS)); + + assertTrue(srvSet.add("2")); + + log.info("Allow reconnect."); + + clientSpi.writeLatch.countDown(); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + assertFalse(clientSet.add("2")); + + assertTrue(clientSet.remove("2")); + + assertFalse(srvSet.contains("2")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java new file mode 100644 index 0000000..01eb2ca --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +/** + * + */ +public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstractTest { + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testReconnectOperationInProgress() throws Exception { + // TODO IGNITE-901. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java new file mode 100644 index 0000000..bf0130b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; + +import javax.cache.event.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.util.concurrent.TimeUnit.*; +import static org.apache.ignite.events.EventType.*; + +/** + * + */ +public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest { + /** */ + private static volatile CountDownLatch latch; + + /** {@inheritDoc} */ + @Override protected int serverCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected int clientCount() { + return 1; + } + + /** + * @throws Exception If failed. + */ + public void testEventListenerReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + EventListener lsnr = new EventListener(); + + UUID opId = client.events().remoteListen(lsnr, null, EventType.EVT_JOB_STARTED); + + lsnr.latch = new CountDownLatch(1); + + log.info("Created remote listener: " + opId); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + client.compute().run(new DummyJob()); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + srv.compute().run(new DummyJob()); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + log.info("Stop listen, should not get events anymore."); + + client.events().stopRemoteListen(opId); + + assertFalse(lsnr.latch.await(3000, MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testMessageListenerReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + final String topic = "testTopic"; + + MessageListener locLsnr = new MessageListener(); + + UUID opId = client.message().remoteListen(topic, new RemoteMessageListener()); + + client.message().localListen(topic, locLsnr); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(2); + + client.message().send(topic, "msg1"); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertTrue(latch.await(5000, MILLISECONDS)); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(2); + + srv.message().send(topic, "msg2"); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertTrue(latch.await(5000, MILLISECONDS)); + + log.info("Stop listen, should not get remote messages anymore."); + + client.message().stopRemoteListen(opId); + + srv.message().send(topic, "msg3"); + + locLsnr.latch = new CountDownLatch(1); + latch = new CountDownLatch(1); + + assertTrue(locLsnr.latch.await(5000, MILLISECONDS)); + assertFalse(latch.await(3000, MILLISECONDS)); + } + + /** + * @throws Exception If failed. + */ + public void testCacheContinuousQueryReconnect() throws Exception { + Ignite client = grid(serverCount()); + + assertTrue(client.cluster().localNode().isClient()); + + Ignite srv = clientRouter(client); + + TestTcpDiscoverySpi srvSpi = spi(srv); + + IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>()); + + CacheEventListener lsnr = new CacheEventListener(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = clientCache.query(qry); + + final CountDownLatch reconnectLatch = new CountDownLatch(1); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + info("Reconnected: " + evt); + + reconnectLatch.countDown(); + } + + return true; + } + }, EVT_CLIENT_NODE_RECONNECTED); + + srvSpi.failNode(client.cluster().localNode().id(), null); + + assertTrue(reconnectLatch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + clientCache.put(1, 1); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + lsnr.latch = new CountDownLatch(1); + + srv.cache(null).put(2, 2); + + assertTrue(lsnr.latch.await(5000, MILLISECONDS)); + + log.info("Close cursor, should not get cache events anymore."); + + cur.close(); + + lsnr.latch = new CountDownLatch(1); + + clientCache.put(3, 3); + + assertFalse(lsnr.latch.await(3000, MILLISECONDS)); + } + + // TODO IGNITE-901 test operations in progress are cancelled. + + /** + * + */ + private static class EventListener implements P2<UUID, Event> { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Event evt) { + assertTrue(ignite.cluster().localNode().isClient()); + + ignite.log().info("Received event: " + evt); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class MessageListener implements P2<UUID, Object> { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + assertTrue(ignite.cluster().localNode().isClient()); + + ignite.log().info("Local listener received message: " + msg); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class RemoteMessageListener implements P2<UUID, Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean apply(UUID uuid, Object msg) { + ignite.log().info("Remote listener received message: " + msg); + + if (latch != null) + latch.countDown(); + + return true; + } + } + + /** + * + */ + private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> { + /** */ + private volatile CountDownLatch latch; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + int cnt = 0; + + for (CacheEntryEvent<?, ?> evt : evts) { + ignite.log().info("Received cache event: " + evt); + + cnt++; + } + + assertEquals(1, cnt); + + if (latch != null) + latch.countDown(); + } + } + + /** + * + */ + static class DummyJob implements IgniteRunnable { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public void run() { + ignite.log().info("Job run."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ff3d61fc/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java index 7533a2c..0f5b3ee 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java @@ -34,6 +34,10 @@ public class IgniteClientReconnectTestSuite extends TestSuite { suite.addTestSuite(IgniteClientReconnectApiBlockTest.class); suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class); suite.addTestSuite(IgniteClientReconnectCacheTest.class); + suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class); + suite.addTestSuite(IgniteClientReconnectComputeTest.class); + suite.addTestSuite(IgniteClientReconnectAtomicsTest.class); + suite.addTestSuite(IgniteClientReconnectCollectionsTest.class); return suite; }