# ignite-901 client reconnect WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f5f3efd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f5f3efd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f5f3efd1 Branch: refs/heads/ignite-901 Commit: f5f3efd164ae0b67917bbbbf1b856b2d24e72217 Parents: 6e23608 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jun 29 16:01:17 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 1 18:10:57 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/GridComponent.java | 10 + .../ignite/internal/GridKernalContextImpl.java | 12 +- .../ignite/internal/GridKernalGateway.java | 43 +- .../ignite/internal/GridKernalGatewayImpl.java | 115 ++-- .../apache/ignite/internal/GridKernalState.java | 3 + .../ignite/internal/GridPluginComponent.java | 10 + .../IgniteDisconnectedCheckedException.java | 32 ++ .../apache/ignite/internal/IgniteKernal.java | 103 +++- .../ignite/internal/MarshallerContextImpl.java | 11 +- .../internal/managers/GridManagerAdapter.java | 10 + .../deployment/GridDeploymentManager.java | 88 ++- .../discovery/GridDiscoveryManager.java | 31 +- .../processors/GridProcessorAdapter.java | 10 + .../processors/cache/GridCacheAdapter.java | 284 ++++++---- .../cache/GridCacheDeploymentManager.java | 5 + .../processors/cache/GridCacheGateway.java | 128 ++++- .../processors/cache/GridCacheIoManager.java | 9 +- .../processors/cache/GridCacheMvccManager.java | 10 +- .../GridCachePartitionExchangeManager.java | 29 +- .../processors/cache/GridCacheProcessor.java | 50 +- .../cache/GridCacheSharedContext.java | 55 +- .../cache/GridCacheSharedManager.java | 7 +- .../cache/GridCacheSharedManagerAdapter.java | 14 +- .../GridDhtPartitionsExchangeFuture.java | 4 + .../dht/preloader/GridDhtPreloader.java | 7 - .../distributed/near/GridNearCacheAdapter.java | 5 + .../processors/task/GridTaskProcessor.java | 2 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 9 +- .../communication/tcp/TcpCommunicationSpi.java | 56 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 169 ++++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 4 +- .../spi/discovery/tcp/TcpDiscoveryImpl.java | 2 +- .../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +- .../tcp/internal/TcpDiscoveryNode.java | 17 + .../spi/swapspace/file/FileSwapSpaceSpi.java | 2 +- .../internal/GridUpdateNotifierSelfTest.java | 14 +- .../IgniteClientReconnectAbstractTest.java | 143 +++++ .../IgniteClientReconnectApiBlockTest.java | 157 ++++++ .../IgniteClientReconnectCacheTest.java | 562 +++++++++++++++++++ ...IgniteClientReconnectDiscoveryStateTest.java | 110 ++++ .../GridCacheReplicatedInvalidateSelfTest.java | 3 +- .../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++- .../IgniteClientReconnectTestSuite.java | 40 ++ 43 files changed, 2162 insertions(+), 363 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java index fb227cd..5b3b0c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java @@ -116,4 +116,14 @@ public interface GridComponent { * @return Unique component type for discovery data exchange. */ @Nullable public DiscoveryDataExchangeType discoveryDataType(); + + /** + * + */ + public void onDisconnected() throws IgniteCheckedException; + + /** + * + */ + public void onReconnected() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 65107a7..581c891 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; +import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.managers.checkpoint.*; import org.apache.ignite.internal.managers.collision.*; @@ -501,9 +502,18 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable return ((IgniteKernal)grid).isStopping(); } + /** */ + private ClusterNode locNode; + /** {@inheritDoc} */ @Override public UUID localNodeId() { - return cfg.getNodeId(); + if (locNode != null) + return locNode.id(); + + if (discoMgr != null) + locNode = discoMgr.localNode(); + + return locNode != null ? locNode.id() : config().getNodeId(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java index 0156136..20d81de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; /** @@ -39,22 +40,6 @@ import org.apache.ignite.internal.util.tostring.*; @GridToStringExclude public interface GridKernalGateway { /** - * Performs light-weight check on the kernal state at the moment of this call. - * <p> - * This method should only be used when the kernal state should be checked just once - * at the beginning of the method and the fact that <b>kernal state can change in the middle - * of such method's execution</b> should not matter. - * <p> - * For example, when a method returns a constant value its implementation doesn't depend - * on the kernal being valid throughout its execution. In such case it is enough to check - * the kernal's state just once at the beginning of this method to provide consistent behavior - * of the API without incurring overhead of <code>lock-based</code> guard methods. - * - * @throws IllegalStateException Thrown in case when no kernal calls are allowed. - */ - public void lightCheck() throws IllegalStateException; - - /** * Should be called on entering every kernal related call * <b>originated directly or indirectly via public API</b>. * <p> @@ -113,31 +98,27 @@ public interface GridKernalGateway { public void writeUnlock(); /** - * Adds stop listener. Note that the identity set will be used to store listeners for - * performance reasons. Futures can register a listener to be notified when they need to - * be internally interrupted. + * Gets user stack trace through the first call of grid public API. * - * @param lsnr Listener to add. + * @return User stack trace. */ - public void addStopListener(Runnable lsnr); + public String userStackTrace(); /** - * Removes previously added stop listener. - * - * @param lsnr Listener to remove. + * @param timeout Timeout. + * @return {@code True} if write lock has been acquired. + * @throws InterruptedException If interrupted. */ - public void removeStopListener(Runnable lsnr); + public boolean tryWriteLock(long timeout) throws InterruptedException; /** - * Gets user stack trace through the first call of grid public API. + * Disconnected callback. */ - public String userStackTrace(); + public void onDisconnected(); /** - * @param timeout Timeout. - * @return {@code True} if write lock has been acquired. - * @throws InterruptedException If interrupted. + * Reconnected callback. */ - public boolean tryWriteLock(long timeout) throws InterruptedException; + public void onReconnected(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java index 35bbbed..ef894cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGatewayImpl.java @@ -17,12 +17,13 @@ package org.apache.ignite.internal; +import org.apache.ignite.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; -import java.util.*; import java.util.concurrent.*; /** @@ -38,10 +39,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); /** */ - @GridToStringExclude - private final Collection<Runnable> lsnrs = new GridSetWrapper<>(new IdentityHashMap<Runnable, Object>()); - - /** */ private volatile GridKernalState state = GridKernalState.STOPPED; /** */ @@ -63,12 +60,6 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { } /** {@inheritDoc} */ - @Override public void lightCheck() throws IllegalStateException { - if (state != GridKernalState.STARTED) - throw illegalState(); - } - - /** {@inheritDoc} */ @SuppressWarnings({"LockAcquiredButNotSafelyReleased", "BusyWait"}) @Override public void readLock() throws IllegalStateException { if (stackTrace == null) @@ -76,12 +67,7 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { rwLock.readLock(); - if (state != GridKernalState.STARTED) { - // Unlock just acquired lock. - rwLock.readUnlock(); - - throw illegalState(); - } + checkState(true); } /** {@inheritDoc} */ @@ -90,6 +76,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { stackTrace = stackTrace(); rwLock.readLock(); + + checkState(false); } /** {@inheritDoc} */ @@ -137,6 +125,36 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { return false; } + /** {@inheritDoc} */ + @Override public void onDisconnected() { + rwLock.readLock(); + + try { + if (state == GridKernalState.STARTED) + state = GridKernalState.DISCONNECTED; + } + finally { + rwLock.readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onReconnected() { + rwLock.writeLock(); + + try { + if (state == GridKernalState.DISCONNECTED) + state = GridKernalState.STARTED; + } + finally { + rwLock.writeUnlock(); + } + + synchronized (this) { + notifyAll(); + } + } + /** * Retrieves user stack trace. * @@ -173,16 +191,8 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { // NOTE: this method should always be called within write lock. this.state = state; - if (state == GridKernalState.STOPPING) { - Runnable[] runs; - - synchronized (lsnrs) { - lsnrs.toArray(runs = new Runnable[lsnrs.size()]); - } - - // In the same thread. - for (Runnable r : runs) - r.run(); + synchronized (this) { + notifyAll(); } } @@ -192,33 +202,42 @@ public class GridKernalGatewayImpl implements GridKernalGateway, Serializable { } /** {@inheritDoc} */ - @Override public void addStopListener(Runnable lsnr) { - assert lsnr != null; - - if (state == GridKernalState.STARTING || state == GridKernalState.STARTED) - synchronized (lsnrs) { - lsnrs.add(lsnr); - } - else - // Call right away in the same thread. - lsnr.run(); + @Override public String userStackTrace() { + return stackTrace; } - /** {@inheritDoc} */ - @Override public void removeStopListener(Runnable lsnr) { - assert lsnr != null; - - synchronized (lsnrs) { - lsnrs.remove(lsnr); + /** + * @param err If {@code true} throws {@link IllegalStateException} if not started. + */ + private void checkState(boolean err) { + if (state != GridKernalState.STARTED) { + do { + if (state == GridKernalState.DISCONNECTED) { + rwLock.readUnlock(); + + try { + synchronized (this) { + while (state == GridKernalState.DISCONNECTED) + this.wait(); + } + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + + rwLock.readLock(); + } + else if (err) { + rwLock.readUnlock(); + + throw illegalState(); + } + } + while (state != GridKernalState.STARTED); } } /** {@inheritDoc} */ - @Override public String userStackTrace() { - return stackTrace; - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridKernalGatewayImpl.class, this); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java index fbb8f45..7d63578 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalState.java @@ -32,6 +32,9 @@ public enum GridKernalState { /** Kernal is stopping. */ STOPPING, + /** Kernal is disconnected. */ + DISCONNECTED, + /** Kernal is stopped. * <p> * This is also the initial state of the kernal. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java index b438bc1..709ce3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPluginComponent.java @@ -64,6 +64,16 @@ public class GridPluginComponent implements GridComponent { } /** {@inheritDoc} */ + @Override public void onDisconnected() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onReconnected() { + // No-op. + } + + /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { plugin.onIgniteStop(cancel); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java new file mode 100644 index 0000000..0684356 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDisconnectedCheckedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import org.apache.ignite.*; + +/** + * + */ +public class IgniteDisconnectedCheckedException extends IgniteCheckedException { + /** + * @param msg Message. + */ + public IgniteDisconnectedCheckedException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e19d3d3..821a1f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.session.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.processors.timeout.*; import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; @@ -439,7 +440,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { assert cfg != null; return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() { - @Override public String apply(Map.Entry<String, ?> e) { + @Override + public String apply(Map.Entry<String, ?> e) { return e.getKey() + ", " + e.getValue().toString(); } }); @@ -2800,6 +2802,105 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** + * + */ + public void disconnected() { + ctx.gateway().onDisconnected(); + + try { + for (GridComponent comp : ctx.components()) + comp.onDisconnected(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + + private void stopOnDisconnect() { + GridCacheProcessor cacheProcessor = ctx.cache(); + + List<GridComponent> comps = ctx.components(); + + // Callback component in reverse order while kernal is still functional + // if called in the same thread, at least. + for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { + GridComponent comp = it.previous(); + + try { + if (!skipDaemon(comp) && (!(comp instanceof GridManager))) + comp.onKernalStop(true); + } + catch (Throwable e) { + errOnStop = true; + + U.error(log, "Failed to pre-stop processor: " + comp, e); + + if (e instanceof Error) + throw e; + } + } + + if (cacheProcessor != null) + cacheProcessor.cancelUserOperations(); + + for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();) { + GridComponent comp = it.previous(); + + try { + if (!skipDaemon(comp) && (!(comp instanceof GridManager))) { + comp.stop(true); + + if (log.isDebugEnabled()) + log.debug("Component stopped: " + comp); + } + } + catch (Throwable e) { + errOnStop = true; + + U.error(log, "Failed to stop component (ignoring): " + comp, e); + + if (e instanceof Error) + throw (Error)e; + } + } + + ctx.marshallerContext().onDisconnected(); + } + + private void restart() throws IgniteCheckedException { + List<PluginProvider> plugins = U.allPluginProviders(); + + startProcessor(new ClusterProcessor(ctx)); + + GridResourceProcessor rsrcProc = new GridResourceProcessor(ctx); + + rsrcProc.setSpringContext(rsrcCtx); + + scheduler = new IgniteSchedulerImpl(ctx); + + startProcessor(rsrcProc); + } + + /** + * + */ + public void reconnected() { + new Thread() { + public void run() { + try { + ctx.gateway().onReconnected(); + + for (GridComponent comp : ctx.components()) + comp.onReconnected(); + } + catch (IgniteCheckedException e) { + e.printStackTrace(); + } + } + }.start(); + } + + /** * Creates optional component. * * @param cls Component interface. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index 9f7c983..948babc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -32,7 +32,7 @@ import java.util.concurrent.*; */ public class MarshallerContextImpl extends MarshallerContextAdapter { /** */ - private final CountDownLatch latch = new CountDownLatch(1); + private CountDownLatch latch = new CountDownLatch(1); /** */ private final File workDir; @@ -57,6 +57,15 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { } /** + * + */ + public void onDisconnected() { + latch = new CountDownLatch(1); + + cache = null; + } + + /** * @param ctx Kernal context. * @throws IgniteCheckedException In case of error. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index 40a5ea5..d886f4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -166,6 +166,16 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan // No-op. } + /** {@inheritDoc} */ + @Override public void onDisconnected() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onReconnected() throws IgniteCheckedException { + // No-op. + } + /** * Starts wrapped SPI. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java index 75fe98f..b82090f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManager.java @@ -110,20 +110,24 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { } /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - GridProtocolHandler.deregisterDeploymentManager(); + @Override public void onDisconnected() throws IgniteCheckedException { + storesOnKernalStop(); - if (verStore != null) - verStore.stop(); + storesStop(); - if (ldrStore != null) - ldrStore.stop(); + startStores(); + } - if (locStore != null) - locStore.stop(); + /** {@inheritDoc} */ + @Override public void onReconnected() throws IgniteCheckedException { + storesOnKernalStart(); + } - if (comm != null) - comm.stop(); + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) throws IgniteCheckedException { + GridProtocolHandler.deregisterDeploymentManager(); + + storesStop(); getSpi().setListener(null); @@ -135,21 +139,12 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { /** {@inheritDoc} */ @Override public void onKernalStart0() throws IgniteCheckedException { - locStore.onKernalStart(); - ldrStore.onKernalStart(); - verStore.onKernalStart(); + storesOnKernalStart(); } /** {@inheritDoc} */ @Override public void onKernalStop0(boolean cancel) { - if (verStore != null) - verStore.onKernalStop(); - - if (ldrStore != null) - ldrStore.onKernalStop(); - - if (locStore != null) - locStore.onKernalStop(); + storesOnKernalStop(); } /** {@inheritDoc} */ @@ -547,6 +542,57 @@ public class GridDeploymentManager extends GridManagerAdapter<DeploymentSpi> { return ldr instanceof GridDeploymentClassLoader; } + + /** + * @throws IgniteCheckedException If failed. + */ + private void startStores() throws IgniteCheckedException { + locStore = new GridDeploymentLocalStore(getSpi(), ctx, comm); + ldrStore = new GridDeploymentPerLoaderStore(getSpi(), ctx, comm); + verStore = new GridDeploymentPerVersionStore(getSpi(), ctx, comm); + + locStore.start(); + ldrStore.start(); + verStore.start(); + } + + /** + * @throws IgniteCheckedException If failed. + */ + private void storesOnKernalStart() throws IgniteCheckedException { + locStore.onKernalStart(); + ldrStore.onKernalStart(); + verStore.onKernalStart(); + } + + /** + * + */ + private void storesOnKernalStop() { + if (verStore != null) + verStore.onKernalStop(); + + if (ldrStore != null) + ldrStore.onKernalStop(); + + if (locStore != null) + locStore.onKernalStop(); + } + + /** + * + */ + private void storesStop() { + if (verStore != null) + verStore.stop(); + + if (ldrStore != null) + ldrStore.stop(); + + if (locStore != null) + locStore.stop(); + } + /** * */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 1e4b972..7a524a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -287,6 +287,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** {@inheritDoc} */ + @Override public void onDisconnected() throws IgniteCheckedException { + locJoinEvt = new GridFutureAdapter<>(); + } + + /** {@inheritDoc} */ @Override protected void onKernalStart0() throws IgniteCheckedException { if (Boolean.TRUE.equals(ctx.config().isClientMode()) && !getSpi().isClientMode()) ctx.performance().add("Enable client mode for TcpDiscoverySpi " + @@ -386,7 +391,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { verChanged = false; } else { - if (type != EVT_NODE_SEGMENTED) { + if (type != EVT_NODE_SEGMENTED && + type != EVT_CLIENT_NODE_DISCONNECTED && + type != EVT_CLIENT_NODE_RECONNECTED) { minorTopVer = 0; verChanged = true; @@ -1693,6 +1700,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else if (type == EVT_NODE_SEGMENTED) evt.message("Node segmented: " + node); + else if (type == EVT_CLIENT_NODE_DISCONNECTED) + evt.message("Client node disconnected: " + node); + + else if (type == EVT_CLIENT_NODE_RECONNECTED) + evt.message("Client node reconnected: " + node); + else assert false; @@ -1862,6 +1875,22 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { break; } + case EVT_CLIENT_NODE_DISCONNECTED: { + assert localNode().isClient() : evt; + + ((IgniteKernal)ctx.grid()).disconnected(); + + break; + } + + case EVT_CLIENT_NODE_RECONNECTED: { + assert localNode().isClient() : evt; + + ((IgniteKernal)ctx.grid()).reconnected(); + + break; + } + case DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT: { if (ctx.event().isRecordable(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT)) { DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java index a84c48a..04a39d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/GridProcessorAdapter.java @@ -62,6 +62,16 @@ public abstract class GridProcessorAdapter implements GridProcessor { } /** {@inheritDoc} */ + @Override public void onDisconnected() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onReconnected() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() { return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 7335d72..d754e3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -906,12 +906,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public Set<K> keySet() { - return keySet((CacheEntryPredicate[])null); + return keySet((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @Override public Set<K> keySetx() { - return keySetx((CacheEntryPredicate[])null); + return keySetx((CacheEntryPredicate[]) null); } /** {@inheritDoc} */ @@ -1217,7 +1217,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override + public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { return e.get().get(key); } }); @@ -1259,11 +1260,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V String taskName, final IgniteBiInClosure<KeyCacheObject, Object> vis) { return ctx.closures().callLocalSafe(new GPC<Object>() { - @Nullable @Override public Object call() { + @Nullable + @Override + public Object call() { try { ctx.store().loadAll(tx, keys, vis); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { throw new GridClosureException(e); } @@ -1465,8 +1467,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.config().getInterceptor() != null) fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { - @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { - return (V)ctx.config().getInterceptor().onGet(key, f.get()); + @Override + public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { + return (V) ctx.config().getInterceptor().onGet(key, f.get()); } }); @@ -1978,12 +1981,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } - @Override public String toString() { + @Override + public String toString() { return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2041,11 +2046,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(drMap.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.putAllDrAsync(ctx, drMap).get(); } - @Override public String toString() { + @Override + public String toString() { return "putAllConflict [drMap=" + drMap + ']'; } }); @@ -2060,11 +2067,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.putAllDrAsync(ctx, drMap); } - @Override public String toString() { + @Override + public String toString() { return "putAllConflictAsync [drMap=" + drMap + ']'; } }); @@ -2081,7 +2090,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { - @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) + @Nullable + @Override + public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); @@ -2113,11 +2124,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable + @Override + public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override + public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2158,7 +2172,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() { - @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override + public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2188,7 +2203,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) { @Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { - @Override public EntryProcessor apply(K k) { + @Override + public EntryProcessor apply(K k) { return entryProcessor; } }); @@ -2205,7 +2221,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { - @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override + public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2238,7 +2255,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { - @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) + @Override + public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { GridCacheReturn ret = fut.get(); @@ -2259,10 +2277,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(map.keySet()); return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) { - @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) + @Nullable + @Override + public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { IgniteInternalFuture<GridCacheReturn> fut = - tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); + tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args); return fut.get().value(); } @@ -2310,12 +2330,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override public String toString() { + @Override + public String toString() { return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2345,11 +2367,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); + @Override + public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value(); } - @Override public String toString() { + @Override + public String toString() { return "putIfAbsent [key=" + key + ", val=" + val + ']'; } }); @@ -2369,12 +2393,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } - @Override public String toString() { + @Override + public String toString() { return "putIfAbsentAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2399,11 +2425,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); Boolean stored = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success(); } - @Override public String toString() { + @Override + public String toString() { return "putxIfAbsent [key=" + key + ", val=" + val + ']'; } }); @@ -2428,12 +2456,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override public String toString() { + @Override + public String toString() { return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2504,11 +2534,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success(); } - @Override public String toString() { + @Override + public String toString() { return "replacex [key=" + key + ", val=" + val + ']'; } }); @@ -2524,12 +2556,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); return asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override public String toString() { + @Override + public String toString() { return "replacexAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2547,7 +2581,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(newVal); return syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); @@ -2556,7 +2591,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V .success(); } - @Override public String toString() { + @Override + public String toString() { return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -2619,11 +2655,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValues(m.values()); syncOp(new SyncInOp(m.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get(); } - @Override public String toString() { + @Override + public String toString() { return "putAll [map=" + m + ']'; } }); @@ -2665,16 +2703,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); V prevVal = syncOp(new SyncOp<V>(true) { - @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value(); if (ctx.config().getInterceptor() != null) - return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); + return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); return ret; } - @Override public String toString() { + @Override + public String toString() { return "remove [key=" + key + ']'; } }); @@ -2697,13 +2737,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { - @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { // TODO should we invoke interceptor here? return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); } - @Override public String toString() { + @Override + public String toString() { return "removeAsync [key=" + key + ']'; } }); @@ -2745,11 +2787,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); syncOp(new SyncInOp(keys.size() == 1) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get(); } - @Override public String toString() { + @Override + public String toString() { return "removeAll [keys=" + keys + ']'; } }); @@ -2771,11 +2815,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL); } - @Override public String toString() { + @Override + public String toString() { return "removeAllAsync [keys=" + keys + ']'; } }); @@ -2798,11 +2844,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); boolean rmv = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success(); } - @Override public String toString() { + @Override + public String toString() { return "removex [key=" + key + ']'; } }); @@ -2836,12 +2884,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override public String toString() { + @Override + public String toString() { return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']'; } }); @@ -2860,19 +2910,21 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<GridCacheReturn>(true) { - @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - return (GridCacheReturn)tx.removeAllAsync(ctx, + return (GridCacheReturn) tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, ctx.equalsValArray(val)).get(); } - @Override public String toString() { + @Override + public String toString() { return "remove [key=" + key + ", val=" + val + ']'; } }); @@ -2887,11 +2939,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); syncOp(new SyncInOp(false) { - @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException { tx.removeAllDrAsync(ctx, drMap).get(); } - @Override public String toString() { + @Override + public String toString() { return "removeAllConflict [drMap=" + drMap + ']'; } }); @@ -2906,11 +2960,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); return asyncOp(new AsyncInOp(drMap.keySet()) { - @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) { return tx.removeAllDrAsync(ctx, drMap); } - @Override public String toString() { + @Override + public String toString() { return "removeAllDrASync [drMap=" + drMap + ']'; } }); @@ -2926,20 +2982,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return syncOp(new SyncOp<GridCacheReturn>(true) { - @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); return (GridCacheReturn) tx.putAllAsync(ctx, - F.t(key, newVal), - true, - null, - -1, - ctx.equalsValArray(oldVal)).get(); + F.t(key, newVal), + true, + null, + -1, + ctx.equalsValArray(oldVal)).get(); } - @Override public String toString() { + @Override + public String toString() { return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -2953,17 +3011,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<GridCacheReturn>() { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } - IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, @@ -2972,7 +3030,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return fut; } - @Override public String toString() { + @Override + public String toString() { return "removeAsync [key=" + key + ", val=" + val + ']'; } }); @@ -2987,17 +3046,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKey(key); return asyncOp(new AsyncOp<GridCacheReturn>() { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. try { if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } - IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx, + IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx, F.t(key, newVal), true, null, @@ -3007,7 +3066,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return fut; } - @Override public String toString() { + @Override + public String toString() { return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']'; } }); @@ -3027,16 +3087,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); boolean rmv = syncOp(new SyncOp<Boolean>(true) { - @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { + @Override + public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { // Register before hiding in the filter. if (ctx.deploymentEnabled()) ctx.deploy().registerClass(val); return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, - ctx.equalsValArray(val)).get().success(); + ctx.equalsValArray(val)).get().success(); } - @Override public String toString() { + @Override + public String toString() { return "remove [key=" + key + ", val=" + val + ']'; } }); @@ -3061,23 +3123,24 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheValue(val); IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { - @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { + @Override + public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { // Register before hiding in the filter. if (ctx.deploymentEnabled()) { try { ctx.deploy().registerClass(val); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); } } return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, ctx.equalsValArray(val)).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } - @Override public String toString() { + @Override + public String toString() { return "removeAsync [key=" + key + ", val=" + val + ']'; } }); @@ -3247,10 +3310,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V TransactionConfiguration cfg = ctx.gridConfig().getTransactionConfiguration(); return txStart( - concurrency, - isolation, - cfg.getDefaultTxTimeout(), - 0 + concurrency, + isolation, + cfg.getDefaultTxTimeout(), + 0 ); } @@ -3686,19 +3749,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return F.iterator(iterator(), new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() { private IgniteCacheExpiryPolicy expiryPlc = - ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); + ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null); - @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { + @Override + public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) { CacheOperationContext prev = ctx.gate().enter(opCtx); try { V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc); return new CacheEntryImpl<>(lazyEntry.getKey(), val); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); - } - finally { + } finally { ctx.gate().leave(prev); } } @@ -3722,20 +3784,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V .execute(); return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { - @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { + @Override + protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } - @Override protected void remove(Cache.Entry<K, V> item) { + @Override + protected void remove(Cache.Entry<K, V> item) { CacheOperationContext prev = ctx.gate().enter(opCtx); try { GridCacheAdapter.this.remove(item.getKey()); - } - catch (IgniteCheckedException e) { + } catch (IgniteCheckedException e) { throw CU.convertToCacheException(e); - } - finally { + } finally { ctx.gate().leave(prev); } } @@ -4380,7 +4442,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return getAllAsync(Collections.singletonList(key), deserializePortable).chain( new CX1<IgniteInternalFuture<Map<K, V>>, V>() { - @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { + @Override + public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { Map<K, V> map = e.get(); assert map.isEmpty() || map.size() == 1 : map.size(); @@ -4428,6 +4491,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V public abstract void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver); /** + * + */ + public void disconnected() { + // No-op. + } + + /** * Validates that given cache value implements {@link Externalizable}. * * @param val Cache value. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index ff109ed..475a6e9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -116,6 +116,11 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap } /** {@inheritDoc} */ + @Override public boolean restartOnDisconnect() { + return true; + } + + /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { if (discoLsnr != null) cctx.gridEvents().removeLocalEventListener(discoLsnr); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index d9d151c..d63e818 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -33,7 +33,7 @@ public class GridCacheGateway<K, V> { private final GridCacheContext<K, V> ctx; /** Stopped flag for dynamic caches. */ - private volatile boolean stopped; + private volatile State state = State.STARTED; /** */ private GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock(); @@ -56,11 +56,46 @@ public class GridCacheGateway<K, V> { rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); + checkState(true, true); + } - throw new IllegalStateException("Dynamic cache has been stopped: " + ctx.name()); + /** + * + */ + private boolean checkState(boolean lock, boolean err) { + if (state != State.STARTED) { + do { + if (state == State.STOPPED) { + if (lock) + rwLock.readUnlock(); + + if (err) + throw new IllegalStateException("Cache has been stopped: " + ctx.name()); + else + return false; + } + else { + if (lock) + rwLock.readUnlock(); + + try { + synchronized (this) { + while (state == State.DISCONNECTED) + wait(); + } + } + catch (InterruptedException e) { + throw new IgniteException(e); + } + + if (lock) + rwLock.readLock(); + } + } + while (state != State.STARTED); } + + return true; } /** @@ -71,17 +106,11 @@ public class GridCacheGateway<K, V> { public boolean enterIfNotClosed() { onEnter(); - // Must unlock in case of unexpected errors to avoid - // deadlocks during kernal stop. + // Must unlock in case of unexpected errors to avoid deadlocks during kernal stop. rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); + return checkState(true, false); - return false; - } - - return true; } /** @@ -92,7 +121,7 @@ public class GridCacheGateway<K, V> { public boolean enterIfNotClosedNoLock() { onEnter(); - return !stopped; + return checkState(false, false); } /** @@ -144,11 +173,7 @@ public class GridCacheGateway<K, V> { rwLock.readLock(); - if (stopped) { - rwLock.readUnlock(); - - throw new IllegalStateException("Cache has been stopped: " + ctx.name()); - } + checkState(true, true); // Must unlock in case of unexpected errors to avoid // deadlocks during kernal stop. @@ -169,8 +194,7 @@ public class GridCacheGateway<K, V> { @Nullable public CacheOperationContext enterNoLock(@Nullable CacheOperationContext opCtx) { onEnter(); - if (stopped) - throw new IllegalStateException("Cache has been stopped: " + ctx.name()); + checkState(false, false); return setOperationContextPerCall(opCtx); } @@ -229,8 +253,48 @@ public class GridCacheGateway<K, V> { /** * */ - public void block() { - stopped = true; + public void stopped() { + state = State.STOPPED; + + synchronized (this) { + notifyAll(); + } + } + + /** + * + */ + public void onDisconnected() { + if (state == State.STARTED) + state = State.DISCONNECTED; + } + + /** + * + */ + public void waitOperations() { + rwLock.writeLock(); + + rwLock.writeUnlock(); + } + + /** + * @param stopped Cache stopped flag. + */ + public void reconnected(boolean stopped) { + rwLock.writeLock(); + + try { + if (state == State.DISCONNECTED) + state = stopped ? State.STOPPED : State.STARTED; + } + finally { + rwLock.writeUnlock(); + } + + synchronized (this) { + notifyAll(); + } } /** @@ -257,10 +321,28 @@ public class GridCacheGateway<K, V> { try { // No-op. - stopped = true; + state = State.STOPPED; } finally { rwLock.writeUnlock(); } + + synchronized (this) { + notifyAll(); + } + } + + /** + * + */ + private enum State { + /** */ + STARTED, + + /** */ + DISCONNECTED, + + /** */ + STOPPED } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 74a4512..48a16d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -170,7 +170,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @SuppressWarnings("BusyWait") - @Override protected void onKernalStop0(boolean cancel) { + @Override protected void onKernalStop0(boolean cancel, boolean disconnected) { cctx.gridIO().removeMessageListener(TOPIC_CACHE); for (Object ordTopic : orderedHandlers.keySet()) @@ -891,6 +891,13 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } /** + * @param cacheId Cache ID to remove handlers for. + */ + public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) { + clsHandlers.remove(new ListenerKey(cacheId, type)); + } + + /** * @param msgCls Message class to check. * @return Message index. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index c528e08..e2d22dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -216,7 +216,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** {@inheritDoc} */ - @Override public void onKernalStop0(boolean cancel) { + @Override public void onKernalStop0(boolean cancel, boolean disconnected) { cctx.gridEvents().removeLocalEventListener(discoLsnr); } @@ -293,9 +293,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * Cancels all client futures. + * + * @param stop If {@code true} node is stopping, otherwise disconnected. */ - public void cancelClientFutures() { - IgniteCheckedException e = new IgniteCheckedException("Operation has been cancelled (grid is stopping)."); + public void cancelClientFutures(boolean stop) { + IgniteCheckedException e = stop ? + new IgniteCheckedException("Operation has been cancelled (node is stopping).") : + new IgniteCheckedException("Operation has been cancelled (node disconnected)."); for (Collection<GridCacheFuture<?>> futures : futs.values()) { for (GridCacheFuture<?> future : futures) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index af87685..f0c9b3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -207,6 +207,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }; /** {@inheritDoc} */ + @Override public boolean restartOnDisconnect() { + return true; + } + + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { super.start0(); @@ -281,6 +286,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana break; } + catch (IgniteDisconnectedCheckedException e) { + log.info("Disconnected while waiting for initial partition map exchange: " + e); + + break; + } catch (IgniteFutureTimeoutCheckedException ignored) { if (first) { U.warn(log, "Failed to wait for initial partition map exchange. " + @@ -313,13 +323,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { + @Override protected void onKernalStop0(boolean cancel, boolean disconnected) { + cctx.gridEvents().removeLocalEventListener(discoLsnr); + + cctx.io().removeHandler(0, GridDhtPartitionsSingleMessage.class); + cctx.io().removeHandler(0, GridDhtPartitionsFullMessage.class); + cctx.io().removeHandler(0, GridDhtPartitionsSingleRequest.class); + + IgniteCheckedException err = disconnected ? + new IgniteDisconnectedCheckedException("Node disconnected: " + cctx.gridName()) : + new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName()); + // Finish all exchange futures. for (GridDhtPartitionsExchangeFuture f : exchFuts.values()) - f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + f.onDone(err); for (AffinityReadyFuture f : readyFuts.values()) - f.onDone(new IgniteInterruptedCheckedException("Grid is stopping: " + cctx.gridName())); + f.onDone(err); U.cancel(exchWorker); @@ -1099,6 +1119,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana catch (IgniteInterruptedCheckedException e) { throw e; } + catch (IgniteDisconnectedCheckedException e) { + return; + } catch (IgniteCheckedException e) { U.error(log, "Failed to wait for completion of partition map exchange " + "(preloading will not start): " + exchFut, e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f5f3efd1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 2f7f22c..e11a221 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -341,8 +341,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " + "(most likely misconfiguration - either update 'isTxSerializableEnabled' or " + "'defaultTxIsolationLevel' properties) for cache: " + U.maskName(cc.getName()), - "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " + - "for cache: " + U.maskName(cc.getName())); + "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " + + "for cache: " + U.maskName(cc.getName())); if (cc.isWriteBehindEnabled()) { if (cfgStore == null) @@ -567,8 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, - ctx.config().getCacheStoreSessionListenerFactories())); + sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners( + ctx, ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); @@ -871,10 +871,42 @@ public class GridCacheProcessor extends GridProcessorAdapter { it.hasPrevious();) { GridCacheSharedManager<?, ?> mgr = it.previous(); - mgr.onKernalStop(cancel); + mgr.onKernalStop(cancel, false); } } + /** {@inheritDoc} */ + @Override public void onDisconnected() throws IgniteCheckedException { + for (GridCacheAdapter cache : caches.values()) + cache.context().gate().onDisconnected(); + + sharedCtx.mvcc().cancelClientFutures(false); + + for (GridCacheAdapter cache : caches.values()) + cache.disconnected(); + + registeredCaches.clear(); + + sharedCtx.onDisconnected(); + } + + /** {@inheritDoc} */ + @Override public void onReconnected() throws IgniteCheckedException { + for (GridCacheAdapter cache : caches.values()) + cache.context().gate().reconnected(false); + + ctx.marshallerContext().onMarshallerCacheStarted(ctx); + + marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() { + @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException { + ctx.marshallerContext().onMarshallerCachePreloaded(ctx); + } + }); + + for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers()) + mgr.onKernalStart(); + } + /** * @param cache Cache to start. * @throws IgniteCheckedException If failed to start cache. @@ -1487,7 +1519,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName())); if (proxy != null) - proxy.gate().block(); + proxy.gate().stopped(); } /** @@ -1591,7 +1623,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Shared context. */ @SuppressWarnings("unchecked") - private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, + private GridCacheSharedContext createSharedContext( + GridKernalContext kernalCtx, Collection<CacheStoreSessionListener> storeSesLsnrs) { IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); @@ -2796,8 +2829,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Cancel all user operations. */ public void cancelUserOperations() { - for (GridCacheAdapter<?, ?> cache : caches.values()) - cache.ctx.mvcc().cancelClientFutures(); + sharedCtx.mvcc().cancelClientFutures(true); } /**