#ignite-373: Reserve (lock) local partitions in removeAll() while they are processed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1ffe1158 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1ffe1158 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1ffe1158 Branch: refs/heads/ignite-373 Commit: 1ffe1158e95145bc7f0912d8e912403757add60c Parents: cb09f7c Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 12 13:36:30 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 12 13:36:30 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 73 +++++++++++++------- 1 file changed, 47 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1ffe1158/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index b4417a0..29a806b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; 
import org.apache.ignite.resources.*; @@ -41,6 +42,7 @@ import java.util.*; import java.util.concurrent.*; import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; /** * Distributed cache implementation. @@ -142,10 +144,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; - boolean removed; + boolean removedAll; do { - removed = true; + removedAll = true; topVer = ctx.affinity().affinityTopologyVersion(); @@ -156,17 +158,17 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter if (!nodes.isEmpty()) { CacheOperationContext opCtx = ctx.operationContextPerCall(); - Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST, + Collection<Boolean> results = ctx.closures().callAsyncNoFailover(BROADCAST, Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes, true).get(); - for (Object res : results) { - if (res != null) - removed = false; + for (Boolean res : results) { + if (res != null && !res) + removedAll = false; } } } - while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removedAll); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -241,7 +243,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * operation on a cache with the given name. 
*/ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class GlobalRemoveAllCallable<K,V> implements Callable<Boolean>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -279,11 +281,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** * {@inheritDoc} */ - @Override public Object call() throws Exception { + @Override public Boolean call() throws Exception { GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); if (cacheAdapter == null) - return new Integer(-1); + return false; final GridCacheContext<K, V> ctx = cacheAdapter.context(); @@ -293,7 +295,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) - return new Integer(-1); // Ignore this remove request because remove request will be sent again. + return false; // Ignore this remove request because remove request will be sent again. 
GridDhtCacheAdapter<K, V> dht; GridNearCacheAdapter<K, V> near = null; @@ -313,24 +315,43 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); - for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { - if (!locPart.isEmpty() && locPart.primary(topVer)) { - for (GridDhtCacheEntry o : locPart.entries()) { - if (!o.obsoleteOrDeleted()) - dataLdr.removeDataInternal(o.key()); - } + for (GridDhtLocalPartition locPart : dht.topology().localPartitions()) { + if (locPart.state() == EVICTED) { + assert locPart.entries().size() == 0; + + continue; } - } - Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer); + if (locPart == null || locPart.state() != OWNING || !locPart.reserve()) + return false; + + try { + if (!locPart.isEmpty() && locPart.primary(topVer)) { + for (GridDhtCacheEntry o : locPart.entries()) { + if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion())) + return false; + + if (!o.obsoleteOrDeleted()) + dataLdr.removeDataInternal(o.key()); + } + } - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + ctx.swap().iterator(locPart.id()); - it = dht.context().swap().swapKeyIterator(true, false, topVer); + if (iter != null) { + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) { + if (!ctx.affinity().belongs(ctx.localNode(), locPart.id(), dht.topology().topologyVersion())) + return false; - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey())); + } + } + } + finally { + locPart.release(); + } + } } if (near != null) { @@ -347,9 +368,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) - return new Integer(-1); + 
return false; - return null; + return true; } /** {@inheritDoc} */