#ignite-732: wip.

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b65aa1d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b65aa1d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b65aa1d9

Branch: refs/heads/ignite-732
Commit: b65aa1d97730559864e464fff696bfa936f4117f
Parents: ec4218e
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Thu Apr 30 17:47:57 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Thu Apr 30 17:47:57 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 323 ++++++++++---------
 .../processors/cache/GridCacheProcessor.java    |   2 +
 .../cache/GridCacheSizeTopologyChangedTest.java |  23 +-
 3 files changed, 186 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a574103..46d248a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -75,12 +75,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** Failed result (returned when the topology version check fails). */
+    private static final Object FAIL = new Integer(-1);
+
     /** clearLocally() split threshold. */
     public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
 
-    /** Filed result. */
-    protected static final GridCacheReturn FAIL = new GridCacheReturn(false, 
false);
-
     /** Deserialization stash. */
     private static final ThreadLocal<IgniteBiTuple<String, String>> stash = 
new ThreadLocal<IgniteBiTuple<String,
         String>>() {
@@ -1086,7 +1086,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally();
 
-        clearRemotes(0, new GlobalClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()));
+        clearRemotes(0, new GlobalRemoteClearAllCallable(name(), 
ctx.affinity().affinityTopologyVersion()));
     }
 
     /** {@inheritDoc} */
@@ -1094,7 +1094,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocally(key);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+        clearRemotes(0, new GlobalRemoteClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
             Collections.singleton(key)));
     }
 
@@ -1103,7 +1103,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         // Clear local cache synchronously.
         clearLocallyAll(keys);
 
-        clearRemotes(0, new GlobalClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
+        clearRemotes(0, new GlobalRemoteClearKeySetCallable<K, V>(name(), 
ctx.affinity().affinityTopologyVersion(),
             keys));
     }
 
@@ -4843,7 +4843,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Internal callable which performs clear operation on a cache with the 
given name.
      */
     @GridInternal
-    private static abstract class GlobalClearCallable extends 
VersionComparable implements Callable<Object> {
+    private static abstract class GlobalClearCallable extends 
VersionAwareCallable {
         /**
          * Empty constructor for serialization.
          */
@@ -4858,30 +4858,20 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         protected GlobalClearCallable(String cacheName, 
AffinityTopologyVersion topVer) {
             super(cacheName, topVer);
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-        }
     }
 
     /**
      * Global clear all.
      */
     @GridInternal
-    private static class GlobalClearAllCallable extends GlobalClearCallable {
+    private static class GlobalRemoteClearAllCallable extends 
GlobalClearCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
         /**
          * Empty constructor for serialization.
          */
-        public GlobalClearAllCallable() {
+        public GlobalRemoteClearAllCallable() {
             // No-op.
         }
 
@@ -4889,22 +4879,43 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param cacheName Cache name.
          * @param topVer Affinity topology version.
          */
-        private GlobalClearAllCallable(String cacheName, 
AffinityTopologyVersion topVer) {
+        private GlobalRemoteClearAllCallable(String cacheName, 
AffinityTopologyVersion topVer) {
             super(cacheName, topVer);
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            if (!compareTopologyVersions())
-                return FAIL;
-
+        @Override protected Object callLocal() {
             ((IgniteEx)ignite).cachex(cacheName).clearLocally();
 
-            if (!compareTopologyVersions())
-                return FAIL;
-
             return null;
         }
+
+        /** {@inheritDoc} */
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext 
ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, 
false).forRemotes().nodes();
+        }
+    }
+
+    /**
+     * Global clear all: targets all cache nodes rather than remote nodes only.
+     */
+    @GridInternal
+    private static class GlobalClearAllCallable extends 
GlobalRemoteClearAllCallable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         */
+        private GlobalClearAllCallable(String cacheName, 
AffinityTopologyVersion topVer) {
+            super(cacheName, topVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext 
ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, 
false).nodes();
+        }
     }
 
     /**
@@ -4930,34 +4941,42 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        private GlobalClearKeySetCallable(String cacheName, 
AffinityTopologyVersion topVer, Set<? extends K> keys) {
+        protected GlobalClearKeySetCallable(String cacheName, 
AffinityTopologyVersion topVer, Set<? extends K> keys) {
             super(cacheName, topVer);
 
             this.keys = keys;
         }
 
         /** {@inheritDoc} */
-        @Override public Object call() throws Exception {
-            if (!compareTopologyVersions())
-                return FAIL;
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext 
ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, 
false).nodes();
+        }
 
+        /** {@inheritDoc} */
+        @Override protected Object callLocal() {
             ((IgniteEx)ignite).<K, V>cachex(cacheName).clearLocallyAll(keys);
 
             return null;
         }
+    }
 
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
-            out.writeObject(keys);
+    /**
+     * Global clear keys: targets remote cache nodes only.
+     */
+    @GridInternal
+    private static class GlobalRemoteClearKeySetCallable<K, V> extends 
GlobalClearKeySetCallable {
+        /**
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param keys Keys to clear.
+         */
+        protected GlobalRemoteClearKeySetCallable(String cacheName, 
AffinityTopologyVersion topVer, Set<? extends K> keys) {
+            super(cacheName, topVer, keys);
         }
 
         /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
-            keys = (Set<K>) in.readObject();
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext 
ctx) {
+            return ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, 
false).forRemotes().nodes();
         }
     }
 
@@ -4965,13 +4984,16 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * Internal callable for global size calculation.
      */
     @GridInternal
-    private static class SizeCallable extends VersionComparable implements 
IgniteClosure<Object, Integer> {
+    private static class SizeCallable extends VersionAwareCallable {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** Peek modes. */
         private CachePeekMode[] peekModes;
 
+        /** Near enabled flag. */
+        private boolean nearEnable;
+
         /**
          * Required by {@link Externalizable}.
          */
@@ -4984,18 +5006,16 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * @param topVer Affinity topology version.
          * @param peekModes Cache peek modes.
          */
-        private SizeCallable(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes) {
+        private SizeCallable(String cacheName, AffinityTopologyVersion topVer, 
CachePeekMode[] peekModes, boolean nearEnable) {
             super(cacheName, topVer);
 
             this.peekModes = peekModes;
+            this.nearEnable = nearEnable;
         }
 
         /** {@inheritDoc} */
-        @Override public Integer apply(Object o) {
+        @Override protected Object callLocal() {
             try {
-                if (!compareTopologyVersions())
-                    return -1;
-
                 IgniteInternalCache<Object, Object> cache = 
((IgniteEx)ignite).cachex(cacheName);
 
                 return cache == null ? 0 : cache.localSize(peekModes);
@@ -5006,26 +5026,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         }
 
         /** {@inheritDoc} */
-        @SuppressWarnings("ForLoopReplaceableByForEach")
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            super.writeExternal(out);
-
-            out.writeInt(peekModes.length);
-
-            for (int i = 0; i < peekModes.length; i++)
-                U.writeEnum(out, peekModes[i]);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            super.readExternal(in);
-
-            int len = in.readInt();
+        @Override protected Collection<ClusterNode> nodes(GridCacheContext 
ctx) {
+            IgniteClusterEx cluster = ctx.grid().cluster();
 
-            peekModes = new CachePeekMode[len];
+            ClusterGroup grp = nearEnable ? cluster.forCacheNodes(ctx.name(), 
true, true, false) : cluster.forDataNodes(ctx.name());
 
-            for (int i = 0; i < len; i++)
-                peekModes[i] = CachePeekMode.fromOrdinal(in.readByte());
+            return grp.nodes();
         }
 
         /** {@inheritDoc} */
@@ -5037,102 +5043,78 @@ public abstract class GridCacheAdapter<K, V> 
implements IgniteInternalCache<K, V
     /**
      * Cache size future.
      */
-    private static class SizeFuture extends GridFutureAdapter<Integer> {
-        /** Peek modes. */
-        private final CachePeekMode[] peekModes;
-
-        /** Context. */
-        private final GridCacheContext ctx;
-
-        /** Near enable. */
-        private final boolean near;
-
-        /** Max retries count before issuing an error. */
-        private int retries = 32;
+    private static class SizeFuture extends RetryFuture {
+        /** Size. */
+        private int size = 0;
 
         /**
          * @param peekModes Peek modes.
          */
         public SizeFuture(CachePeekMode[] peekModes, GridCacheContext ctx, 
boolean near) {
-            this.peekModes = peekModes;
-            this.ctx = ctx;
-            this.near = near;
-
-            init();
+            super(ctx, new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes, near));
         }
 
-        /**
-         * Init.
-         */
-        private void init() {
-            IgniteClusterEx cluster = ctx.grid().cluster();
-
-            ClusterGroup grp = near ?
-                cluster.forCacheNodes(ctx.name(), true, true, false) :
-                cluster.forDataNodes(ctx.name());
-
-            Collection<ClusterNode> nodes = grp.nodes();
-
-            IgniteInternalFuture<Collection<Integer>> fut =
-                ctx.closures().broadcastNoFailover(
-                new SizeCallable(ctx.name(), 
ctx.affinity().affinityTopologyVersion(), peekModes), null, nodes);
+        /** {@inheritDoc} */
+        @Override protected void onInit() {
+            size = 0;
+        }
 
-            fut.listen(new 
IgniteInClosure<IgniteInternalFuture<Collection<Integer>>>() {
-                @Override public void 
apply(IgniteInternalFuture<Collection<Integer>> fut) {
-                    try {
-                        Collection<Integer> res = fut.get();
+        /** {@inheritDoc} */
+        @Override protected void onLocal(Object localRes) {
+            size += (Integer)localRes;
+        }
 
-                        int size = 0;
+        /** {@inheritDoc} */
+        @Override protected void allDone() {
+            onDone(size);
+        }
+    }
 
-                        for (Integer locSize : res) {
-                            if (locSize == -1) {
-                                if (retries-- > 0)
-                                    init();
-                                else {
-                                    onDone(new 
ClusterTopologyException("Failed to wait topology."));
+    /**
+     * Cache clear future.
+     */
+    private static class ClearFuture extends RetryFuture {
+        /**
+         */
+        public ClearFuture(GridCacheContext ctx, GlobalClearCallable 
clearCall) {
+            super(ctx, clearCall);
+        }
 
-                                    return;
-                                }
-                            }
+        /** {@inheritDoc} */
+        @Override protected void onInit() {
+           // No-op.
+        }
 
-                            size += locSize;
-                        }
+        /** {@inheritDoc} */
+        @Override protected void onLocal(Object localRes) {
+            // No-op.
+        }
 
-                        onDone(size);
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (X.hasCause(e, ClusterTopologyException.class)) {
-                            if (retries-- > 0)
-                                init();
-                            else
-                                onDone(e);
-                        }
-                        else
-                            onDone(e);
-                    }
-                }
-            });
+        /** {@inheritDoc} */
+        @Override protected void allDone() {
+            onDone();
         }
     }
 
     /**
-     * Cache clear future.
+     * Retry future.
      */
-    private static class ClearFuture extends GridFutureAdapter<Object> {
+    protected static abstract class RetryFuture<T> extends 
GridFutureAdapter<T> {
         /** Context. */
         private final GridCacheContext ctx;
 
-
-        private final GlobalClearCallable clearCall;
+        /** Callable. */
+        private final VersionAwareCallable call;
 
         /** Max retries count before issuing an error. */
         private volatile int retries = 32;
 
         /**
          */
-        public ClearFuture(GridCacheContext ctx, GlobalClearCallable 
clearCall) {
+        public RetryFuture(GridCacheContext ctx, VersionAwareCallable call) {
             this.ctx = ctx;
-            this.clearCall = clearCall;
+            this.call = call;
+
             init();
         }
 
@@ -5140,20 +5122,19 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
          * Init.
          */
         private void init() {
-            Collection<ClusterNode> nodes =
-                ctx.grid().cluster().forCacheNodes(ctx.name(), true, true, 
false).forRemotes().nodes();
+            Collection<ClusterNode> nodes = call.nodes(ctx);
 
-            
clearCall.topologyVersion(ctx.affinity().affinityTopologyVersion());
+            call.topologyVersion(ctx.affinity().affinityTopologyVersion());
 
             IgniteInternalFuture<Collection<Object>> fut = 
ctx.closures().callAsyncNoFailover(BROADCAST,
-                F.asSet(clearCall), nodes, true);
+                F.asSet((Callable<Object>)call), nodes, true);
 
             fut.listen(new 
IgniteInClosure<IgniteInternalFuture<Collection<Object>>>() {
                 @Override public void 
apply(IgniteInternalFuture<Collection<Object>> fut) {
                     try {
                         Collection<Object> res = fut.get();
 
-                        int size = 0;
+                        onInit();
 
                         for (Object locRes : res) {
                             if (locRes == FAIL) {
@@ -5165,9 +5146,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                                     return;
                                 }
                             }
+
+                            onLocal(locRes);
                         }
 
-                        onDone(size);
+                        allDone();
                     }
                     catch (IgniteCheckedException e) {
                         if (X.hasCause(e, ClusterTopologyException.class)) {
@@ -5182,6 +5165,21 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 }
             });
         }
+
+        /**
+         * Init reducer.
+         */
+        protected abstract void onInit();
+
+        /**
+         * @param localRes Local result to add to the global result.
+         */
+        protected abstract void onLocal(Object localRes);
+
+        /**
+         * On done.
+         */
+        protected abstract void allDone();
     }
 
     /**
@@ -5749,7 +5747,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     /**
      * Delayed callable class.
      */
-    protected static abstract class VersionComparable<K, V> implements 
Externalizable {
+    protected static abstract class VersionAwareCallable<K, V> implements 
Serializable, Callable<Object> {
         /** */
         private static final long serialVersionUID = 0L;
 
@@ -5766,18 +5764,45 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         /**
          * Empty constructor for serialization.
          */
-        public VersionComparable() {
+        public VersionAwareCallable() {
             // No-op.
         }
 
         /**
          * @param topVer Affinity topology version.
          */
-        public VersionComparable(String cacheName, AffinityTopologyVersion 
topVer) {
+        public VersionAwareCallable(String cacheName, AffinityTopologyVersion 
topVer) {
             this.cacheName = cacheName;
             this.topVer = topVer;
         }
 
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            if (!compareTopologyVersions())
+                return FAIL;
+
+            Object res = callLocal();
+
+            if (!compareTopologyVersions())
+                return FAIL;
+            else
+                return res;
+        }
+
+        /**
+         * Call local.
+         *
+         * @return Local result.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected abstract Object callLocal() throws IgniteCheckedException;
+
+        /**
+         * @param ctx Grid cache context.
+         * @return Nodes to call.
+         */
+        protected abstract Collection<ClusterNode> nodes(GridCacheContext ctx);
+
         /**
          * Compare topology versions.
          */
@@ -5802,21 +5827,5 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
         public void topologyVersion(AffinityTopologyVersion topVer) {
             this.topVer = topVer;
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            U.writeString(out, cacheName);
-
-            topVer.writeExternal(out);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            cacheName = U.readString(in);
-
-            topVer = new AffinityTopologyVersion();
-
-            topVer.readExternal(in);
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c0026ab..9003b11 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -739,6 +739,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
             }
         }
 
+
+
         ctx.marshallerContext().onMarshallerCacheStarted(ctx);
 
         marshallerCache().context().preloader().syncFuture().listen(new 
CIX1<IgniteInternalFuture<?>>() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b65aa1d9/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
index ddc2b78..bac6536 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSizeTopologyChangedTest.java
@@ -61,15 +61,15 @@ public class GridCacheSizeTopologyChangedTest extends 
GridCommonAbstractTest {
 
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(ATOMIC);/*
 
         ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
 
-        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        
ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);*/
 
         ccfg.setCacheMode(PARTITIONED);
 
-        ccfg.setBackups(2);
+        ccfg.setBackups(1);
 
         cfg.setCacheConfiguration(defaultCacheConfiguration());
 
@@ -96,14 +96,27 @@ public class GridCacheSizeTopologyChangedTest extends 
GridCommonAbstractTest {
 
         IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() 
{
             @Override public Void call() throws Exception {
-                int idx = 2;
                 while(!canceled.get()) {
+                    int idx = rnd.nextInt(GRIDS_CNT);
+
+                    if (idx > 0) {
+                        boolean state = status[idx];
+
+                        if (state) {
+                            System.out.println("!!! STOP GRID: " + idx);
                             stopGrid(idx);
+                        }
+                        else {
+                            System.out.println("!!! START GRID:" + idx);
+
                             startGrid(idx);
+                        }
+
+                        status[idx] = !state;
 
                         U.sleep(3000);
+                    }
                 }
-
                 return null;
             }
         });

Reply via email to