Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-732 1b6f1e5b5 -> e50ba9efe (forced update)


ignite-732 IgniteCache.size() should not fail in case of topology changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e50ba9ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e50ba9ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e50ba9ef

Branch: refs/heads/ignite-732
Commit: e50ba9efeb098dc0bc583d9d921cf715bac8feff
Parents: dfca76b
Author: agura <ag...@gridgain.com>
Authored: Wed Apr 29 22:20:57 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Wed Apr 29 22:44:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/compute/ComputeTaskAdapter.java      |  14 +-
 .../processors/cache/GridCacheAdapter.java      | 169 +++++++------------
 .../processors/task/GridTaskWorker.java         |   3 +-
 .../cache/GridCacheSizeTopologyChangedTest.java | 140 +++++++++++++++
 4 files changed, 208 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
index c2ad198..87081fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskAdapter.java
@@ -24,15 +24,16 @@ import java.util.*;
 
 /**
  * Convenience adapter for {@link ComputeTask} interface. Here is an example of
- * how {@code GridComputeTaskAdapter} can be used:
+ * how {@code ComputeTaskAdapter} can be used:
  * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, 
String&gt; {
+ * public class MyFooBarTask extends ComputeTaskAdapter&lt;String, String&gt; {
  *     // Inject load balancer.
  *     &#64;LoadBalancerResource
  *     ComputeLoadBalancer balancer;
  *
  *     // Map jobs to grid nodes.
- *     public Map&lt;? extends ComputeJob, GridNode&gt; 
map(List&lt;GridNode&gt; subgrid, String arg) throws IgniteCheckedException {
+ *     public Map&lt;? extends ComputeJob, GridNode&gt; 
map(List&lt;GridNode&gt; subgrid, String arg)
+ *         throws IgniteCheckedException {
  *         Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, 
GridNode&gt;(subgrid.size());
  *
  *         // In more complex cases, you can actually do
@@ -76,8 +77,8 @@ public abstract class ComputeTaskAdapter<T, R> implements 
ComputeTask<T, R> {
      * <p>
      * If remote job resulted in exception ({@link 
ComputeJobResult#getException()} is not {@code null}),
      * then {@link ComputeJobResultPolicy#FAILOVER} policy will be returned if 
the exception is instance
-     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link 
ComputeExecutionRejectedException}, which means that
-     * remote node either failed or job execution was rejected before it got a 
chance to start. In all
+     * of {@link org.apache.ignite.cluster.ClusterTopologyException} or {@link 
ComputeExecutionRejectedException},
+     * which means that remote node either failed or job execution was 
rejected before it got a chance to start. In all
      * other cases the exception will be rethrown which will ultimately cause 
task to fail.
      *
      * @param res Received remote grid executable result.
@@ -87,7 +88,8 @@ public abstract class ComputeTaskAdapter<T, R> implements 
ComputeTask<T, R> {
      * @throws IgniteException If handling a job result caused an error 
effectively rejecting
      *      a failover. This exception will be thrown out of {@link 
ComputeTaskFuture#get()} method.
      */
-    @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd) throws IgniteException {
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, 
List<ComputeJobResult> rcvd)
+        throws IgniteException {
         IgniteException e = res.getException();
 
         // Try to failover if result is failed.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3f4e97b..ac508d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -882,7 +882,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<Cache.Entry<K, V>> entrySet() {
-        return entrySet((CacheEntryPredicate[]) null);
+        return entrySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
@@ -897,17 +897,17 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public Set<K> keySet() {
-        return keySet((CacheEntryPredicate[]) null);
+        return keySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Set<K> primaryKeySet() {
-        return primaryKeySet((CacheEntryPredicate[]) null);
+        return primaryKeySet((CacheEntryPredicate[])null);
     }
 
     /** {@inheritDoc} */
     @Override public Collection<V> values() {
-        return values((CacheEntryPredicate[]) null);
+        return values((CacheEntryPredicate[])null);
     }
 
     /**
@@ -2117,7 +2117,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 throws IgniteCheckedException {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = 
F.viewAsMap(keys,
                     new C1<K, EntryProcessor<K, V, Object>>() {
-                            @Override public EntryProcessor apply(K k) {
+                        @Override public EntryProcessor apply(K k) {
                             return entryProcessor;
                         }
                     });
@@ -2145,7 +2145,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp() {
             @Override public IgniteInternalFuture<GridCacheReturn> 
inOp(IgniteTxLocalAdapter tx) {
                 Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, 
Object>) entryProcessor);
+                    Collections.singletonMap(key, (EntryProcessor<K, V, 
Object>)entryProcessor);
 
                 return tx.invokeAsync(ctx, invokeMap, args);
             }
@@ -2371,7 +2371,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter 
tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, 
ctx.noValArray())
-                    
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+                    
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
             @Override public String toString() {
@@ -2526,7 +2526,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> 
op(IgniteTxLocalAdapter tx) {
                 return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, 
ctx.hasValArray()).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>) RET2FLAG);
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, 
Boolean>)RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2915,7 +2915,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return (GridCacheReturn) tx.putAllAsync(ctx,
+                return tx.putAllAsync(ctx,
                         F.t(key, newVal),
                         true,
                         null,
@@ -3017,7 +3017,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     ctx.deploy().registerClass(val);
 
                 return tx.removeAllAsync(ctx, Collections.singletonList(key), 
null, false,
-                        ctx.equalsValArray(val)).get().success();
+                    ctx.equalsValArray(val)).get().success();
             }
 
             @Override public String toString() {
@@ -3230,10 +3230,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         TransactionConfiguration cfg = 
ctx.gridConfig().getTransactionConfiguration();
 
         return txStart(
-                concurrency,
-                isolation,
-                cfg.getDefaultTxTimeout(),
-                0
+            concurrency,
+            isolation,
+            cfg.getDefaultTxTimeout(),
+            0
         );
     }
 
@@ -3576,22 +3576,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         if (nodes.isEmpty())
             return new GridFinishedFuture<>(0);
 
-        IgniteInternalFuture<Collection<Integer>> fut =
-            ctx.closures().broadcastNoFailover(new SizeCallable(ctx.name(), 
peekModes), null, nodes);
-
-        return fut.chain(new CX1<IgniteInternalFuture<Collection<Integer>>, 
Integer>() {
-            @Override public Integer 
applyx(IgniteInternalFuture<Collection<Integer>> fut)
-            throws IgniteCheckedException {
-                Collection<Integer> res = fut.get();
-
-                int totalSize = 0;
-
-                for (Integer size : res)
-                    totalSize += size;
-
-                return totalSize;
-            }
-        });
+        return new SizeFuture(peekModes, ctx, nodes);
     }
 
     /** {@inheritDoc} */
@@ -3675,7 +3660,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         return F.iterator(iterator(),
             new IgniteClosure<Cache.Entry<K, V>, Cache.Entry<K, V>>() {
                 private IgniteCacheExpiryPolicy expiryPlc =
-                        ctx.cache().expiryPolicy(opCtx != null ? 
opCtx.expiry() : null);
+                    ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : 
null);
 
                 @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> 
lazyEntry) {
                     CacheOperationContext prev = ctx.gate().enter(opCtx);
@@ -3909,50 +3894,6 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * Gets cache global size (with or without backups).
-     *
-     * @param primaryOnly {@code True} if only primary sizes should be 
included.
-     * @return Global size.
-     * @throws IgniteCheckedException If internal task execution failed.
-     */
-    private int globalSize(boolean primaryOnly) throws IgniteCheckedException {
-        try {
-            // Send job to remote nodes only.
-            Collection<ClusterNode> nodes = 
ctx.grid().cluster().forCacheNodes(name()).forRemotes().nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut = null;
-
-            if (!nodes.isEmpty()) {
-                ctx.kernalContext().task().setThreadContext(TC_TIMEOUT, 
gridCfg.getNetworkTimeout());
-
-                fut = ctx.closures().broadcastNoFailover(new 
GlobalSizeCallable(name(), primaryOnly), null, nodes);
-            }
-
-            // Get local value.
-            int globalSize = primaryOnly ? primarySize() : size();
-
-            if (fut != null) {
-                for (Integer i : fut.get())
-                    globalSize += i;
-            }
-
-            return globalSize;
-        }
-        catch (ClusterGroupEmptyCheckedException ignore) {
-            if (log.isDebugEnabled())
-                log.debug("All remote nodes left while cache clearLocally 
[cacheName=" + name() + "]");
-
-            return primaryOnly ? primarySize() : size();
-        }
-        catch (ComputeTaskTimeoutCheckedException e) {
-            U.warn(log, "Timed out waiting for remote nodes to finish cache 
clear (consider increasing " +
-                "'networkTimeout' configuration property) [cacheName=" + 
name() + "]");
-
-            throw e;
-        }
-    }
-
-    /**
      * @param op Cache operation.
      * @param <T> Return type.
      * @return Operation result.
@@ -5082,57 +5023,63 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /**
-     * Internal callable which performs {@link IgniteInternalCache#size()} or 
{@link IgniteInternalCache#primarySize()}
-     * operation on a cache with the given name.
+     * Cache size future.
      */
-    @GridInternal
-    private static class GlobalSizeCallable implements IgniteClosure<Object, 
Integer>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
+    private static class SizeFuture extends GridFutureAdapter<Integer> {
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
 
-        /** Cache name. */
-        private String cacheName;
+        /** Context. */
+        private final GridCacheContext ctx;
 
-        /** Primary only flag. */
-        private boolean primaryOnly;
+        /** Nodes. */
+        private final Collection<ClusterNode> nodes;
 
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        /** Max retries count before issuing an error. */
+        private int retries = 32;
 
         /**
-         * Empty constructor for serialization.
+         * @param peekModes Peek modes.
          */
-        public GlobalSizeCallable() {
-            // No-op.
+        public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, 
Collection<ClusterNode> nodes) {
+            this.peekModes = peekModes;
+            this.ctx = ctx;
+            this.nodes = nodes;
+
+            init();
         }
 
         /**
-         * @param cacheName Cache name.
-         * @param primaryOnly Primary only flag.
+         * Init.
          */
-        private GlobalSizeCallable(String cacheName, boolean primaryOnly) {
-            this.cacheName = cacheName;
-            this.primaryOnly = primaryOnly;
-        }
+        private void init() {
+            IgniteInternalFuture<Collection<Integer>> fut =
+                ctx.closures().broadcastNoFailover(new 
SizeCallable(ctx.name(), peekModes), null, nodes);
 
-        /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
-            IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
+            fut.listen(new 
IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() {
+                @Override public void 
apply(IgniteInternalFuture<Collection<Integer>> fut) {
+                    try {
+                        Collection<Integer> res = fut.get();
 
-            return primaryOnly ? cache.primarySize() : cache.size();
-        }
+                        int size = 0;
 
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-            out.writeBoolean(primaryOnly);
-        }
+                        for (Integer locSize : res)
+                            size += locSize;
 
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-            primaryOnly = in.readBoolean();
+                        onDone(size);
+                    }
+                    catch (IgniteCheckedException e) {
+                        if (X.hasCause(e, ClusterTopologyException.class, 
NullPointerException.class)) {
+                            if (retries-- > 0)
+                                init();
+                            else
+                                onDone(e);
+                        }
+                        else
+                            onDone(e);
+                    }
+                }
+            });
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index f6d686c..0f8b36b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -852,7 +852,8 @@ class GridTaskWorker<T, R> extends GridWorker implements 
GridTimeoutObject {
                 }
                 catch (IgniteException e) {
                     if (X.hasCause(e, GridInternalException.class) ||
-                        X.hasCause(e, IgfsOutOfSpaceException.class)) {
+                        X.hasCause(e, IgfsOutOfSpaceException.class) ||
+                        X.hasCause(e, ClusterTopologyException.class)) {
                         // Print internal exceptions only if debug is enabled.
                         if (log.isDebugEnabled())
                             U.error(log, "Failed to obtain remote job result 
policy for result from " +

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e50ba9ef/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
new file mode 100644
index 0000000..031caf0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class GridCacheSizeTopologyChangedTest extends GridCommonAbstractTest {
+    /** Grids count */
+    private static int GRIDS_CNT = 15;
+
+    /** Keys count */
+    private static int KEYS_CNT = 10_000_000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(ATOMIC);
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(defaultCacheConfiguration());
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception if failed
+     */
+    public void testCacheSize() throws Exception {
+        Ignite g0 = startGrid(0);
+        //Ignite g0 = startGrids(GRIDS_CNT);
+
+        final AtomicBoolean canceled = new AtomicBoolean();
+
+        final Random rnd = new Random();
+
+        final boolean[] status = new boolean[GRIDS_CNT];
+
+        IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() 
{
+            @Override public Void call() throws Exception {
+                while(!canceled.get()) {
+                    int idx = rnd.nextInt(GRIDS_CNT);
+
+                    if (idx > 0) {
+                        boolean state = status[idx];
+
+                        if (state) {
+                            System.out.println("!!! STOP " + idx);
+                            stopGrid(idx);
+                        }
+                        else {
+                            System.out.println("!!! START " + idx);
+
+                            startGrid(idx);
+                        }
+
+                        status[idx] = !state;
+
+                        U.sleep(2000);
+                    }
+                }
+                return null;
+            }
+        });
+
+        try {
+            IgniteCache<Integer, Integer> cache = g0.cache(null);
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                cache.put(i, 0);
+
+                int size = cache.size();
+
+                if (i % 1000 == 0)
+                    System.out.println("!!! Key: " + i + ", size: " + size);
+
+                if (size == -1)
+                    System.out.println("!!! SIZE: " + size);
+            }
+
+            canceled.set(true);
+
+            Thread.sleep(5000);
+
+            System.out.println("!!! ASSERT");
+            assertEquals(KEYS_CNT, cache.size());
+        }
+        catch (Exception e) {
+            System.out.println("!!!!!");
+            e.printStackTrace();
+        }
+        finally {
+            canceled.set(true);
+            fut.get();
+        }
+    }
+
+}

Reply via email to