#ignite-373: wip.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/778a0a72 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/778a0a72 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/778a0a72 Branch: refs/heads/ignite-373 Commit: 778a0a722fd066f77c58993c872fc5e409316ec1 Parents: 478b3ee Author: ivasilinets <ivasilin...@gridgain.com> Authored: Tue May 12 11:19:31 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Tue May 12 11:19:31 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 38 ++++++++++++++++---- .../cache/CacheRemoveAllSelfTest.java | 18 +++++++++- 2 files changed, 49 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 3a685cc..aa42067 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; @@ -142,21 +143,32 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; + boolean removed; + do { + removed = true; + + System.out.println("!!!!Redone remove all"); topVer = ctx.affinity().affinityTopologyVersion(); // Send job to all data nodes. Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + if (!nodes.isEmpty()) { CacheOperationContext opCtx = ctx.operationContextPerCall(); - ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, + Collection<Object> results = ctx.closures().callAsyncNoFailover(BROADCAST, + Collections.singleton(new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore())), nodes, true).get(); + + for (Object res : results) { + if (res != null) + removed = false; + } } } - while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || !removed); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -272,6 +284,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter @Override public Object call() throws Exception { GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + if (cacheAdapter == null) + return new Integer(-1); + final GridCacheContext<K, V> ctx = cacheAdapter.context(); ctx.affinity().affinityReadyFuture(topVer).get(); @@ -279,8 +294,10 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter ctx.gate().enter(); try { - if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) - return null; // Ignore this remove request because remove request will be sent again. + if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) { + System.out.println("!!!! have different version"); + return new Integer(-1); // Ignore this remove request because remove request will be sent again. + } GridDhtCacheAdapter<K, V> dht; GridNearCacheAdapter<K, V> near = null; @@ -303,7 +320,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { if (!locPart.isEmpty() && locPart.primary(topVer)) { for (GridDhtCacheEntry o : locPart.entries()) { - if (!o.obsoleteOrDeleted()) + //if (!o.obsoleteOrDeleted()) dataLdr.removeDataInternal(o.key()); } } @@ -333,6 +350,15 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter ctx.gate().leave(); } + if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) { + System.out.println("!!!! have different version in the end. Local size=" + + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.ALL}) + + ", local primary size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.PRIMARY}) + + ", local backup size=" + cacheAdapter.localSize(new CachePeekMode[]{CachePeekMode.BACKUP})); + + return new Integer(-1); + } + return null; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/778a0a72/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java index 9162ac4..c4455b7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.testframework.*; import java.util.concurrent.*; @@ -28,6 +30,11 @@ import java.util.concurrent.atomic.*; * Test remove all method. */ public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest { + @Override + protected long getTestTimeout() { + return 600000; + } + /** {@inheritDoc} */ @Override protected int gridCount() { return 4; @@ -58,7 +65,16 @@ public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest { fut.get(); for (int i = 0; i < igniteId.get(); ++i) - assertEquals(0, grid(i).cache(null).localSize()); + assertEquals("Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) + + ". All entries:" + + entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize()); + + U.sleep(5000); + + for (int i = 0; i < igniteId.get(); ++i) + assertEquals("2 Local entries: " + entrySet(grid(i).cache(null).localEntries(CachePeekMode.PRIMARY)) + + ". All entries:" + + entrySet(grid(i).cache(null).localEntries()), 0, grid(i).cache(null).localSize()); assertEquals(0, cache.size()); }