Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-417 [created] 314cc899a


# IGNITE-417 removeAll() throws IllegalStateException if remote node stops 
during removeAll() execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/314cc899
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/314cc899
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/314cc899

Branch: refs/heads/ignite-417
Commit: 314cc899aa377eea47e65acf8262db2379331bc5
Parents: 9b0ba86
Author: sevdokimov <sevdoki...@gridgain.com>
Authored: Fri Mar 6 18:50:37 2015 +0300
Committer: sevdokimov <sevdoki...@gridgain.com>
Committed: Fri Mar 6 18:50:37 2015 +0300

----------------------------------------------------------------------
 .../GridDistributedCacheAdapter.java            | 174 ++++++++++++++-----
 .../GridCacheRemoveAllMultithreadedTest.java    | 118 +++++++++++++
 2 files changed, 246 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/314cc899/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 00190d9..f8f8b92 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package 
org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.cache.*;
@@ -49,6 +50,9 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    private static final int MAX_REMOVE_ALL_ATTEMPTS = 50;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -136,25 +140,58 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
     /** {@inheritDoc} */
     @Override public void removeAll() throws IgniteCheckedException {
-        try {
-            long topVer;
+        int attemptCnt = 0;
+
+        while (true) {
+            long topVer = ctx.discovery().topologyVersion();
+
+            IgniteInternalFuture<Long> fut = 
ctx.affinity().affinityReadyFuturex(topVer);
+            if (fut != null)
+                fut.get();
+
+            // Send job to all data nodes.
+            ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name());
+
+            if (cluster.nodes().isEmpty())
+                break;
+
+            try {
+                Collection<Long> res = 
ctx.grid().compute(cluster).withNoFailover().broadcast(
+                    new GlobalRemoveAllCallable<>(name(), topVer));
+
+                Long max = Collections.max(res);
+
+                if (max > 0) {
+                    assert max > topVer;
+
+                    ctx.affinity().affinityReadyFuture(max).get();
+
+                    continue;
+                }
 
-            do {
-                topVer = ctx.affinity().affinityTopologyVersion();
+                if (res.contains(-1L)) {
+                    if (++attemptCnt > MAX_REMOVE_ALL_ATTEMPTS)
+                        throw new IgniteCheckedException("Failed to remove all 
entries.");
 
-                // Send job to all data nodes.
-                Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
+                    continue;
+                }
+            }
+            catch (ClusterGroupEmptyException ignore) {
+                if (log.isDebugEnabled())
+                    log.debug("All remote nodes left while cache remove 
[cacheName=" + name() + "]");
 
-                if (!nodes.isEmpty()) {
-                    ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer), nodes, 
true).get();
+                break;
+            }
+            catch (ClusterTopologyException e) {
+                // GlobalRemoveAllCallable was sent to node that has left.
+                if (topVer == ctx.discovery().topologyVersion()) {
+                    // Topology has not changed, so some other error has occurred.
+                    throw e;
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion() > topVer);
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache remove 
[cacheName=" + name() + "]");
+
+            if (topVer == ctx.discovery().topologyVersion())
+                break;
         }
     }
 
@@ -162,44 +199,79 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
     @Override public IgniteInternalFuture<?> removeAllAsync() {
         GridFutureAdapter<Void> opFut = new 
GridFutureAdapter<>(ctx.kernalContext());
 
-        long topVer = ctx.affinity().affinityTopologyVersion();
-
-        removeAllAsync(opFut, topVer);
+        removeAllAsync(opFut, 0);
 
         return opFut;
     }
 
     /**
      * @param opFut Future.
-     * @param topVer Topology version.
+     * @param attemptCnt Attempts count.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final 
long topVer) {
-        Collection<ClusterNode> nodes = 
ctx.grid().cluster().forDataNodes(name()).nodes();
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final int 
attemptCnt) {
+        final long topVer = ctx.affinity().affinityTopologyVersion();
+
+        ClusterGroup cluster = ctx.grid().cluster().forDataNodes(name());
+
+        if (cluster.nodes().isEmpty())
+            opFut.onDone();
+        else {
+            IgniteCompute computeAsync = 
ctx.grid().compute(cluster).withNoFailover().withAsync();
+
+            computeAsync.broadcast(new GlobalRemoveAllCallable<>(name(), 
topVer));
 
-        if (!nodes.isEmpty()) {
-            IgniteInternalFuture<?> rmvFut = 
ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new GlobalRemoveAllCallable<>(name(), topVer), nodes, 
true);
+            ComputeTaskFuture<Collection<Long>> fut = computeAsync.future();
 
-            rmvFut.listenAsync(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
+            fut.listenAsync(new 
IgniteInClosure<IgniteFuture<Collection<Long>>>() {
+                @Override public void apply(IgniteFuture<Collection<Long>> 
fut) {
                     try {
-                        fut.get();
+                        Collection<Long> res = fut.get();
+
+                        Long max = Collections.max(res);
 
-                        long topVer0 = 
ctx.affinity().affinityTopologyVersion();
+                        if (max > 0) {
+                            assert max > topVer;
 
-                        if (topVer0 == topVer)
-                            opFut.onDone();
-                        else
-                            removeAllAsync(opFut, topVer0);
+                            try {
+                                ctx.affinity().affinityReadyFuture(max).get();
+
+                                removeAllAsync(opFut, attemptCnt);
+                            }
+                            catch (IgniteCheckedException e) {
+                                opFut.onDone(e);
+                            }
+
+                            return;
+                        }
+
+                        if (res.contains(-1L)) {
+                            if (attemptCnt >= MAX_REMOVE_ALL_ATTEMPTS)
+                                opFut.onDone(new 
IgniteCheckedException("Failed to remove all entries."));
+                            else
+                                removeAllAsync(opFut, attemptCnt + 1);
+
+                            return;
+                        }
+
+                        if (topVer != ctx.affinity().affinityTopologyVersion())
+                            removeAllAsync(opFut, attemptCnt);
                     }
-                    catch (ClusterGroupEmptyCheckedException ignore) {
+                    catch (ClusterGroupEmptyException ignore) {
                         if (log.isDebugEnabled())
                             log.debug("All remote nodes left while cache 
remove [cacheName=" + name() + "]");
 
                         opFut.onDone();
                     }
-                    catch (IgniteCheckedException e) {
-                        opFut.onDone(e);
+                    catch (ClusterTopologyException e) {
+                        // GlobalRemoveAllCallable was sent to node that has 
left.
+                        if (topVer == ctx.discovery().topologyVersion()) {
+                            // Topology has not changed, so some other error has occurred.
+                            opFut.onDone(e);
+
+                            return;
+                        }
+
+                        removeAllAsync(opFut, attemptCnt + 1);
                     }
                     catch (Error e) {
                         opFut.onDone(e);
@@ -209,8 +281,6 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
                 }
             });
         }
-        else
-            opFut.onDone();
     }
 
     /** {@inheritDoc} */
@@ -223,7 +293,7 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
      * operation on a cache with the given name.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements 
Callable<Object>, Externalizable {
+    private static class GlobalRemoveAllCallable<K,V> implements 
IgniteCallable<Long>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -253,21 +323,22 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
             this.topVer = topVer;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        @Override public Object call() throws Exception {
+        /** {@inheritDoc} */
+        @Override public Long call() throws Exception {
             GridCacheAdapter<K, V> cacheAdapter = 
((IgniteKernal)ignite).context().cache().internalCache(cacheName);
 
             final GridCacheContext<K, V> ctx = cacheAdapter.context();
 
-            ctx.affinity().affinityReadyFuture(topVer).get();
+            IgniteInternalFuture<Long> topVerFut = 
ctx.affinity().affinityReadyFuture(topVer);
+
+            if (topVerFut != null)
+                topVerFut.get();
 
             ctx.gate().enter();
 
             try {
-                if (ctx.affinity().affinityTopologyVersion() != topVer)
-                    return null; // Ignore this remove request because remove 
request will be sent again.
+                if (ctx.affinity().affinityTopologyVersion() > topVer)
+                    return ctx.affinity().affinityTopologyVersion();
 
                 GridDhtCacheAdapter<K, V> dht;
 
@@ -299,13 +370,24 @@ public abstract class GridDistributedCacheAdapter<K, V> 
extends GridCacheAdapter
 
                     while (it.hasNext())
                         dataLdr.removeData(it.next().getKey());
+
+                    return 0L; // 0 means remove completed successfully.
+                }
+                catch (IgniteException e) {
+                    if (e instanceof ClusterTopologyException
+                        || e.hasCause(ClusterTopologyCheckedException.class, 
ClusterTopologyException.class))
+                        return -1L;
+
+                    throw e;
+                }
+                catch (IllegalStateException ignored) {
+                    // Looks like node is about to stop.
+                    return -1L; // -1 means request should be resent.
                 }
             }
             finally {
                 ctx.gate().leave();
             }
-
-            return null;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/314cc899/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
new file mode 100644
index 0000000..49245cc
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheRemoveAllMultithreadedTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Tests that {@code removeAll()} empties the cache while another node repeatedly starts and stops.
+ */
+public class GridCacheRemoveAllMultithreadedTest extends 
GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     * @return Cache atomicity mode.
+     */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return Long.MAX_VALUE;
+    }
+
+    /**
+     * Actual test.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoveAll() throws Exception {
+        final Object mux = new Object();
+
+        Thread t = new GridTestThread(new Runnable() {
+            @Override public void run() {
+                try {
+                    while (!Thread.interrupted()) {
+
+                        startGrid(3);
+
+                        synchronized (mux) {
+                            stopGrid(3);
+                        }
+                    }
+                }
+                catch (IgniteInterruptedCheckedException ignored) {
+                    // Test stopped.
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        t.start();
+
+        try {
+            long endTime = System.currentTimeMillis() + 60 * 1000;
+
+            Random rnd = new Random();
+
+            while (endTime > System.currentTimeMillis()) {
+                synchronized (mux) {
+                    try (IgniteDataLoader<Integer, Integer> ldr = 
ignite(0).dataLoader(null)) {
+                        for (int i = 0; i < 1000; i++)
+                            ldr.addData(i, i);
+                    }
+                }
+
+                jcache(0).removeAll();
+
+                for (int i = 0; i < gridCount(); i++) {
+                    int locSize = jcache(i).localSize(CachePeekMode.ALL);
+
+                    assert locSize == 0 : locSize;
+                }
+            }
+        }
+        finally {
+            t.interrupt();
+
+            t.join();
+        }
+
+    }
+}

Reply via email to