# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ce2caffd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ce2caffd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ce2caffd Branch: refs/heads/ignite-901 Commit: ce2caffdde641b6722dfc53877fe5b4633aef2a1 Parents: 782c235 Author: sboikov <sboi...@gridgain.com> Authored: Fri Jul 10 12:15:01 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Jul 10 16:56:14 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 4 + .../ignite/internal/GridKernalContextImpl.java | 13 +- .../ignite/internal/GridKernalGateway.java | 3 +- .../apache/ignite/internal/IgniteKernal.java | 3 +- .../internal/cluster/IgniteClusterImpl.java | 2 +- .../discovery/GridDiscoveryManager.java | 119 +++++---- .../affinity/GridAffinityAssignmentCache.java | 2 + .../processors/cache/GridCacheAdapter.java | 267 +++++++------------ .../processors/cache/GridCacheIoManager.java | 1 + .../processors/cache/GridCacheProcessor.java | 4 +- .../cache/GridCacheSharedContext.java | 7 +- .../cache/GridCacheSharedManagerAdapter.java | 1 + .../processors/cache/IgniteCacheProxy.java | 2 +- .../CacheDataStructuresManager.java | 1 + .../continuous/CacheContinuousQueryHandler.java | 10 +- .../cache/transactions/IgniteTxManager.java | 4 +- .../continuous/GridContinuousProcessor.java | 9 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 15 ++ .../communication/tcp/TcpCommunicationSpi.java | 2 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 59 ++-- .../GridDeploymentManagerStopSelfTest.java | 8 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++++++-- .../IgniteCacheQuerySelfTestSuite.java | 1 - 23 files changed, 378 insertions(+), 307 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index fb0a157..65e0644 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -120,12 +120,16 @@ public interface GridComponent { @Nullable public DiscoveryDataExchangeType discoveryDataType(); /** + * Client disconnected callback. + * * @param reconnectFut Reconnect future. * @throws IgniteCheckedException If failed. */ public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException; /** + * Client reconnected callback. + * * @param clusterRestarted Cluster restarted flag. * @throws IgniteCheckedException If failed. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 4a60e28..fd8b50c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -70,7 +70,6 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.IgniteSystemProperties.*; -import static org.apache.ignite.internal.GridKernalState.*; import static org.apache.ignite.internal.IgniteComponentType.*; /** @@ -306,6 +305,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable private MarshallerContextImpl marshCtx; /** */ + private ClusterNode locNode; + + /** */ private volatile boolean disconnected; /** @@ -330,6 +332,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable * @param mgmtExecSvc Management executor service. * @param igfsExecSvc IGFS executor service. * @param restExecSvc REST executor service. + * @param plugins Plugin providers. * @throws IgniteCheckedException In case of error. */ @SuppressWarnings("TypeMayBeWeakened") @@ -506,9 +509,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable return ((IgniteKernal)grid).isStopping(); } - /** */ - private ClusterNode locNode; - /** {@inheritDoc} */ @Override public UUID localNodeId() { if (locNode != null) @@ -918,7 +918,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** {@inheritDoc} */ @Override public boolean clientDisconnected() { - return locNode.isClient() && disconnected; + if (locNode == null) + locNode = discoMgr != null ? discoMgr.localNode() : null; + + return locNode != null ? (locNode.isClient() && disconnected) : false; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java index 957174a..1d50aa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal; -import org.apache.ignite.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.jetbrains.annotations.*; @@ -116,7 +115,7 @@ public interface GridKernalGateway { /** * Disconnected callback. * - * @return Reconnect future. + * @return Reconnect future. */ @Nullable public GridFutureAdapter<?> onDisconnected(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 22338cc..4718d75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -439,8 +439,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { assert cfg != null; return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() { - @Override - public String apply(Map.Entry<String, ?> e) { + @Override public String apply(Map.Entry<String, ?> e) { return e.getKey() + ", " + e.getValue().toString(); } }); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index 246eab5..0287ca7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -52,7 +52,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus @GridToStringExclude private ConcurrentMap nodeLoc; - /** */ + /** Client reconnect future. */ private IgniteFuture<?> reconnecFut; /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 3e8557d..044dc71 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -326,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory); - final DiscoverySpi spi = getSpi(); + DiscoverySpi spi = getSpi(); discoOrdered = discoOrdered(); @@ -477,7 +477,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // If this is a local join event, just save it and do not notify listeners. if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) { if (gridStartTime == 0) - gridStartTime = spi.getGridStartTime(); + gridStartTime = getSpi().getGridStartTime(); DiscoveryEvent discoEvt = new DiscoveryEvent(); @@ -515,9 +515,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { assert locNode.isClient() : locNode; assert node.isClient() : node; - boolean clusterRestarted = gridStartTime != spi.getGridStartTime(); + boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime(); - gridStartTime = spi.getGridStartTime(); + gridStartTime = getSpi().getGridStartTime(); ((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted); @@ -1198,6 +1198,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { try { return getSpi().pingNode(nodeId); } + catch (IgniteException e) { + return false; + } finally { busyLock.leaveBusy(); } @@ -1580,9 +1583,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** * @param msg Custom message. + * @throws IgniteCheckedException If failed. */ - public void sendCustomEvent(DiscoveryCustomMessage msg) { - getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); + public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException { + try { + getSpi().sendCustomEvent(new CustomMessageWrapper(msg)); + } + catch (IgniteException e) { + throw new IgniteCheckedException(e); + } } /** @@ -1679,55 +1688,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ).start(); } - /** - * Method is called when any discovery event occurs. - * - * @param type Discovery event type. See {@link DiscoveryEvent} for more details. - * @param topVer Topology version. - * @param node Remote node this event is connected with. - * @param topSnapshot Topology snapshot. - */ - @SuppressWarnings("RedundantTypeArguments") - private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { - assert node != null; - - if (ctx.event().isRecordable(type)) { - DiscoveryEvent evt = new DiscoveryEvent(); - - evt.node(ctx.discovery().localNode()); - evt.eventNode(node); - evt.type(type); - - evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); - - if (type == EVT_NODE_METRICS_UPDATED) - evt.message("Metrics were updated: " + node); - - else if (type == EVT_NODE_JOINED) - evt.message("Node joined: " + node); - - else if (type == EVT_NODE_LEFT) - evt.message("Node left: " + node); - - else if (type == EVT_NODE_FAILED) - evt.message("Node failed: " + node); - - else if (type == EVT_NODE_SEGMENTED) - evt.message("Node segmented: " + node); - - else if (type == EVT_CLIENT_NODE_DISCONNECTED) - evt.message("Client node disconnected: " + node); - - else if (type == EVT_CLIENT_NODE_RECONNECTED) - evt.message("Client node reconnected: " + node); - - else - assert false; - - ctx.event().record(evt); - } - } - /** Worker for network segment checks. */ private class SegmentCheckWorker extends GridWorker { /** */ @@ -1818,6 +1778,55 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * Method is called when any discovery event occurs. + * + * @param type Discovery event type. See {@link DiscoveryEvent} for more details. + * @param topVer Topology version. + * @param node Remote node this event is connected with. + * @param topSnapshot Topology snapshot. + */ + @SuppressWarnings("RedundantTypeArguments") + private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) { + assert node != null; + + if (ctx.event().isRecordable(type)) { + DiscoveryEvent evt = new DiscoveryEvent(); + + evt.node(ctx.discovery().localNode()); + evt.eventNode(node); + evt.type(type); + + evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter)); + + if (type == EVT_NODE_METRICS_UPDATED) + evt.message("Metrics were updated: " + node); + + else if (type == EVT_NODE_JOINED) + evt.message("Node joined: " + node); + + else if (type == EVT_NODE_LEFT) + evt.message("Node left: " + node); + + else if (type == EVT_NODE_FAILED) + evt.message("Node failed: " + node); + + else if (type == EVT_NODE_SEGMENTED) + evt.message("Node segmented: " + node); + + else if (type == EVT_CLIENT_NODE_DISCONNECTED) + evt.message("Client node disconnected: " + node); + + else if (type == EVT_CLIENT_NODE_RECONNECTED) + evt.message("Client node reconnected: " + node); + + else + assert false; + + ctx.event().record(evt); + } + } + + /** * @param type Event type. * @param topVer Topology version. * @param node Node. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java index 178226d..d5c2b1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java @@ -130,6 +130,8 @@ public class GridAffinityAssignmentCache { /** * Kernal stop callback. + * + * @param err Error. */ public void onKernalStop(IgniteCheckedException err) { stopErr = err; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index e70d8e8..d2a730a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -909,12 +909,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[]) null); + return keySet((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @Override public Set<K> keySetx() { - return keySetx((CacheEntryPredicate[]) null); + return keySetx((CacheEntryPredicate[])null); } /** {@inheritDoc} */ @@ -1220,8 +1220,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override - public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); } }); @@ -1263,12 +1262,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V String taskName, final IgniteBiInClosure<KeyCacheObject, Object> vis) { return ctx.closures().callLocalSafe(new GPC<Object>() { - @Nullable - @Override - public Object call() { + @Nullable @Override public Object call() { try { ctx.store().loadAll(tx, keys, vis); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { throw new GridClosureException(e); } @@ -1470,9 +1468,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.config().getInterceptor() != null) fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { - @Override - public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { - return (V) ctx.config().getInterceptor().onGet(key, f.get()); + @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { + return (V)ctx.config().getInterceptor().onGet(key, f.get()); } }); @@ -1984,14 +1981,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<V>() { - @Override - public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } - @Override - public String toString() { + @Override public String toString() { return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2049,13 +2044,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(drMap.size() == 1) { - @Override - public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.putAllDrAsync(ctx, drMap).get(); } - @Override - public String toString() { + @Override public String toString() { return "putAllConflict [drMap=" + drMap + ']'; } }); @@ -2070,13 +2063,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override - public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.putAllDrAsync(ctx, drMap); } - @Override - public String toString() { + @Override public String toString() { return "putAllConflictAsync [drMap=" + drMap + ']'; } }); @@ -2093,9 +2084,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { - @Nullable - @Override - public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) + @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); @@ -2127,14 +2116,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { - @Nullable - @Override - public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override - public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2175,8 +2161,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() { - @Override - public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2206,8 +2191,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) { @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override - public EntryProcessor apply(K k) { + @Override public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2224,8 +2208,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { - @Override - public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2258,8 +2241,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { - @Override - public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2280,12 +2262,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(map.keySet()); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) { - @Nullable - @Override - public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args); + tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); return fut.get().value(); } @@ -2333,14 +2313,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<Boolean>() { - @Override - public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override - public String toString() { + @Override public String toString() { return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2370,13 +2348,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return syncOp(new SyncOp<V>(true) { - @Override - public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); + @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); } - @Override - public String toString() { + @Override public String toString() { return "putIfAbsent [key=" + key + ", val=" + val + ']'; } }); @@ -2396,14 +2372,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override - public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } - @Override - public String toString() { + @Override public String toString() { return "putIfAbsentAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2428,13 +2402,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); Boolean stored = syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success(); } - @Override - public String toString() { + @Override public String toString() { return "putxIfAbsent [key=" + key + ", val=" + val + ']'; } }); @@ -2459,14 +2431,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override - public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } - @Override - public String toString() { + @Override public String toString() { return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2537,13 +2507,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success(); } - @Override - public String toString() { + @Override public String toString() { return "replacex [key=" + key + ", val=" + val + ']'; } }); @@ -2559,14 +2527,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<Boolean>() { - @Override - public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override - public String toString() { + @Override public String toString() { return "replacexAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2584,8 +2550,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(newVal); return syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); @@ -2594,8 +2559,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V .success(); } - @Override - public String toString() { + @Override public String toString() { return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -2658,13 +2622,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValues(m.values()); syncOp(new SyncInOp(m.size() == 1) { - @Override - public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get(); } - @Override - public String toString() { + @Override public String toString() { return "putAll [map=" + m + ']'; } }); @@ -2706,18 +2668,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); V prevVal = syncOp(new SyncOp<V>(true) { - @Override - public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value(); if (ctx.config().getInterceptor() != null) - return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); + return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); return ret; } - @Override - public String toString() { + @Override public String toString() { return "remove [key=" + key + ']'; } }); @@ -2740,15 +2700,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override - public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { // TODO should we invoke interceptor here? return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } - @Override - public String toString() { + @Override public String toString() { return "removeAsync [key=" + key + ']'; } }); @@ -2790,13 +2748,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); syncOp(new SyncInOp(keys.size() == 1) { - @Override - public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get(); } - @Override - public String toString() { + @Override public String toString() { return "removeAll [keys=" + keys + ']'; } }); @@ -2818,13 +2774,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) { - @Override - public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL); } - @Override - public String toString() { + @Override public String toString() { return "removeAllAsync [keys=" + keys + ']'; } }); @@ -2847,13 +2801,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); boolean rmv = syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success(); } - @Override - public String toString() { + @Override public String toString() { return "removex [key=" + key + ']'; } }); @@ -2887,14 +2839,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override - public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } - @Override - public String toString() { + @Override public String toString() { return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2913,21 +2863,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<GridCacheReturn>(true) { - @Override - public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - return (GridCacheReturn) tx.removeAllAsync(ctx, + return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, ctx.equalsValArray(val)).get(); } - @Override - public String toString() { + @Override public String toString() { return "remove [key=" + key + ", val=" + val + ']'; } }); @@ -2942,13 +2890,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(false) { - @Override - public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllDrAsync(ctx, drMap).get(); } - @Override - public String toString() { + @Override public String toString() { return "removeAllConflict [drMap=" + drMap + ']'; } }); @@ -2963,13 +2909,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override - public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.removeAllDrAsync(ctx, drMap); } - @Override - public String toString() { + @Override public String toString() { return "removeAllDrASync [drMap=" + drMap + ']'; } }); @@ -2985,22 +2929,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<GridCacheReturn>(true) { - @Override - public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return (GridCacheReturn) tx.putAllAsync(ctx, - F.t(key, newVal), - true, - null, - -1, - ctx.equalsValArray(oldVal)).get(); + return tx.putAllAsync(ctx, + F.t(key, newVal), + true, + null, + -1, + ctx.equalsValArray(oldVal)).get(); } - @Override - public String toString() { + @Override public String toString() { return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -3014,17 +2956,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<GridCacheReturn>() { - @Override - public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } - IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, @@ -3033,8 +2975,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return fut; } - @Override - public String toString() { + @Override public String toString() { return "removeAsync [key=" + key + ", val=" + val + ']'; } }); @@ -3049,17 +2990,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<GridCacheReturn>() { - @Override - public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } - IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx, F.t(key, newVal), true, null, @@ -3069,8 +3010,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return fut; } - @Override - public String toString() { + @Override public String toString() { return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -3090,8 +3030,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); boolean rmv = syncOp(new SyncOp<Boolean>(true) { - @Override - public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); @@ -3100,8 +3039,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.equalsValArray(val)).get().success(); } - @Override - public String toString() { + @Override public String toString() { return "remove [key=" + key + ", val=" + val + ']'; } }); @@ -3126,24 +3064,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override - public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. if (ctx.deploymentEnabled()) { try { ctx.deploy().registerClass(val); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } } return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, ctx.equalsValArray(val)).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } - @Override - public String toString() { + @Override public String toString() { return "removeAsync [key=" + key + ", val=" + val + ']'; } }); @@ -3754,16 +3691,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private IgniteCacheExpiryPolicy expiryPlc = ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); - @Override - public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { + @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); try { V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc); return new CacheEntryImpl<>(lazyEntry.getKey(), val); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); - } finally { + } + finally { ctx.gate().leave(prev); } } @@ -3787,20 +3725,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V .execute(); return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { - @Override - protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { + @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } - @Override - protected void remove(Cache.Entry<K, V> item) { + @Override protected void remove(Cache.Entry<K, V> item) { CacheOperationContext prev = ctx.gate().enter(opCtx); try { GridCacheAdapter.this.remove(item.getKey()); - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); - } finally { + } + finally { ctx.gate().leave(prev); } } @@ -4457,8 +4395,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return getAllAsync(Collections.singletonList(key), deserializePortable).chain( new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override - public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { Map<K, V> map = e.get(); assert map.isEmpty() || map.size() == 1 : map.size(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 7e6b906..84e4dc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -894,6 +894,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** * @param cacheId Cache ID to remove handlers for. + * @param type Message type. */ public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) { clsHandlers.remove(new ListenerKey(cacheId, type)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 46f9206..bf0f63b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2297,7 +2297,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); } - catch (IgniteException e) { + catch (IgniteCheckedException e) { err = e; } } @@ -2957,7 +2957,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), "Failed to execute dynamic cache change request, client node disconnected."); } - catch (IgniteException e) { + catch (IgniteCheckedException e) { err = e; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 91a6042..4075d79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -176,6 +176,7 @@ public class GridCacheSharedContext<K, V> { } /** + * @param mgrs Managers list. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. @@ -221,6 +222,7 @@ public class GridCacheSharedContext<K, V> { * Adds cache context to shared cache context. * * @param cacheCtx Cache context to add. + * @throws IgniteCheckedException If cache ID conflict detected. */ @SuppressWarnings("unchecked") public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException { @@ -315,7 +317,7 @@ public class GridCacheSharedContext<K, V> { */ public byte dataCenterId() { // Data center ID is same for all caches, so grab the first one. - GridCacheContext<K, V> cacheCtx = F.first(cacheContexts()); + GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts()); return cacheCtx.dataCenterId(); } @@ -327,7 +329,7 @@ public class GridCacheSharedContext<K, V> { if (preloadersStartFut == null) { GridCompoundFuture<Object, Object> compound = null; - for (GridCacheContext<K, V> cacheCtx : cacheContexts()) { + for (GridCacheContext<?, ?> cacheCtx : cacheContexts()) { IgniteInternalFuture<Object> startFut = cacheCtx.preloader().startFuture(); if (!startFut.isDone()) { @@ -636,6 +638,7 @@ public class GridCacheSharedContext<K, V> { } /** + * @param mgrs Managers list. * @param mgr Manager to add. * @return Added manager. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index 3ad0759..6ad76ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -117,6 +117,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** + * @param reconnect {@code True} if manager restarted after client reconnect. * @throws IgniteCheckedException If failed. */ protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 9767f49..6f2eed9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1774,7 +1774,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V assert false; } - @Override public void block() { + @Override public void stopped() { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 3691ee6..f710105 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -107,6 +107,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { /** * Client reconnect callback. + * * @throws IgniteCheckedException If failed. */ public void onReconnected() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 2ed4341..879c30c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -396,6 +396,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryHandler.class, this); + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); out.writeObject(topic); @@ -438,11 +443,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { taskHash = in.readInt(); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheContinuousQueryHandler.class, this); - } - /** * @param ctx Kernal context. * @return Cache context. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index caaa22d..82543c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -154,10 +154,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public void onDisconnected(IgniteFuture reconnectFut) { + txFinishSync.onDisconnected(reconnectFut); + for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) rollbackTx(e.getValue()); - - txFinishSync.onDisconnected(reconnectFut); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index defcd3f..daa9494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -511,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData)); } - catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter. + catch (IgniteCheckedException e) { // Marshaller exception may occurs if user pass unmarshallable filter. startFuts.remove(routineId); locInfos.remove(routineId); @@ -576,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Unregister handler locally. unregisterHandler(routineId, routine.hnd, true); - ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + try { + ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } if (ctx.isStopping()) fut.onDone(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index a49d85a..07b39bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -71,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement /** Discovery listener. */ private GridLocalEventListener paramsLsnr; + /** Local node. */ + private ClusterNode locNode; + /** * Creates new adapter and initializes it from the current (this) class. * SPI name will be initialized to the simple name of the class @@ -112,6 +115,18 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement return ignite.cluster().localNode().id(); } + /** + * @return Local node. + */ + protected ClusterNode getLocalNode() { + if (locNode != null) + return locNode; + + locNode = getSpiContext().localNode(); + + return locNode; + } + /** {@inheritDoc} */ @Override public final String getIgniteHome() { return ignite.configuration().getIgniteHome(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index d99a764..4fce6f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -1717,7 +1717,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']'); - if (node.isLocal()) + if (node.equals(getLocalNode())) notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 404c71d..cad5435 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.*; import static java.util.concurrent.TimeUnit.*; import static org.apache.ignite.events.EventType.*; import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; +import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*; /** * @@ -265,13 +266,13 @@ class ClientImpl extends TcpDiscoveryImpl { else { State state = this.state; - if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) { + if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) { if (pingFuts.remove(nodeId, fut)) fut.onDone(false); return false; } - else if (state == State.DISCONNECTED) { + else if (state == DISCONNECTED) { if (pingFuts.remove(nodeId, fut)) fut.onDone(new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected.")); @@ -282,7 +283,7 @@ class ClientImpl extends TcpDiscoveryImpl { timer.schedule(new TimerTask() { @Override public void run() { if (pingFuts.remove(nodeId, finalFut)) { - if (ClientImpl.this.state == State.DISCONNECTED) + if (ClientImpl.this.state == DISCONNECTED) finalFut.onDone(new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected.")); else @@ -345,10 +346,10 @@ class ClientImpl extends TcpDiscoveryImpl { @Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) { State state = this.state; - if (state == State.SEGMENTED) + if (state == SEGMENTED) throw new IgniteException("Failed to send custom message: client is segmented."); - if (state == State.DISCONNECTED) + if (state == DISCONNECTED) throw new IgniteException("Failed to send custom message: client is disconnected."); try { @@ -981,10 +982,6 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { - assert state == ClientImpl.State.DISCONNECTED - || state == ClientImpl.State.CONNECTED - || state == ClientImpl.State.STARTING : state; - boolean success = false; Exception err = null; @@ -1135,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @SuppressWarnings("InfiniteLoopStatement") @Override protected void body() throws InterruptedException { - state = ClientImpl.State.STARTING; + state = STARTING; spi.stats.onJoinStarted(); @@ -1146,23 +1143,23 @@ class ClientImpl extends TcpDiscoveryImpl { Object msg = queue.take(); if (msg == JOIN_TIMEOUT) { - if (state == ClientImpl.State.STARTING) { + if (state == STARTING) { joinError(new IgniteSpiException("Join process timed out, did not receive response for " + "join request (consider increasing 'joinTimeout' configuration property) " + "[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']')); break; } - else if (state == ClientImpl.State.DISCONNECTED) { + else if (state == DISCONNECTED) { log.info("Rejoin timeout, will segment."); - state = ClientImpl.State.SEGMENTED; + state = SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); } } else if (msg == SPI_STOP) { - state = ClientImpl.State.STOPPED; + state = STOPPED; assert spi.getSpiContext().isStopping(); @@ -1182,7 +1179,7 @@ class ClientImpl extends TcpDiscoveryImpl { boolean join = joinLatch.getCount() > 0; - if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) { + if (spi.getSpiContext().isStopping() || state == SEGMENTED) { leaveLatch.countDown(); if (join) { @@ -1209,10 +1206,10 @@ class ClientImpl extends TcpDiscoveryImpl { reconnector = null; if (spi.isClientReconnectDisabled()) { - if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) { + if (state != SEGMENTED && state != STOPPED) { log.info("Reconnected failed, will segment."); - state = ClientImpl.State.SEGMENTED; + state = SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); } @@ -1220,8 +1217,8 @@ class ClientImpl extends TcpDiscoveryImpl { else { log.info("Reconnected failed, will try join."); - if (state == ClientImpl.State.STARTING || state == ClientImpl.State.CONNECTED) { - state = ClientImpl.State.DISCONNECTED; + if (state == STARTING || state == CONNECTED) { + state = DISCONNECTED; nodeAdded = false; @@ -1264,8 +1261,8 @@ class ClientImpl extends TcpDiscoveryImpl { err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg); if (err != null) { - if (state == ClientImpl.State.DISCONNECTED) { - state = ClientImpl.State.SEGMENTED; + if (state == DISCONNECTED) { + state = SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); } @@ -1298,9 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl { * @throws InterruptedException If interrupted. */ private void tryJoin() throws InterruptedException { - assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state; + assert state == DISCONNECTED || state == STARTING : state; - boolean join = state == ClientImpl.State.STARTING; + boolean join = state == STARTING; log.info("Try join topology with timeout: " + spi.joinTimeout); @@ -1314,7 +1311,7 @@ class ClientImpl extends TcpDiscoveryImpl { else { log.info("Send join request on rejoin failed, will segment."); - state = ClientImpl.State.SEGMENTED; + state = SEGMENTED; notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes()); } @@ -1377,14 +1374,14 @@ class ClientImpl extends TcpDiscoveryImpl { private boolean joining() { ClientImpl.State state = ClientImpl.this.state; - return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED; + return state == STARTING || state == DISCONNECTED; } /** * @return {@code True} if client disconnected. */ private boolean disconnected() { - return state == ClientImpl.State.DISCONNECTED; + return state == DISCONNECTED; } /** @@ -1477,14 +1474,14 @@ class ClientImpl extends TcpDiscoveryImpl { if (disconnected()) notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes); + else + spi.stats.onJoinFinished(); - state = ClientImpl.State.CONNECTED; + state = CONNECTED; joinErr.set(null);; joinLatch.countDown(); - - spi.stats.onJoinFinished(); } else if (log.isDebugEnabled()) log.debug("Discarding node add finished message (this message has already been processed) " + @@ -1737,7 +1734,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.verified() && state == ClientImpl.State.CONNECTED) { + if (msg.verified() && state == CONNECTED) { DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { @@ -1877,7 +1874,7 @@ class ClientImpl extends TcpDiscoveryImpl { /** * */ - private enum State { + enum State { /** */ STARTING, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java index b8f9ce1..62f5d41 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java @@ -98,13 +98,9 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest { @Override public boolean unregister(String rsrcName) { return false; } /** {@inheritDoc} */ - @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { - // No-op. - } + @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ } /** {@inheritDoc} */ - @Override public void onClientReconnected(boolean clusterRestarted) { - // No-op. - } + @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. */ } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java index 5838481..4d19f3e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java @@ -391,11 +391,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() { - @Override - public void apply(Socket sock) { + @Override public void apply(Socket sock) { try { latch.await(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { throw new RuntimeException(e); } } @@ -753,11 +753,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { attachListeners(1, 1); ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() { - @Override - public void apply(TcpDiscoveryAbstractMessage msg) { + @Override public void apply(TcpDiscoveryAbstractMessage msg) { try { Thread.sleep(1000000); - } catch (InterruptedException ignored) { + } + catch (InterruptedException ignored) { Thread.interrupted(); } } @@ -787,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { G.ignite("client-0").compute().broadcast(F.noop()); assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override - public boolean apply() { + @Override public boolean apply() { return checkMetrics(3, 3, 1); } }, 10000)); @@ -798,8 +797,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { G.ignite("server-0").compute().broadcast(F.noop()); assertTrue(GridTestUtils.waitForCondition(new PA() { - @Override - public boolean apply() { + @Override public boolean apply() { return checkMetrics(3, 3, 2); } }, 10000)); @@ -1204,8 +1202,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { clientsPerSrv = CLIENTS; GridTestUtils.runMultiThreaded(new Callable<Void>() { - @Override - public Void call() throws Exception { + @Override public Void call() throws Exception { Ignite g = startGrid("client-" + clientIdx.getAndIncrement()); clientNodeIds.add(g.cluster().localNode().id()); @@ -1297,7 +1294,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { if (changeTop) clientSpi.pauseAll(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { assertEquals(0, disconnectLatch.getCount()); reconnectLatch.countDown(); @@ -1406,7 +1404,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { startClientNodes(1); - Ignite srv = G.ignite("server-0"); + final Ignite srv = G.ignite("server-0"); Ignite client = G.ignite("client-0"); TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); @@ -1461,7 +1459,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { log.info("Fail client connection."); srvSpi.failClientReconnect.set(1_000_000); - srvSpi.failNodeAdded.set(1_000_000); + srvSpi.skipNodeAdded = true; clientSpi.brakeConnection(); } @@ -1474,8 +1472,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertFalse(err.get()); - if (!failSrv) + if (!failSrv) { await(srvFailedLatch); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return srv.cluster().nodes().size() == 1; + } + }, 10_000); + + checkNodes(1, 0); + } } /** @@ -1485,10 +1492,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { netTimeout = 3000; joinTimeout = 60_000; - clientIpFinder = new TcpDiscoveryVmIpFinder(); - - clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509")); - final CountDownLatch disconnectLatch = new CountDownLatch(1); final CountDownLatch reconnectLatch = new CountDownLatch(1); final AtomicBoolean err = new AtomicBoolean(false); @@ -1501,8 +1504,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { Ignite client = G.ignite("client-0"); client.events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event evt) { + @Override public boolean apply(Event evt) { if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { log.info("Disconnected event."); @@ -1510,7 +1512,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertEquals(1, disconnectLatch.getCount()); disconnectLatch.countDown(); - } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { log.info("Reconnected event."); assertEquals(1, reconnectLatch.getCount()); @@ -1518,7 +1521,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertFalse(err.get()); reconnectLatch.countDown(); - } else { + } + else { log.error("Unexpected event: " + evt); err.set(true); @@ -1545,6 +1549,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + clientNodeIds.clear(); + clientNodeIds.add(client.cluster().localNode().id()); + + checkNodes(1, 1); + assertFalse(err.get()); } @@ -1552,6 +1561,87 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testDisconnectAfterNetworkTimeout() throws Exception { + netTimeout = 5000; + joinTimeout = 60_000; + maxMissedClientHbs = 2; + + startServerNodes(1); + + startClientNodes(1); + + final Ignite srv = G.ignite("server-0"); + Ignite client = G.ignite("client-0"); + + TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi()); + TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()); + + final CountDownLatch disconnectLatch = new CountDownLatch(1); + final CountDownLatch reconnectLatch = new CountDownLatch(1); + final AtomicBoolean err = new AtomicBoolean(false); + + client.events().localListen(new IgnitePredicate<Event>() { + @Override + public boolean apply(Event evt) { + if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) { + log.info("Disconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(1, disconnectLatch.getCount()); + assertFalse(err.get()); + + disconnectLatch.countDown(); + } + else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) { + log.info("Reconnected event."); + + assertEquals(1, reconnectLatch.getCount()); + assertEquals(0, disconnectLatch.getCount()); + assertFalse(err.get()); + + reconnectLatch.countDown(); + } + else { + log.error("Unexpected event: " + evt); + + err.set(true); + } + + return true; + } + }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED); + + log.info("Fail client connection1."); + + srvSpi.failClientReconnect.set(1_000_000); + srvSpi.skipNodeAdded = true; + + clientSpi.brakeConnection(); + + assertTrue(disconnectLatch.await(10_000, MILLISECONDS)); + + log.info("Fail client connection2."); + + srvSpi.failClientReconnect.set(0); + srvSpi.skipNodeAdded = false; + + clientSpi.brakeConnection(); + + assertTrue(reconnectLatch.await(10_000, MILLISECONDS)); + + clientNodeIds.clear(); + + clientNodeIds.add(client.cluster().localNode().id()); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return srv.cluster().nodes().size() == 2; + } + }, 10_000); + + checkNodes(1, 1); + + assertFalse(err.get()); } /** @@ -1834,6 +1924,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { /** */ private volatile String delayJoinAckFor; + /** */ + private volatile boolean skipNodeAdded; + /** * @param lock Lock. */ @@ -1906,6 +1999,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest { boolean fail = false; + if (skipNodeAdded && + (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) { + log.info("Skip message: " + msg); + + return; + } + if (msg instanceof TcpDiscoveryNodeAddedMessage) fail = failNodeAdded.getAndDecrement() > 0; else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)