Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 9e9847599 -> 2222303f2
# ignite-901 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2222303f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2222303f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2222303f Branch: refs/heads/ignite-901 Commit: 2222303f27e55841a6bc2142129be4f29c859e9d Parents: 9e98475 Author: sboikov <sboi...@gridgain.com> Authored: Mon Jul 13 13:13:38 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Jul 13 13:13:38 2015 +0300 ---------------------------------------------------------------------- .../CacheDataStructuresManager.java | 22 +++++++---- .../datastructures/DataStructuresProcessor.java | 24 ++++++++---- .../datastructures/GridCacheAtomicLongImpl.java | 33 ++++++++++++++-- .../GridCacheAtomicReferenceImpl.java | 34 +++++++++++++++-- .../GridCacheAtomicSequenceImpl.java | 33 ++++++++++++++-- .../GridCacheAtomicStampedImpl.java | 33 ++++++++++++++-- .../GridCacheCountDownLatchImpl.java | 2 +- .../datastructures/GridCacheRemovable.java | 6 +-- .../datastructures/GridCacheSetImpl.java | 2 +- .../datastructures/GridCacheSetProxy.java | 40 +++++++++++++++++++- 10 files changed, 194 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index f710105..78bd0eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -106,28 +106,36 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** - * Client reconnect callback. - * + * @param set Set. + */ + public void onRemoved(GridCacheSetProxy set) { + setsMap.remove(set.delegate().id(), set); + } + + /** + * @param clusterRestarted Cluster restarted flag. * @throws IgniteCheckedException If failed. */ - public void onReconnected() throws IgniteCheckedException { + public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { for (Map.Entry<IgniteUuid, GridCacheSetProxy> e : setsMap.entrySet()) { GridCacheSetProxy set = e.getValue(); - if (!set.delegate().checkHeader()) { + if (clusterRestarted) { set.blockOnRemove(); - setsMap.remove(e.getKey(), e.getValue()); + setsMap.remove(e.getKey(), set); } + else + set.needCheckNotRemoved(); } for (Map.Entry<IgniteUuid, GridCacheQueueProxy> e : queuesMap.entrySet()) { GridCacheQueueProxy queue = e.getValue(); - if (!queue.delegate().checkHeader()) { + if (clusterRestarted) { queue.delegate().onRemoved(false); - queuesMap.remove(e.getKey(), e.getValue()); + queuesMap.remove(e.getKey(), queue); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 57b0834..57b16f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -184,22 +184,30 @@ public final class DataStructuresProcessor extends GridProcessorAdapter { dsCacheCtx.continuousQueries().cancelInternalQuery(qryId); } + /** + * @param key Key. + * @param obj Object. + */ + void onRemoved(GridCacheInternal key, GridCacheRemovable obj) { + dsMap.remove(key, obj); + } + /** {@inheritDoc} */ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException { - Set<GridCacheInternal> keys = dsMap.keySet(); - - Map<GridCacheInternal, GridCacheInternal> vals = dsView.getAll(keys); - for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) { - if (!vals.containsKey(e.getKey())) { - dsMap.remove(e.getKey()); + GridCacheRemovable obj = e.getValue(); + + if (clusterRestarted) { + obj.onRemoved(); - e.getValue().onRemoved(); + dsMap.remove(e.getKey(), obj); } + else + obj.needCheckNotRemoved(); } for (GridCacheContext cctx : ctx.cache().context().cacheContexts()) - cctx.dataStructures().onReconnected(); + cctx.dataStructures().onReconnected(clusterRestarted); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java index 5e9245d..1d6e735 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java @@ -57,6 +57,9 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext /** Removed flag.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic long key. */ private GridCacheInternalKey key; @@ -336,7 +339,31 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic long was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic long was removed from cache: " + name); } /** {@inheritDoc} */ @@ -345,8 +372,8 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 0c4e5e6..f740c4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; import java.io.*; import java.util.concurrent.*; @@ -56,6 +55,9 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef /** Status.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic reference key. */ private GridCacheInternalKey key; @@ -156,8 +158,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ @@ -293,7 +295,31 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic reference was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic reference was removed from cache: " + name); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 2400a7e..31f4f24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -61,6 +61,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** Removed flag. */ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Sequence key. */ private GridCacheInternalKey key; @@ -391,7 +394,31 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Sequence was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = seqView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Sequence was removed from cache: " + name); } /** {@inheritDoc} */ @@ -400,8 +427,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java index 76ea7ca..d2dedeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java @@ -59,6 +59,9 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt /** Removed flag.*/ private volatile boolean rmvd; + /** Check removed flag. */ + private boolean rmvCheck; + /** Atomic stamped key. */ private GridCacheInternalKey key; @@ -206,8 +209,8 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { - // No-op. + @Override public void needCheckNotRemoved() { + rmvCheck = true; } /** {@inheritDoc} */ @@ -369,7 +372,31 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt */ private void checkRemoved() throws IllegalStateException { if (rmvd) - throw new IllegalStateException("Atomic stamped was removed from cache: " + name); + throw removedError(); + + if (rmvCheck) { + try { + rmvd = atomicView.get(key) == null; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + ctx.kernalContext().dataStructures().onRemoved(key, this); + + throw removedError(); + } + } + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Atomic stamped was removed from cache: " + name); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java index 59b7c91..95b970a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java @@ -209,7 +209,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc } /** {@inheritDoc} */ - @Override public void onInvalid(@Nullable Exception err) { + @Override public void needCheckNotRemoved() { // No-op. } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java index 48d8644..dd4f2cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java @@ -17,8 +17,6 @@ package org.apache.ignite.internal.processors.datastructures; -import org.jetbrains.annotations.*; - /** * Provides callback for marking object as removed. */ @@ -31,7 +29,7 @@ public interface GridCacheRemovable { public boolean onRemoved(); /** - * @param err Error which cause data structure to become invalid. + * */ - public void onInvalid(@Nullable Exception err); + public void needCheckNotRemoved(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 936662a..6d920fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -489,7 +489,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite /** * @return Set ID. */ - IgniteUuid id() { + public IgniteUuid id() { return id; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2222303f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 38c124a..90c26f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -57,6 +57,9 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { /** Busy lock. */ private GridSpinBusyLock busyLock; + /** Check removed flag. */ + private boolean rmvCheck; + /** * Required by {@link Externalizable}. */ @@ -517,8 +520,43 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable { * Enters busy state. */ private void enterBusy() { + boolean rmvd; + + if (rmvCheck) { + try { + rmvd = !delegate().checkHeader(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + + rmvCheck = false; + + if (rmvd) { + delegate.removed(true); + + cctx.dataStructures().onRemoved(this); + + throw removedError(); + } + } + if (!busyLock.enterBusy()) - throw new IllegalStateException("Set has been removed from cache: " + delegate); + throw removedError(); + } + + /** + * + */ + public void needCheckNotRemoved() { + rmvCheck = true; + } + + /** + * @return Error. + */ + private IllegalStateException removedError() { + return new IllegalStateException("Set has been removed from cache: " + delegate); } /**