Repository: incubator-ignite Updated Branches: refs/heads/ignite-417 [created] 314cc899a
# IGNITE-417 removeAll() throws IllegalStateException if remote node stops during removeAll() execution Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/314cc899 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/314cc899 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/314cc899 Branch: refs/heads/ignite-417 Commit: 314cc899aa377eea47e65acf8262db2379331bc5 Parents: 9b0ba86 Author: sevdokimov <sevdoki...@gridgain.com> Authored: Fri Mar 6 18:50:37 2015 +0300 Committer: sevdokimov <sevdoki...@gridgain.com> Committed: Fri Mar 6 18:50:37 2015 +0300 ---------------------------------------------------------------------- .../GridDistributedCacheAdapter.java | 174 ++++++++++++++----- .../GridCacheRemoveAllMultithreadedTest.java | 118 +++++++++++++ 2 files changed, 246 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/314cc899/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 00190d9..f8f8b92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import org.apache.ignite.*; import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; import org.apache.ignite.internal.*; 
import org.apache.ignite.internal.cluster.*; import org.apache.ignite.internal.processors.cache.*; @@ -49,6 +50,9 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** */ private static final long serialVersionUID = 0L; + /** Maximum number of remove-all retries after remote jobs report that the request must be resent (-1). */ + private static final int MAX_REMOVE_ALL_ATTEMPTS = 50; + /** * Empty constructor required by {@link Externalizable}. */ @@ -136,25 +140,58 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter /** {@inheritDoc} */ @Override public void removeAll() throws IgniteCheckedException { - try { - long topVer; + int attemptCnt = 0; + + while (true) { + long topVer = ctx.discovery().topologyVersion(); + + IgniteInternalFuture<Long> fut = ctx.affinity().affinityReadyFuturex(topVer); + if (fut != null) + fut.get(); + + // Send job to all data nodes. + ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name()); + + if (cluster.nodes().isEmpty()) + break; + + try { + Collection<Long> res = ctx.grid().compute(cluster).withNoFailover().broadcast( + new GlobalRemoveAllCallable<>(name(), topVer)); + + Long max = Collections.max(res); + + if (max > 0) { + assert max > topVer; + + ctx.affinity().affinityReadyFuture(max).get(); /* NOTE(review): GlobalRemoveAllCallable.call() null-checks the future returned by affinityReadyFuture(), this call does not; confirm it cannot be null here. Retries caused by a newer topology version are not counted against MAX_REMOVE_ALL_ATTEMPTS. */ + + continue; + } - do { - topVer = ctx.affinity().affinityTopologyVersion(); + if (res.contains(-1L)) { + if (++attemptCnt > MAX_REMOVE_ALL_ATTEMPTS) + throw new IgniteCheckedException("Failed to remove all entries."); - // Send job to all data nodes. - Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + continue; + } + } + catch (ClusterGroupEmptyException ignore) { + if (log.isDebugEnabled()) + log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); - if (!nodes.isEmpty()) { - ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer), nodes, true).get(); + break; + } + catch (ClusterTopologyException e) { + // GlobalRemoveAllCallable was sent to a node that has left; unlike removeAllAsync(), this retry does not increment attemptCnt. 
+ if (topVer == ctx.discovery().topologyVersion()) { + // Node has not left; some other error has occurred. + throw e; } } - while (ctx.affinity().affinityTopologyVersion() > topVer); - } - catch (ClusterGroupEmptyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); + + if (topVer == ctx.discovery().topologyVersion()) + break; } } @@ -162,44 +199,79 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter @Override public IgniteInternalFuture<?> removeAllAsync() { GridFutureAdapter<Void> opFut = new GridFutureAdapter<>(ctx.kernalContext()); - long topVer = ctx.affinity().affinityTopologyVersion(); - - removeAllAsync(opFut, topVer); + removeAllAsync(opFut, 0); return opFut; } /** * @param opFut Future. - * @param topVer Topology version. + * @param attemptCnt Number of retries already made; the operation fails once it reaches MAX_REMOVE_ALL_ATTEMPTS and remote jobs still report that the request must be resent. */ - private void removeAllAsync(final GridFutureAdapter<Void> opFut, final long topVer) { - Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); + private void removeAllAsync(final GridFutureAdapter<Void> opFut, final int attemptCnt) { + final long topVer = ctx.affinity().affinityTopologyVersion(); /* NOTE(review): the synchronous removeAll() reads ctx.discovery().topologyVersion() and waits for affinityReadyFuturex(topVer) before broadcasting; this path does neither, yet the ClusterTopologyException handler below compares this affinity version with ctx.discovery().topologyVersion(). Confirm the mix is intended. */ + + ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name()); + + if (cluster.nodes().isEmpty()) + opFut.onDone(); + else { + IgniteCompute computeAsync = ctx.grid().compute(cluster).withNoFailover().withAsync(); + + computeAsync.broadcast(new GlobalRemoveAllCallable<>(name(), topVer)); - if (!nodes.isEmpty()) { - IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST, - new GlobalRemoveAllCallable<>(name(), topVer), nodes, true); + ComputeTaskFuture<Collection<Long>> fut = computeAsync.future(); - rmvFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> fut) { + fut.listenAsync(new IgniteInClosure<IgniteFuture<Collection<Long>>>() { + @Override public void 
apply(IgniteFuture<Collection<Long>> fut) { try { - fut.get(); + Collection<Long> res = fut.get(); + + Long max = Collections.max(res); - long topVer0 = ctx.affinity().affinityTopologyVersion(); + if (max > 0) { + assert max > topVer; - if (topVer0 == topVer) - opFut.onDone(); - else - removeAllAsync(opFut, topVer0); + try { + ctx.affinity().affinityReadyFuture(max).get(); /* NOTE(review): this blocks the listener callback until the newer topology is ready, and the returned future is not null-checked (GlobalRemoveAllCallable.call() does check it). */ + + removeAllAsync(opFut, attemptCnt); + } + catch (IgniteCheckedException e) { + opFut.onDone(e); + } + + return; + } + + if (res.contains(-1L)) { + if (attemptCnt >= MAX_REMOVE_ALL_ATTEMPTS) + opFut.onDone(new IgniteCheckedException("Failed to remove all entries.")); + else + removeAllAsync(opFut, attemptCnt + 1); + + return; + } + + if (topVer != ctx.affinity().affinityTopologyVersion()) + removeAllAsync(opFut, attemptCnt); /* NOTE(review): there is no else branch, so on success with an unchanged affinity version opFut is never completed; the replaced code called opFut.onDone() in that case. Likely needs 'else opFut.onDone();'. */ } - catch (ClusterGroupEmptyCheckedException ignore) { + catch (ClusterGroupEmptyException ignore) { if (log.isDebugEnabled()) log.debug("All remote nodes left while cache remove [cacheName=" + name() + "]"); opFut.onDone(); } - catch (IgniteCheckedException e) { - opFut.onDone(e); + catch (ClusterTopologyException e) { /* NOTE(review): the old catch (IgniteCheckedException) was removed; other exceptions thrown by fut.get() do not appear to be handled by the visible catch clauses, which would leave opFut incomplete. Confirm. */ + // GlobalRemoveAllCallable was sent to a node that has left. + if (topVer == ctx.discovery().topologyVersion()) { + // Node has not left; some other error has occurred. + opFut.onDone(e); + + return; + } + + removeAllAsync(opFut, attemptCnt + 1); } catch (Error e) { opFut.onDone(e); @@ -209,8 +281,6 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter } }); } - else - opFut.onDone(); } /** {@inheritDoc} */ @@ -223,7 +293,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter * operation on a cache with the given name. 
*/ @GridInternal - private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable { + private static class GlobalRemoveAllCallable<K,V> implements IgniteCallable<Long>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -253,21 +323,22 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter this.topVer = topVer; } - /** - * {@inheritDoc} - */ - @Override public Object call() throws Exception { + /** @return The local affinity topology version if it is newer than {@code topVer} (the caller retries on it), {@code 0} on success, or {@code -1} if the request should be resent. */ + @Override public Long call() throws Exception { GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); final GridCacheContext<K, V> ctx = cacheAdapter.context(); - ctx.affinity().affinityReadyFuture(topVer).get(); + IgniteInternalFuture<Long> topVerFut = ctx.affinity().affinityReadyFuture(topVer); + + if (topVerFut != null) + topVerFut.get(); ctx.gate().enter(); try { - if (ctx.affinity().affinityTopologyVersion() != topVer) - return null; // Ignore this remove request because remove request will be sent again. + if (ctx.affinity().affinityTopologyVersion() > topVer) + return ctx.affinity().affinityTopologyVersion(); GridDhtCacheAdapter<K, V> dht; @@ -299,13 +370,24 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter while (it.hasNext()) dataLdr.removeData(it.next().getKey()); + + return 0L; // 0 means remove completed successfully. + } + catch (IgniteException e) { + if (e instanceof ClusterTopologyException + || e.hasCause(ClusterTopologyCheckedException.class, ClusterTopologyException.class)) + return -1L; + + throw e; + } + catch (IllegalStateException ignored) { + // Looks like the node is about to stop. + return -1L; // -1 means the request should be resent. 
} } finally { ctx.gate().leave(); } - - return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/314cc899/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java new file mode 100644 index 0000000..49245cc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.testframework.*; + +import java.util.*; + +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; + +/** + * Checks that removeAll() empties the cache on every node while another node repeatedly joins and leaves the topology. + */ +public class GridCacheRemoveAllMultithreadedTest extends GridCacheAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected int gridCount() { + return 2; + } + + /** + * @return Cache mode. + */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** + * @return Cache atomicity mode. + */ + @Override protected CacheAtomicityMode atomicityMode() { + return ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return Long.MAX_VALUE; + } + + /** + * Repeatedly loads entries and calls removeAll() for about a minute while grid 3 is started and stopped in a background thread, then checks that every node's local size is 0. + * + * @throws Exception If failed. + */ + public void testRemoveAll() throws Exception { + final Object mux = new Object(); + + Thread t = new GridTestThread(new Runnable() { + @Override public void run() { + try { + while (!Thread.interrupted()) { + + startGrid(3); + + synchronized (mux) { + stopGrid(3); + } + } + } + catch (IgniteInterruptedCheckedException ignored) { + // Test stopped. 
+ } + catch (Exception e) { + e.printStackTrace(); /* NOTE(review): failures in the restart thread are only printed and do not fail the test. */ + } + } + }); + t.start(); + + try { + long endTime = System.currentTimeMillis() + 60 * 1000; + + Random rnd = new Random(); /* NOTE(review): rnd is never used. */ + + while (endTime > System.currentTimeMillis()) { + synchronized (mux) { /* Loading holds mux so grid 3 is not stopped mid-load; removeAll() below runs outside the lock and can race with stopGrid(3). */ + try (IgniteDataLoader<Integer, Integer> ldr = ignite(0).dataLoader(null)) { + for (int i = 0; i < 1000; i++) + ldr.addData(i, i); + } + } + + jcache(0).removeAll(); + + for (int i = 0; i < gridCount(); i++) { + int locSize = jcache(i).localSize(CachePeekMode.ALL); + + assert locSize == 0 : locSize; + } + } + } + finally { + t.interrupt(); + + t.join(); + } + + } +}