#ignite-373: Cache is not empty after removeAll.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/593e3eee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/593e3eee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/593e3eee Branch: refs/heads/ignite-843 Commit: 593e3eeeb0d4965b1c1a83d4f68a9d18e6615632 Parents: 7c91389 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu May 14 15:35:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu May 14 15:35:27 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 119 +++++------ .../GridDistributedCacheAdapter.java | 210 ++++++++++++------- .../cache/CacheRemoveAllSelfTest.java | 81 +++++++ .../near/NoneRebalanceModeSelfTest.java | 67 ++++++ .../testsuites/IgniteCacheTestSuite2.java | 1 + .../testsuites/IgniteCacheTestSuite4.java | 2 + 6 files changed, 338 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 3826bfa..4106cb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get(); + ctx.kernalContext().task().execute( + new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get(); } } @@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!nodes.isEmpty()) { ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null); + return ctx.kernalContext().task().execute( + new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null); } else return new GridFinishedFuture<>(); @@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null); + return ctx.kernalContext().task().execute( + new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null); } /** {@inheritDoc} */ @@ -4827,13 +4830,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private static final long serialVersionUID = 0L; /** - * Empty constructor for serialization. - */ - public GlobalClearAllJob() { - // No-op. - } - - /** * @param cacheName Cache name. * @param topVer Affinity topology version. */ @@ -4859,14 +4855,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private static final long serialVersionUID = 0L; /** Keys to remove. */ - private Set<? extends K> keys; - - /** - * Empty constructor for serialization. - */ - public GlobalClearKeySetJob() { - // No-op. - } + private final Set<? extends K> keys; /** * @param cacheName Cache name. @@ -4897,14 +4886,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V private static final long serialVersionUID = 0L; /** Peek modes. */ - private CachePeekMode[] peekModes; - - /** - * Required by {@link Externalizable}. - */ - public SizeJob() { - // No-op. - } + private final CachePeekMode[] peekModes; /** * @param cacheName Cache name. @@ -5514,17 +5496,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V protected Ignite ignite; /** Affinity topology version. */ - protected AffinityTopologyVersion topVer; + protected final AffinityTopologyVersion topVer; /** Cache name. */ - protected String cacheName; - - /** - * Empty constructor for serialization. - */ - public TopologyVersionAwareJob() { - // No-op. - } + protected final String cacheName; /** * @param cacheName Cache name. @@ -5583,24 +5558,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext ctx; + /** Cache name. */ + private final String cacheName; - /** Peek modes. */ - private CachePeekMode[] peekModes; + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; - /** - * Empty constructor for serialization. - */ - public SizeTask() { - // No-op. - } + /** Peek modes. */ + private final CachePeekMode[] peekModes; /** - * @param ctx Cache context. + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param peekModes Cache peek modes. */ - public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) { - this.ctx = ctx; + public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { + this.cacheName = cacheName; + this.topVer = topVer; this.peekModes = peekModes; } @@ -5610,13 +5584,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) - jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node); + jobs.put(new SizeJob(cacheName, topVer, peekModes), node); return jobs; } /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + IgniteException e = res.getException(); + + if (e != null) { + if (e instanceof ClusterTopologyException) + return ComputeJobResultPolicy.WAIT; + + throw new IgniteException("Remote job threw exception.", e); + } + return ComputeJobResultPolicy.WAIT; } @@ -5640,25 +5623,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** */ private static final long serialVersionUID = 0L; - /** Cache context. */ - private GridCacheContext ctx; + /** Cache name. */ + private final String cacheName; - /** Keys to clear. */ - private Set<? extends K> keys; + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; - /** - * Empty constructor for serialization. - */ - public ClearTask() { - // No-op. - } + /** Keys to clear. */ + private final Set<? extends K> keys; /** - * @param ctx Cache context. + * @param cacheName Cache name. + * @param topVer Affinity topology version. * @param keys Keys to clear. */ - public ClearTask(GridCacheContext ctx, Set<? extends K> keys) { - this.ctx = ctx; + public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { + this.cacheName = cacheName; + this.topVer = topVer; this.keys = keys; } @@ -5668,9 +5649,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Map<ComputeJob, ClusterNode> jobs = new HashMap(); for (ClusterNode node : subgrid) { - jobs.put(keys == null ? - new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) : - new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), + jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) : + new GlobalClearKeySetJob<K>(cacheName, topVer, keys), node); } @@ -5679,6 +5659,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** {@inheritDoc} */ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + IgniteException e = res.getException(); + + if (e != null) { + if (e instanceof ClusterTopologyException) + return ComputeJobResultPolicy.WAIT; + + throw new IgniteException("Remote job threw exception.", e); + } + return ComputeJobResultPolicy.WAIT; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 3a685cc..c5ef22f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.affinity.*; @@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.datastreamer.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import java.io.*; import java.util.*; -import java.util.concurrent.*; -import static org.apache.ignite.internal.GridClosureCallMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; +import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; /** * Distributed cache implementation. @@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter try { AffinityTopologyVersion topVer; + boolean retry; + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + do { + retry = false; + topVer = ctx.affinity().affinityTopologyVersion(); // Send job to all data nodes. Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, - true).get(); + retry = !ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null).get(); } } - while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0); + while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); - removeAllAsync(opFut, topVer); + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + boolean skipStore = opCtx != null && opCtx.skipStore(); + + removeAllAsync(opFut, topVer, skipStore); return opFut; } @@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** * @param opFut Future. * @param topVer Topology version. + * @param skipStore Skip store flag. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) { + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer, + final boolean skipStore) { Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); if (!nodes.isEmpty()) { - CacheOperationContext opCtx = ctx.operationContextPerCall(); + ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); - IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true); + IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute( + new RemoveAllTask(ctx.name(), topVer, skipStore), null); - rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { + rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { + @Override public void apply(IgniteInternalFuture<Boolean> fut) { try { - fut.get(); + boolean retry = !fut.get(); AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion(); - if (topVer0.equals(topVer)) + if (topVer0.equals(topVer) && !retry) opFut.onDone(); else - removeAllAsync(opFut, topVer0); + removeAllAsync(opFut, topVer0, skipStore); } catch (ClusterGroupEmptyCheckedException ignore) { if (log.isDebugEnabled()) @@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } /** - * Internal callable which performs remove all primary key mappings - * operation on a cache with the given name. + * Remove task. */ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> { /** */ private static final long serialVersionUID = 0L; /** Cache name. */ - private String cacheName; + private final String cacheName; - /** Topology version. */ - private AffinityTopologyVersion topVer; + /** Affinity topology version. */ + private final AffinityTopologyVersion topVer; /** Skip store flag. */ - private boolean skipStore; - - /** Injected grid instance. */ - @IgniteInstanceResource - private Ignite ignite; + private final boolean skipStore; /** - * Empty constructor for serialization. + * @param cacheName Cache name. + * @param topVer Affinity topology version. + * @param skipStore Skip store flag. */ - public GlobalRemoveAllCallable() { - // No-op. + public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) { + this.cacheName = cacheName; + this.topVer = topVer; + this.skipStore = skipStore; + } + + /** {@inheritDoc} */ + @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, + @Nullable Object arg) throws IgniteException { + Map<ComputeJob, ClusterNode> jobs = new HashMap(); + + for (ClusterNode node : subgrid) + jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node); + + return jobs; + } + + /** {@inheritDoc} */ + @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { + IgniteException e = res.getException(); + + if (e != null) { + if (e instanceof ClusterTopologyException) + return ComputeJobResultPolicy.WAIT; + + throw new IgniteException("Remote job threw exception.", e); + } + + return ComputeJobResultPolicy.WAIT; + } + + /** {@inheritDoc} */ + @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException { + for (ComputeJobResult locRes : results) { + if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData())) + return false; + } + + return true; } + } + /** + * Internal job which performs remove all primary key mappings + * operation on a cache with the given name. + */ + @GridInternal + private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob { + /** */ + private static final long serialVersionUID = 0L; + + /** Skip store flag. */ + private final boolean skipStore; /** * @param cacheName Cache name. * @param topVer Topology version. * @param skipStore Skip store flag. */ - private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { - this.cacheName = cacheName; - this.topVer = topVer; + private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) { + super(cacheName, topVer); + this.skipStore = skipStore; } - /** - * {@inheritDoc} - */ - @Override public Object call() throws Exception { - GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); + /** {@inheritDoc} */ + @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) { + GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName); - final GridCacheContext<K, V> ctx = cacheAdapter.context(); + if (cache == null) + return true; - ctx.affinity().affinityReadyFuture(topVer).get(); + final GridCacheContext<K, V> ctx = cache.context(); ctx.gate().enter(); try { if (!ctx.affinity().affinityTopologyVersion().equals(topVer)) - return null; // Ignore this remove request because remove request will be sent again. + return false; // Ignore this remove request because remove request will be sent again. GridDhtCacheAdapter<K, V> dht; GridNearCacheAdapter<K, V> near = null; - if (cacheAdapter instanceof GridNearCacheAdapter) { - near = ((GridNearCacheAdapter<K, V>)cacheAdapter); + if (cache instanceof GridNearCacheAdapter) { + near = ((GridNearCacheAdapter<K, V>) cache); dht = near.dht(); } else - dht = (GridDhtCacheAdapter<K, V>)cacheAdapter; + dht = (GridDhtCacheAdapter<K, V>) cache; try (DataStreamerImpl<KeyCacheObject, Object> dataLdr = - (DataStreamerImpl)ignite.dataStreamer(cacheName)) { - ((DataStreamerImpl)dataLdr).maxRemapCount(0); + (DataStreamerImpl) ignite.dataStreamer(cacheName)) { + ((DataStreamerImpl) dataLdr).maxRemapCount(0); dataLdr.skipStore(skipStore); dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched()); - for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) { - if (!locPart.isEmpty() && locPart.primary(topVer)) { - for (GridDhtCacheEntry o : locPart.entries()) { - if (!o.obsoleteOrDeleted()) - dataLdr.removeDataInternal(o.key()); - } - } - } + for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) { + GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false); - Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer); + if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve()) + return false; - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + try { + if (!locPart.isEmpty()) { + for (GridDhtCacheEntry o : locPart.entries()) { + if (!o.obsoleteOrDeleted()) + dataLdr.removeDataInternal(o.key()); + } + } - it = dht.context().swap().swapKeyIterator(true, false, topVer); + GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter = + dht.context().swap().iterator(part); - while (it.hasNext()) - dataLdr.removeDataInternal(it.next()); + if (iter != null) { + for (Map.Entry<byte[], GridCacheSwapEntry> e : iter) + dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey())); + } + } + finally { + locPart.release(); + } + } } if (near != null) { @@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } } } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } finally { ctx.gate().leave(); } - return null; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeString(out, cacheName); - out.writeObject(topVer); - out.writeBoolean(skipStore); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - cacheName = U.readString(in); - topVer = (AffinityTopologyVersion)in.readObject(); - skipStore = in.readBoolean(); + return true; } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java new file mode 100644 index 0000000..f5de96f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Test remove all method. + */ +public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 60000; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 4; + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + IgniteCache<Integer, String> cache = grid(0).cache(null); + + for (int i = 0; i < 10_000; ++i) + cache.put(i, "val"); + + final AtomicInteger igniteId = new AtomicInteger(gridCount()); + + IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + for (int i = 0; i < 2; ++i) + startGrid(igniteId.getAndIncrement()); + + return true; + } + }, 3, "start-node-thread"); + + cache.removeAll(); + + fut.get(); + + U.sleep(5000); + + for (int i = 0; i < igniteId.get(); ++i) { + IgniteCache locCache = grid(i).cache(null); + + assertEquals("Local size: " + locCache.localSize() + "\n" + + "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" + + "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" + + "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" + + "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" + + "Backup: " + locCache.localSize(CachePeekMode.BACKUP), + 0, locCache.localSize()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java new file mode 100644 index 0000000..d61ddcc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.distributed.dht.*; +import org.apache.ignite.testframework.junits.common.*; + +import static org.apache.ignite.cache.CacheRebalanceMode.*; +import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*; + +/** + * Test none rebalance mode. + */ +public class NoneRebalanceModeSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions"}) + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + CacheConfiguration cc = defaultCacheConfiguration(); + + cc.setRebalanceMode(NONE); + + c.setCacheConfiguration(cc); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGrid(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + GridNearTransactionalCache cache = (GridNearTransactionalCache)((IgniteKernal)grid(0)).internalCache(null); + + for (GridDhtLocalPartition part : cache.dht().topology().localPartitions()) + assertEquals(MOVING, part.state()); + + grid(0).cache(null).removeAll(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index dc3a2c0..5738778 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -58,6 +58,7 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTestSuite(GridCachePartitionedGetSelfTest.class); suite.addTest(new TestSuite(GridCachePartitionedBasicApiTest.class)); suite.addTest(new TestSuite(GridCacheNearMultiGetSelfTest.class)); + suite.addTest(new TestSuite(NoneRebalanceModeSelfTest.class)); suite.addTest(new TestSuite(GridCacheNearJobExecutionSelfTest.class)); suite.addTest(new TestSuite(GridCacheNearOneNodeSelfTest.class)); suite.addTest(new TestSuite(GridCacheNearMultiNodeSelfTest.class)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 8eb0688..aaf7e5b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -126,6 +126,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheNoValueClassOnServerNodeTest.class); + suite.addTestSuite(CacheRemoveAllSelfTest.class); + suite.addTestSuite(CacheOffheapMapEntrySelfTest.class); return suite;