http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 29e3551..84e4dc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -893,6 +893,14 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheId Cache ID to remove handlers for. + * @param type Message type. + */ + public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) { + clsHandlers.remove(new ListenerKey(cacheId, type)); + } + + /** * @param msgCls Message class to check. * @return Message index. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java index 775daf5..ae7e9d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.lang.*; /** * Interface for cache managers. @@ -49,6 +50,11 @@ public interface GridCacheManager<K, V> { public void onKernalStop(boolean cancel); /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut); + + /** * Prints memory statistics (sizes of internal data structures, etc.). * * NOTE: this method is for testing and profiling purposes only. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java index 52fade8..54b1915 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheManagerAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import java.util.concurrent.atomic.*; @@ -127,6 +128,11 @@ public class GridCacheManagerAdapter<K, V> implements GridCacheManager<K, V> { } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void printMemoryStats() { // No-op. 
} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index f24cf01..36e108f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -208,10 +208,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { exchLog = cctx.logger(getClass().getName() + ".exchange"); pendingExplicit = GridConcurrentFactory.newMap(); - } - /** {@inheritDoc} */ - @Override public void onKernalStart0() throws IgniteCheckedException { cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); } @@ -295,15 +292,39 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * Cancels all client futures. */ public void cancelClientFutures() { - IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping)."); + cancelClientFutures(new IgniteCheckedException("Operation has been cancelled (node is stopping).")); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut); + + cancelClientFutures(err); + } + /** + * @param err Error. 
+ */ + private void cancelClientFutures(IgniteCheckedException err) { for (Collection<GridCacheFuture<?>> futures : futs.values()) { for (GridCacheFuture<?> future : futures) - ((GridFutureAdapter)future).onDone(e); + ((GridFutureAdapter)future).onDone(err); } for (GridCacheAtomicFuture<?> future : atomicFuts.values()) - ((GridFutureAdapter)future).onDone(e); + ((GridFutureAdapter)future).onDone(err); + } + + /** + * @param reconnectFut Reconnect future. + * @return Client disconnected exception. + */ + private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) { + if (reconnectFut == null) + reconnectFut = cctx.kernalContext().cluster().clientReconnectFuture(); + + return new IgniteClientDisconnectedCheckedException(reconnectFut, + "Operation has been cancelled (client node disconnected)."); } /** @@ -339,6 +360,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut); assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']'; + + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); } /** @@ -459,7 +483,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { fut.onNodeLeft(n.id()); } - // Just in case if future was complete before it was added. + if (cctx.kernalContext().clientDisconnected()) + ((GridFutureAdapter)fut).onDone(disconnectedError(null)); + + // Just in case if future was completed before it was added. 
if (fut.isDone()) removeFuture(fut); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 4398b4c..1f6a8bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -97,6 +97,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<>(AffinityTopologyVersion.NONE); + /** */ + private GridFutureAdapter<?> reconnectExchangeFut; + /** * Partition map futures. * This set also contains already completed exchange futures to address race conditions when coordinator @@ -237,9 +240,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } + /** + * @return Reconnect partition exchange future. 
+ */ + public IgniteInternalFuture<?> reconnectExchangeFuture() { + return reconnectExchangeFut; + } + /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { - super.onKernalStart0(); + @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { + super.onKernalStart0(reconnect); ClusterNode loc = cctx.localNode(); @@ -260,6 +270,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionsExchangeFuture fut = exchangeFuture(exchId, discoEvt, null); + if (reconnect) + reconnectExchangeFut = new GridFutureAdapter<>(); + new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start(); onDiscoveryEvent(cctx.localNodeId(), fut); @@ -267,10 +280,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Allow discovery events to get processed. locExchFut.onDone(); - if (log.isDebugEnabled()) - log.debug("Beginning to wait on local exchange future: " + fut); + if (reconnect) { + fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + try { + fut.get(); + + for (GridCacheContext cacheCtx : cctx.cacheContexts()) + cacheCtx.preloader().onInitialExchangeComplete(null); + + reconnectExchangeFut.onDone(); + } + catch (IgniteCheckedException e) { + for (GridCacheContext cacheCtx : cctx.cacheContexts()) + cacheCtx.preloader().onInitialExchangeComplete(e); + + reconnectExchangeFut.onDone(e); + } + } + }); + } + else { + if (log.isDebugEnabled()) + log.debug("Beginning to wait on local exchange future: " + fut); - try { boolean first = true; while (true) { @@ -296,28 +329,35 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridCacheContext cacheCtx : cctx.cacheContexts()) cacheCtx.preloader().onInitialExchangeComplete(null); - } - catch (IgniteFutureTimeoutCheckedException e) { - 
IgniteCheckedException err = new IgniteCheckedException("Timed out waiting for exchange future: " + fut, e); - for (GridCacheContext cacheCtx : cctx.cacheContexts()) - cacheCtx.preloader().onInitialExchangeComplete(err); - - throw err; + if (log.isDebugEnabled()) + log.debug("Finished waiting for initial exchange: " + fut.exchangeId()); } - - if (log.isDebugEnabled()) - log.debug("Finished waiting on local exchange: " + fut.exchangeId()); } /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { + cctx.gridEvents().removeLocalEventListener(discoLsnr); + + cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); + cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); + cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); + + IgniteCheckedException err = cctx.kernalContext().clientDisconnected() ? + new IgniteClientDisconnectedCheckedException(cctx.kernalContext().cluster().clientReconnectFuture(), + "Client node disconnected: " + cctx.gridName()) : + new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName()); + // Finish all exchange futures. 
- for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) - f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + ExchangeFutureSet exchFuts0 = exchFuts; + + if (exchFuts0 != null) { + for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) + f.onDone(err); + } for (AffinityReadyFuture f : readyFuts.values()) - f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + f.onDone(err); U.cancel(exchWorker); @@ -634,7 +674,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana node.id() + ", msg=" + m + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send partitions full message [node=" + node + ']', e); + U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); } } @@ -1097,6 +1137,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana catch (IgniteInterruptedCheckedException e) { throw e; } + catch (IgniteClientDisconnectedCheckedException e) { + return; + } catch (IgniteCheckedException e) { U.error(log, "Failed to wait for completion of partition map exchange " + "(preloading will not start): " + exchFut, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index e0f6181..b8bb08e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -56,6 +56,11 @@ public interface GridCachePreloader { public void onKernalStop(); /** + * Client 
reconnected callback. + */ + public void onReconnected(); + + /** * Callback by exchange manager when initial partition exchange is complete. * * @param err Error, if any happened on initial exchange. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index b4f386f..0adf510 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -87,6 +87,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public void onReconnected() { + // No-op. 
+ } + + /** {@inheritDoc} */ @Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) { this.preloadPred = preloadPred; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index bb87a86..bda0485 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -127,6 +127,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Count down latch for caches. */ private final CountDownLatch cacheStartedLatch = new CountDownLatch(1); + /** */ + private Map<String, DynamicCacheDescriptor> cachesOnDisconnect; + /** * @param ctx Kernal context. */ @@ -704,7 +707,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { }); for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) - mgr.onKernalStart(); + mgr.onKernalStart(false); for (GridCacheAdapter<?, ?> cache : caches.values()) onKernalStart(cache); @@ -796,7 +799,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Must call onKernalStart on shared managers after creation of fetched caches. 
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) - mgr.onKernalStart(); + mgr.onKernalStart(false); for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) { GridCacheAdapter cache = e.getValue(); @@ -911,6 +914,90 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + cachesOnDisconnect = new HashMap<>(registeredCaches); + + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( + ctx.cluster().clientReconnectFuture(), + "Failed to execute dynamic cache change request, client node disconnected."); + + for (IgniteInternalFuture fut : pendingFuts.values()) + ((GridFutureAdapter)fut).onDone(err); + + for (IgniteInternalFuture fut : pendingTemplateFuts.values()) + ((GridFutureAdapter)fut).onDone(err); + + for (GridCacheAdapter cache : caches.values()) { + GridCacheContext cctx = cache.context(); + + cctx.gate().onDisconnected(reconnectFut); + + List<GridCacheManager> mgrs = cache.context().managers(); + + for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheManager mgr = it.previous(); + + mgr.onDisconnected(reconnectFut); + } + } + + sharedCtx.onDisconnected(reconnectFut); + + registeredCaches.clear(); + + registeredTemplates.clear(); + } + + /** {@inheritDoc} */ + @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + List<GridCacheAdapter> reconnected = new ArrayList<>(caches.size()); + + for (GridCacheAdapter cache : caches.values()) { + String name = cache.name(); + + boolean stopped; + + boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); + + if (!sysCache) { + DynamicCacheDescriptor oldDesc = cachesOnDisconnect.get(maskNull(name)); + + assert oldDesc != null : "No descriptor for cache: " + name; + + 
DynamicCacheDescriptor newDesc = registeredCaches.get(maskNull(name)); + + stopped = newDesc == null || !oldDesc.deploymentId().equals(newDesc.deploymentId()); + } + else + stopped = false; + + if (stopped) { + cache.context().gate().reconnected(true); + + sharedCtx.removeCacheContext(cache.ctx); + + caches.remove(maskNull(cache.name())); + jCacheProxies.remove(maskNull(cache.name())); + + onKernalStop(cache, true); + stopCache(cache, true); + } + else { + cache.onReconnected(); + + reconnected.add(cache); + } + } + + sharedCtx.onReconnected(); + + for (GridCacheAdapter cache : reconnected) + cache.context().gate().reconnected(false); + + cachesOnDisconnect = null; + } + /** * @param cache Cache to start. * @throws IgniteCheckedException If failed to start cache. @@ -1529,7 +1616,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (proxy != null) { if (req.stop()) - proxy.gate().block(); + proxy.gate().stopped(); else proxy.closeProxy(); } @@ -1673,8 +1760,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { depMgr, exchMgr, ioMgr, - storeSesLsnrs, - jta + jta, + storeSesLsnrs ); } @@ -1689,7 +1776,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { Collection<DynamicCacheChangeRequest> reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); - for (DynamicCacheDescriptor desc : registeredCaches.values()) { + boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; + + Map<String, DynamicCacheDescriptor> descs = reconnect ? 
cachesOnDisconnect : registeredCaches; + + for (DynamicCacheDescriptor desc : descs.values()) { if (!desc.cancelled()) { DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); @@ -1717,7 +1808,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs); - req.clientNodes(ctx.discovery().clientNodesMap()); + Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap(); + + if (reconnect) { + clientNodesMap = U.newHashMap(caches.size()); + + for (GridCacheAdapter<?, ?> cache : caches.values()) { + Boolean nearEnabled = cache.isNear(); + + Map<UUID, Boolean> map = U.newHashMap(1); + + map.put(nodeId, nearEnabled); + + clientNodesMap.put(cache.name(), map); + } + } + + req.clientNodes(clientNodesMap); + + req.clientReconnect(reconnect); return req; } @@ -1727,38 +1836,86 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (data instanceof DynamicCacheChangeBatch) { DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)data; - for (DynamicCacheChangeRequest req : batch.requests()) { - if (req.template()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + if (batch.clientReconnect()) { + for (DynamicCacheChangeRequest req : batch.requests()) { + assert !req.template() : req; - assert ccfg != null : req; + String name = req.cacheName(); - DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); + boolean sysCache = CU.isMarshallerCache(name) || CU.isUtilityCache(name) || CU.isAtomicsCache(name); - if (existing == null) { - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - true, - req.deploymentId()); + if (!sysCache) { + DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName())); - registeredTemplates.put(maskNull(req.cacheName()), desc); - } + if (desc != null && !desc.cancelled() && 
desc.deploymentId().equals(req.deploymentId())) { + Map<UUID, Boolean> nodes = batch.clientNodes().get(name); - continue; + assert nodes != null : req; + assert nodes.containsKey(joiningNodeId) : nodes; + + ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, nodes.get(joiningNodeId)); + } + } + else + ctx.discovery().addClientNode(req.cacheName(), joiningNodeId, false); } + } + else { + for (DynamicCacheChangeRequest req : batch.requests()) { + if (req.template()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); - DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); + assert ccfg != null : req; - if (req.start() && !req.clientStartOnly()) { - CacheConfiguration ccfg = req.startCacheConfiguration(); + DynamicCacheDescriptor existing = registeredTemplates.get(maskNull(req.cacheName())); - if (existing != null) { - if (existing.locallyConfigured()) { - existing.deploymentId(req.deploymentId()); + if (existing == null) { + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + ccfg, + req.cacheType(), + true, + req.deploymentId()); + + registeredTemplates.put(maskNull(req.cacheName()), desc); + } + + continue; + } - existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); + DynamicCacheDescriptor existing = registeredCaches.get(maskNull(req.cacheName())); + + if (req.start() && !req.clientStartOnly()) { + CacheConfiguration ccfg = req.startCacheConfiguration(); + + if (existing != null) { + if (existing.locallyConfigured()) { + existing.deploymentId(req.deploymentId()); + + existing.addRemoteConfiguration(rmtNodeId, req.startCacheConfiguration()); + + ctx.discovery().setCacheFilter( + req.cacheName(), + ccfg.getNodeFilter(), + ccfg.getNearConfiguration() != null, + ccfg.getCacheMode() == LOCAL); + } + } + else { + assert req.cacheType() != null : req; + + DynamicCacheDescriptor desc = new DynamicCacheDescriptor( + ctx, + ccfg, + req.cacheType(), + false, + req.deploymentId()); + + // 
Received statically configured cache. + if (req.initiatingNodeId() == null) + desc.staticallyConfigured(true); + + registeredCaches.put(maskNull(req.cacheName()), desc); ctx.discovery().setCacheFilter( req.cacheName(), @@ -1767,37 +1924,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { ccfg.getCacheMode() == LOCAL); } } - else { - assert req.cacheType() != null : req; - - DynamicCacheDescriptor desc = new DynamicCacheDescriptor( - ctx, - ccfg, - req.cacheType(), - false, - req.deploymentId()); - - // Received statically configured cache. - if (req.initiatingNodeId() == null) - desc.staticallyConfigured(true); - - registeredCaches.put(maskNull(req.cacheName()), desc); - - ctx.discovery().setCacheFilter( - req.cacheName(), - ccfg.getNodeFilter(), - ccfg.getNearConfiguration() != null, - ccfg.getCacheMode() == LOCAL); - } } - } - if (!F.isEmpty(batch.clientNodes())) { - for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) { - String cacheName = entry.getKey(); + if (!F.isEmpty(batch.clientNodes())) { + for (Map.Entry<String, Map<UUID, Boolean>> entry : batch.clientNodes().entrySet()) { + String cacheName = entry.getKey(); - for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) - ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + for (Map.Entry<UUID, Boolean> tup : entry.getValue().entrySet()) + ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue()); + } } } } @@ -2152,8 +2287,25 @@ public class GridCacheProcessor extends GridProcessorAdapter { } } - if (!sndReqs.isEmpty()) - ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); + Exception err = null; + + if (!sndReqs.isEmpty()) { + try { + ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs)); + + if (ctx.clientDisconnected()) + err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to execute dynamic cache change request, client node 
disconnected."); + } + catch (IgniteCheckedException e) { + err = e; + } + } + + if (err != null) { + for (DynamicCacheStartFuture fut : res) + fut.onDone(err); + } return res; } @@ -2666,8 +2818,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * * @return Utility cache. */ - public <K, V> GridCacheAdapter<K, V> utilityCache() { - return internalCache(CU.UTILITY_CACHE_NAME); + public <K, V> IgniteInternalCache<K, V> utilityCache() { + return internalCacheEx(CU.UTILITY_CACHE_NAME); } /** @@ -2676,7 +2828,23 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Utility cache for atomic data structures. */ public <K, V> IgniteInternalCache<K, V> atomicsCache() { - return cache(CU.ATOMICS_CACHE_NAME); + return internalCacheEx(CU.ATOMICS_CACHE_NAME); + } + + /** + * @param name Cache name. + * @return Cache. + */ + private <K, V> IgniteInternalCache<K, V> internalCacheEx(String name) { + if (ctx.discovery().localNode().isClient()) { + IgniteCacheProxy<K, V> proxy = (IgniteCacheProxy<K, V>)jCacheProxies.get(name); + + assert proxy != null; + + return proxy.internalProxy(); + } + + return internalCache(name); } /** @@ -2796,7 +2964,21 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (old != null) fut = old; - ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req))); + Exception err = null; + + try { + ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req))); + + if (ctx.clientDisconnected()) + err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(), + "Failed to execute dynamic cache change request, client node disconnected."); + } + catch (IgniteCheckedException e) { + err = e; + } + + if (err != null) + fut.onDone(err); fut.get(); } @@ -2856,8 +3038,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Cancel all user operations. 
*/ public void cancelUserOperations() { - for (GridCacheAdapter<?, ?> cache : caches.values()) - cache.ctx.mvcc().cancelClientFutures(); + sharedCtx.mvcc().cancelClientFutures(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 7f4daff..4075d79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.apache.ignite.marshaller.*; import org.jetbrains.annotations.*; @@ -86,9 +87,15 @@ public class GridCacheSharedContext<K, V> { private Collection<CacheStoreSessionListener> storeSesLsnrs; /** + * @param kernalCtx Context. * @param txMgr Transaction manager. * @param verMgr Version manager. * @param mvccMgr MVCC manager. + * @param depMgr Deployment manager. + * @param exchMgr Exchange manager. + * @param ioMgr IO manager. + * @param jtaMgr JTA manager. + * @param storeSesLsnrs Store session listeners. 
*/ public GridCacheSharedContext( GridKernalContext kernalCtx, @@ -98,17 +105,13 @@ public class GridCacheSharedContext<K, V> { GridCacheDeploymentManager<K, V> depMgr, GridCachePartitionExchangeManager<K, V> exchMgr, GridCacheIoManager ioMgr, - Collection<CacheStoreSessionListener> storeSesLsnrs, - CacheJtaManagerAdapter jtaMgr + CacheJtaManagerAdapter jtaMgr, + Collection<CacheStoreSessionListener> storeSesLsnrs ) { this.kernalCtx = kernalCtx; - this.mvccMgr = add(mvccMgr); - this.verMgr = add(verMgr); - this.txMgr = add(txMgr); - this.jtaMgr = add(jtaMgr); - this.depMgr = add(depMgr); - this.exchMgr = add(exchMgr); - this.ioMgr = add(ioMgr); + + setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr); + this.storeSesLsnrs = storeSesLsnrs; txMetrics = new TransactionMetricsAdapter(); @@ -117,6 +120,89 @@ public class GridCacheSharedContext<K, V> { } /** + * @param reconnectFut Reconnect future. + * @throws IgniteCheckedException If failed. + */ + void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException { + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); + it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); + + mgr.onDisconnected(reconnectFut); + + if (restartOnDisconnect(mgr)) + mgr.onKernalStop(true); + } + + for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) { + GridCacheSharedManager<?, ?> mgr = it.previous(); + + if (restartOnDisconnect(mgr)) + mgr.stop(true); + } + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + void onReconnected() throws IgniteCheckedException { + List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>(); + + setManagers(mgrs, txMgr, + jtaMgr, + verMgr, + mvccMgr, + new GridCacheDeploymentManager<K, V>(), + new GridCachePartitionExchangeManager<K, V>(), + ioMgr); + + this.mgrs = mgrs; + + for (GridCacheSharedManager<K, V> mgr : mgrs) { + if (restartOnDisconnect(mgr)) + mgr.start(this); + } + + for (GridCacheSharedManager<?, ?> mgr : mgrs) + mgr.onKernalStart(true); + } + + /** + * @param mgr Manager. + * @return {@code True} if manager is restarted on reconnect. + */ + private boolean restartOnDisconnect(GridCacheSharedManager<?, ?> mgr) { + return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager; + } + + /** + * @param mgrs Managers list. + * @param txMgr Transaction manager. + * @param verMgr Version manager. + * @param mvccMgr MVCC manager. + * @param depMgr Deployment manager. + * @param exchMgr Exchange manager. + * @param ioMgr IO manager. + * @param jtaMgr JTA manager. + */ + private void setManagers(List<GridCacheSharedManager<K, V>> mgrs, + IgniteTxManager txMgr, + CacheJtaManagerAdapter jtaMgr, + GridCacheVersionManager verMgr, + GridCacheMvccManager mvccMgr, + GridCacheDeploymentManager<K, V> depMgr, + GridCachePartitionExchangeManager<K, V> exchMgr, + GridCacheIoManager ioMgr) { + this.mvccMgr = add(mgrs, mvccMgr); + this.verMgr = add(mgrs, verMgr); + this.txMgr = add(mgrs, txMgr); + this.jtaMgr = add(mgrs, jtaMgr); + this.depMgr = add(mgrs, depMgr); + this.exchMgr = add(mgrs, exchMgr); + this.ioMgr = add(mgrs, ioMgr); + } + + /** + * Gets all cache contexts for local node. * * @return Collection of all cache contexts. @@ -136,6 +222,7 @@ public class GridCacheSharedContext<K, V> { * Adds cache context to shared cache context. * * @param cacheCtx Cache context to add. + * @throws IgniteCheckedException If cache ID conflict detected. 
*/ @SuppressWarnings("unchecked") public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException { @@ -230,7 +317,7 @@ public class GridCacheSharedContext<K, V> { */ public byte dataCenterId() { // Data center ID is same for all caches, so grab the first one. - GridCacheContext<K, V> cacheCtx = F.first(cacheContexts()); + GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts()); return cacheCtx.dataCenterId(); } @@ -242,7 +329,7 @@ public class GridCacheSharedContext<K, V> { if (preloadersStartFut == null) { GridCompoundFuture<Object, Object> compound = null; - for (GridCacheContext<K, V> cacheCtx : cacheContexts()) { + for (GridCacheContext<?, ?> cacheCtx : cacheContexts()) { IgniteInternalFuture<Object> startFut = cacheCtx.preloader().startFuture(); if (!startFut.isDone()) { @@ -551,10 +638,12 @@ public class GridCacheSharedContext<K, V> { } /** + * @param mgrs Managers list. * @param mgr Manager to add. * @return Added manager. */ - @Nullable private <T extends GridCacheSharedManager<K, V>> T add(@Nullable T mgr) { + @Nullable private <T extends GridCacheSharedManager<K, V>> T add(List<GridCacheSharedManager<K, V>> mgrs, + @Nullable T mgr) { if (mgr != null) mgrs.add(mgr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java index d45052c..668bd00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java @@ -18,11 +18,12 @@ package org.apache.ignite.internal.processors.cache; import 
org.apache.ignite.*; +import org.apache.ignite.lang.*; /** * Cache manager shared across all caches. */ -public interface GridCacheSharedManager <K, V> { +public interface GridCacheSharedManager<K, V> { /** * Starts manager. * @@ -39,9 +40,10 @@ public interface GridCacheSharedManager <K, V> { public void stop(boolean cancel); /** + * @param reconnect {@code True} if manager restarted after client reconnect. * @throws IgniteCheckedException If failed. */ - public void onKernalStart() throws IgniteCheckedException; + public void onKernalStart(boolean reconnect) throws IgniteCheckedException; /** * @param cancel Cancel flag. @@ -49,6 +51,11 @@ public interface GridCacheSharedManager <K, V> { public void onKernalStop(boolean cancel); /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut); + + /** * Prints memory statistics (sizes of internal data structures, etc.). * * NOTE: this method is for testing and profiling purposes only. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java index 2cf7051..6ad76ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; import java.util.concurrent.atomic.*; @@ -35,6 +36,9 @@ public class 
GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag /** Starting flag. */ private final AtomicBoolean starting = new AtomicBoolean(false); + /** */ + private final AtomicBoolean stop = new AtomicBoolean(false); + /** {@inheritDoc} */ @Override public final void start(GridCacheSharedContext<K, V> cctx) throws IgniteCheckedException { if (!starting.compareAndSet(false, true)) @@ -75,7 +79,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag /** {@inheritDoc} */ @Override public final void stop(boolean cancel) { - if (!starting.get()) + if (!starting.get() || !stop.compareAndSet(false, true)) // Ignoring attempt to stop manager that has never been started. return; @@ -93,10 +97,10 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ - @Override public final void onKernalStart() throws IgniteCheckedException { - onKernalStart0(); + @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException { + onKernalStart0(reconnect); - if (log != null && log.isDebugEnabled()) + if (!reconnect && log != null && log.isDebugEnabled()) log.debug(kernalStartInfo()); } @@ -113,9 +117,10 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** + * @param reconnect {@code True} if manager restarted after client reconnect. * @throws IgniteCheckedException If failed. */ - protected void onKernalStart0() throws IgniteCheckedException { + protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { // No-op. } @@ -127,6 +132,11 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { + // No-op. + } + + /** {@inheritDoc} */ @Override public void printMemoryStats() { // No-op. 
} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index f88e288..bd2623d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1560,6 +1560,17 @@ public class GridCacheUtils { * @return CacheException runtime exception, never null. */ @NotNull public static RuntimeException convertToCacheException(IgniteCheckedException e) { + IgniteClientDisconnectedCheckedException disconnectedErr = + e instanceof IgniteClientDisconnectedCheckedException ? + (IgniteClientDisconnectedCheckedException)e + : e.getCause(IgniteClientDisconnectedCheckedException.class); + + if (disconnectedErr != null) { + assert disconnectedErr.reconnectFuture() != null : disconnectedErr; + + e = disconnectedErr; + } + if (e.hasCause(CacheWriterException.class)) return new CacheWriterException(U.convertExceptionNoWrap(e)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java index 42e31d2..9233f24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java @@ -43,6 +43,11 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> { /** {@inheritDoc} */ @Override protected RuntimeException convertException(IgniteCheckedException e) { + if (e instanceof IgniteFutureCancelledCheckedException || + e instanceof IgniteInterruptedCheckedException || + e instanceof IgniteFutureTimeoutCheckedException) + return U.convertException(e); + return CU.convertToCacheException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index bb7714a..0b2eba0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -1751,7 +1751,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V assert false; } - @Override public void block() { + @Override public void stopped() { // No-op. 
} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index b5c5161..78bd0eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -106,6 +106,41 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** + * @param set Set. + */ + public void onRemoved(GridCacheSetProxy set) { + setsMap.remove(set.delegate().id(), set); + } + + /** + * @param clusterRestarted Cluster restarted flag. + * @throws IgniteCheckedException If failed. + */ + public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { + for (Map.Entry<IgniteUuid, GridCacheSetProxy> e : setsMap.entrySet()) { + GridCacheSetProxy set = e.getValue(); + + if (clusterRestarted) { + set.blockOnRemove(); + + setsMap.remove(e.getKey(), set); + } + else + set.needCheckNotRemoved(); + } + + for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) { + GridCacheQueueProxy queue = e.getValue(); + + if (clusterRestarted) { + queue.delegate().onRemoved(false); + + queuesMap.remove(e.getKey(), queue); + } + } + } + + /** * @throws IgniteCheckedException If thread is interrupted or manager * was not successfully initialized. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java index 2838838..0b351b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxFinishSync.java @@ -22,6 +22,7 @@ import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -81,6 +82,16 @@ public class GridCacheTxFinishSync<K, V> { } /** + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + for (ThreadFinishSync threadSync : threadMap.values()) + threadSync.onDisconnected(reconnectFut); + + threadMap.clear(); + } + + /** * Callback invoked when finish response is received from remote node. * * @param nodeId Node ID response was received from. @@ -139,6 +150,11 @@ public class GridCacheTxFinishSync<K, V> { nodeMap.remove(nodeId); } + else if (cctx.kernalContext().clientDisconnected()) { + sync.onDisconnected(cctx.kernalContext().cluster().clientReconnectFuture()); + + nodeMap.remove(nodeId); + } } sync.onSend(); @@ -160,6 +176,16 @@ public class GridCacheTxFinishSync<K, V> { } /** + * @param reconnectFut Reconnect future. 
+ */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + for (TxFinishSync sync : nodeMap.values()) + sync.onDisconnected(reconnectFut); + + nodeMap.clear(); + } + + /** * @param nodeId Node ID response received from. */ public void onReceive(UUID nodeId) { @@ -288,5 +314,25 @@ public class GridCacheTxFinishSync<K, V> { } } } + + /** + * Client disconnected callback. + * + * @param reconnectFut Reconnect future. + */ + public void onDisconnected(IgniteFuture<?> reconnectFut) { + synchronized (this) { + nodeLeft = true; + + if (pendingFut != null) { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( + reconnectFut, + "Failed to wait for transaction synchronizer, client node disconnected: " + nodeId); + pendingFut.onDone(err); + + pendingFut = null; + } + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index adea9e0..22a5287 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -51,7 +51,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap private static final long serialVersionUID = 0L; /** Topology. */ - private GridDhtPartitionTopology top; + private GridDhtPartitionTopologyImpl top; /** Preloader. 
*/ protected GridCachePreloader preldr; @@ -134,6 +134,18 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ + @Override public void onReconnected() { + super.onReconnected(); + + ctx.affinity().onReconnected(); + + top.onReconnected(); + + if (preldr != null) + preldr.onReconnected(); + } + + /** {@inheritDoc} */ @Override public void onKernalStart() throws IgniteCheckedException { super.onKernalStart(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index de7f876..facf329 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -94,6 +94,30 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** + * + */ + public void onReconnected() { + lock.writeLock().lock(); + + try { + node2part = null; + + part2node = new HashMap<>(); + + lastExchangeId = null; + + updateSeq.set(1); + + topReadyFut = null; + + topVer = AffinityTopologyVersion.NONE; + } + finally { + lock.writeLock().unlock(); + } + } + + /** * @return Full map string representation. 
*/ @SuppressWarnings( {"ConstantConditions"}) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 79d5e75..bb3673d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -639,10 +639,17 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (timeout.finish()) { cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + try { + fut.get(); - onDone(Collections.<K, V>emptyMap()); + // Remap. 
+ map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridPartitionedGetFuture.this.onDone(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 0355bb3..a43ebe2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -67,7 +67,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { private GridDhtPartitionDemandPool demandPool; /** Start future. */ - private final GridFutureAdapter<Object> startFut; + private GridFutureAdapter<Object> startFut; /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); @@ -180,13 +180,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { topVer.setIfGreater(startTopVer); - // Generate dummy discovery event for local node joining. 
- DiscoveryEvent discoEvt = cctx.discovery().localJoinEvent(); - - assert discoEvt != null; - - assert discoEvt.topologyVersion() == startTopVer; - supplyPool.start(); demandPool.start(); } @@ -230,7 +223,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { final CacheConfiguration cfg = cctx.config(); - if (cfg.getRebalanceDelay() >= 0) { + if (cfg.getRebalanceDelay() >= 0 && !cctx.kernalContext().clientNode()) { U.log(log, "Starting rebalancing in " + cfg.getRebalanceMode() + " mode: " + cctx.name()); demandPool.syncFuture().listen(new CI1<Object>() { @@ -246,6 +239,11 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ + @Override public void onReconnected() { + startFut = new GridFutureAdapter<>(); + } + + /** {@inheritDoc} */ @Override public void onExchangeFutureAdded() { demandPool.onExchangeFutureAdded(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 351d6cd..79b7c1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -90,6 +90,14 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda public abstract GridDhtCacheAdapter<K, V> dht(); /** {@inheritDoc} */ + @Override public void onReconnected() { + map = new GridCacheConcurrentMap(ctx, + ctx.config().getNearConfiguration().getNearStartSize(), + 0.75F, + map.getEntryFactory()); + 
} + + /** {@inheritDoc} */ @Override public boolean isNear() { return true; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 58f6fe5..0691d39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -764,10 +764,17 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (timeout.finish()) { cctx.kernalContext().timeout().removeTimeoutObject(timeout); - // Remap. - map(keys.keySet(), F.t(node, keys), updTopVer); + try { + fut.get(); - onDone(Collections.<K, V>emptyMap()); + // Remap. 
+ map(keys.keySet(), F.t(node, keys), updTopVer); + + onDone(Collections.<K, V>emptyMap()); + } + catch (IgniteCheckedException e) { + GridNearGetFuture.this.onDone(e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java index 00ed020..7f0a568 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridOsCacheDrManager.java @@ -20,9 +20,9 @@ package org.apache.ignite.internal.processors.cache.dr; import org.apache.ignite.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.dr.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.dr.*; +import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; /** @@ -103,4 +103,9 @@ public class GridOsCacheDrManager implements GridCacheDrManager { @Override public boolean receiveEnabled() { return false; } + + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + // No-op. 
+ } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 2b93144..316713f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -110,6 +110,20 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture<?> reconnectFut) { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, + "Query was cancelled, client node disconnected."); + + for (Map.Entry<Long, GridCacheDistributedQueryFuture<?, ?, ?>> e : futs.entrySet()) { + GridCacheDistributedQueryFuture<?, ?, ?> fut = e.getValue(); + + fut.onPage(null, null, err, true); + + futs.remove(e.getKey(), fut); + } + } + + /** {@inheritDoc} */ @Override public void printMemoryStats() { super.printMemoryStats(); @@ -125,6 +139,14 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage */ protected void addQueryFuture(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut) { futs.put(reqId, fut); + + if (cctx.kernalContext().clientDisconnected()) { + IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException( + cctx.kernalContext().cluster().clientReconnectFuture(), + "Query was cancelled, client node disconnected."); + + fut.onDone(err); + } } /** 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index c2425f0..953cb9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -578,6 +578,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * @param nodes Nodes. + * @return Nodes for query execution. */ private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) { Queue<ClusterNode> fallbacks = new LinkedList<>(); @@ -595,18 +596,22 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * */ + @SuppressWarnings("unchecked") private void init() { ClusterNode node = nodes.poll(); - GridCacheQueryFutureAdapter<?, ?, R> fut0 = - (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? qryMgr.queryLocal(bean) : - qryMgr.queryDistributed(bean, Collections.singleton(node))); + GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? 
+ qryMgr.queryLocal(bean) : + qryMgr.queryDistributed(bean, Collections.singleton(node))); fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() { @Override public void apply(IgniteInternalFuture<Collection<R>> fut) { try { onDone(fut.get()); } + catch (IgniteClientDisconnectedCheckedException e) { + onDone(e); + } catch (IgniteCheckedException e) { if (F.isEmpty(nodes)) onDone(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index a8bace0..53017c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -163,7 +163,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda return null; } catch (IgniteCheckedException e) { - throw new IgniteException(e); + throw CU.convertToCacheException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index e059760..879c30c 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -396,6 +396,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryHandler.class, this); + } + + /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { U.writeString(out, cacheName); out.writeObject(topic); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 99907e4..7d9bcf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -140,32 +140,39 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { @SuppressWarnings("unchecked") private IgniteInternalTx txStart0(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, int txSize, @Nullable GridCacheContext sysCacheCtx) { - TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); - - if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) - throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + - "'txSerializableEnabled' configuration property)"); - - 
IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); - - if (tx != null) - throw new IllegalStateException("Failed to start new transaction " + - "(current thread already has a transaction): " + tx); - - tx = cctx.tm().newTx( - false, - false, - sysCacheCtx, - concurrency, - isolation, - timeout, - true, - txSize - ); - - assert tx != null; - - return tx; + cctx.kernalContext().gateway().readLock(); + + try { + TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); + + if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) + throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + + "'txSerializableEnabled' configuration property)"); + + IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); + + if (tx != null) + throw new IllegalStateException("Failed to start new transaction " + + "(current thread already has a transaction): " + tx); + + tx = cctx.tm().newTx( + false, + false, + sysCacheCtx, + concurrency, + isolation, + timeout, + true, + txSize + ); + + assert tx != null; + + return tx; + } + finally { + cctx.kernalContext().gateway().readUnlock(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index b6c77f6..82543c2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -115,7 +115,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { new 
ConcurrentHashMap8<>(5120); /** {@inheritDoc} */ - @Override protected void onKernalStart0() { + @Override protected void onKernalStart0(boolean reconnect) { + if (reconnect) + return; + cctx.gridEvents().addLocalEventListener( new GridLocalEventListener() { @Override public void onEvent(Event evt) { @@ -149,6 +152,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txHandler = new IgniteTxHandler(cctx); } + /** {@inheritDoc} */ + @Override public void onDisconnected(IgniteFuture reconnectFut) { + txFinishSync.onDisconnected(reconnectFut); + + for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) + rollbackTx(e.getValue()); + } + /** * @return TX handler. */ @@ -764,11 +775,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); } - boolean txSerializableEnabled = cctx.txConfig().isTxSerializableEnabled(); + boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled(); // Clean up committed transactions queue. 
if (tx.pessimistic() && tx.local()) { - if (tx.enforceSerializable() && txSerializableEnabled) { + if (tx.enforceSerializable() && txSerEnabled) { for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) { IgniteInternalTx committedTx = it.next(); @@ -784,7 +795,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return; } - if (txSerializableEnabled && tx.optimistic() && tx.enforceSerializable()) { + if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) { Set<IgniteTxKey> readSet = tx.readSet(); Set<IgniteTxKey> writeSet = tx.writeSet(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 5099b42..9346e43 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -89,7 +89,7 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza try { cctx.kernalContext().gateway().readLock(); } - catch (IllegalStateException e) { + catch (IllegalStateException | IgniteClientDisconnectedException e) { throw e; } catch (RuntimeException | Error e) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index c776361..90919c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -78,13 +78,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { } }; - /** - * @return Pre-generated UUID. - */ - private IgniteUuid uuid() { - return IgniteUuid.randomUuid(); - } - /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled(); @@ -95,7 +88,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { } /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { + @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException { for (ClusterNode n : cctx.discovery().remoteNodes()) onReceived(n.id(), n.metrics().getLastDataVersion()); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java index 2920176..3ac44f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java @@ -57,7 +57,7 @@ public class 
GridClockSyncProcessor extends GridProcessorAdapter { /** Time coordinator thread. */ private volatile TimeCoordinator timeCoord; - /** Time delta history. Constructed on coorinator. */ + /** Time delta history. Constructed on coordinator. */ private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist = new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY); @@ -222,7 +222,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { minNodeOrder = node.order(); } - ClusterNode locNode = ctx.grid().localNode(); + ClusterNode locNode = ctx.discovery().localNode(); if (locNode.order() == minNodeOrder) { if (log.isDebugEnabled()) @@ -295,7 +295,7 @@ public class GridClockSyncProcessor extends GridProcessorAdapter { ctx.io().send(n, TOPIC_TIME_SYNC, msg, SYSTEM_POOL); } catch (IgniteCheckedException e) { - if (ctx.discovery().pingNode(n.id())) + if (ctx.discovery().pingNodeNoError(n.id())) U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) 
" + "[nodeId=" + n.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']'); else if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java index 0ee00f1..1f5589f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/ClusterProcessor.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cluster; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.lang.*; /** * @@ -43,4 +45,13 @@ public class ClusterProcessor extends GridProcessorAdapter { public IgniteClusterImpl get() { return cluster; } + + /** + * @return Client reconnect future. + */ + public IgniteFuture<?> clientReconnectFuture() { + IgniteFuture<?> fut = cluster.clientReconnectFuture(); + + return fut != null ? 
fut : new IgniteFinishedFutureImpl<>(); + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index ce9b7c0..79020da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -33,7 +33,14 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * Listener registration status. */ public enum RegisterStatus { - REGISTERED, NOT_REGISTERED, DELAYED + /** */ + REGISTERED, + + /** */ + NOT_REGISTERED, + + /** */ + DELAYED } /**