# ignite-648: callback works good (implement Cache.removeAll)

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c797886a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c797886a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c797886a

Branch: refs/heads/ignite-648
Commit: c797886a0793d0b8300b06cb192859a8a117e833
Parents: ae06c33
Author: Artem Shutak <ashu...@gridgain.com>
Authored: Tue Apr 21 14:01:11 2015 +0300
Committer: Artem Shutak <ashu...@gridgain.com>
Committed: Tue Apr 21 14:01:11 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/multijvm/CacheProxy.java   | 327 +++++++++++++++++++
 ...CachePartitionedMultiJvmFullApiSelfTest.java | 104 +-----
 .../cache/multijvm/IgniteExProxy.java           |  32 +-
 .../cache/multijvm/IgniteNodeRunner.java        |  23 +-
 4 files changed, 349 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java
new file mode 100644
index 0000000..52f9aea
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java
@@ -0,0 +1,327 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.internal.processors.cache.multijvm;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.mxbean.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.CacheManager;
+import javax.cache.configuration.*;
+import javax.cache.expiry.*;
+import javax.cache.integration.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.locks.*;
+
+/**
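+ * Test {@link IgniteCache} proxy bound to an {@link IgniteExProxy} and a cache name. Only {@link #removeAll(Set)} is implemented so far; every other method is a stub.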
+ *
+ * @author @java.author
+ * @version @java.version
+ */
+public class CacheProxy<K, V> implements IgniteCache<K, V> {
+    private transient IgniteExProxy proxy;
+    private final String cacheName;
+
+    public CacheProxy(String name, IgniteExProxy proxy) {
+        this.proxy = proxy;
+        cacheName = name;
+    }
+
+    @Override public IgniteCache<K, V> withAsync() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public boolean isAsync() {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public <R> IgniteFuture<R> future() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public <C extends Configuration<K, V>> C 
getConfiguration(Class<C> clazz) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public Entry<K, V> randomEntry() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public IgniteCache<K, V> withSkipStore() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override
+    public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable 
Object... args) throws CacheException {
+        // TODO: CODE: implement.
+    }
+
+    @Override
+    public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable 
Object... args) throws CacheException {
+        // TODO: CODE: implement.
+    }
+
+    @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public Lock lock(K key) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public Lock lockAll(Collection<? extends K> keys) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public <R> QueryCursor<R> query(Query<R> qry) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... 
peekModes) throws CacheException {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public QueryMetrics queryMetrics() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public void localEvict(Collection<? extends K> keys) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public V localPeek(K key, CachePeekMode... peekModes) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public void localPromote(Set<? extends K> keys) throws 
CacheException {
+        // TODO: CODE: implement.
+    }
+
+    @Override public int size(CachePeekMode... peekModes) throws 
CacheException {
+        return 0; // TODO: CODE: implement.
+    }
+
+    @Override public int localSize(CachePeekMode... peekModes) {
+        return 0; // TODO: CODE: implement.
+    }
+
+    @Override
+    public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? 
extends EntryProcessor<K, V, T>> map,
+        Object... args) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public V get(K key) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public Map<K, V> getAll(Set<? extends K> keys) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public boolean containsKey(K key) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override
+    public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, 
CompletionListener completionListener) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public boolean containsKeys(Set<? extends K> keys) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public void put(K key, V val) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public V getAndPut(K key, V val) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public void putAll(Map<? extends K, ? extends V> map) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public boolean putIfAbsent(K key, V val) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public boolean remove(K key) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public boolean remove(K key, V oldVal) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public V getAndRemove(K key) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public boolean replace(K key, V oldVal, V newVal) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public boolean replace(K key, V val) {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public V getAndReplace(K key, V val) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public void removeAll(final Set<? extends K> keys) { // Runs a MyClos job (which calls cache.removeAll) on the node identified by the proxy's ID.
+        ClusterGroup grp = 
proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); // Cluster group restricted to the node with the proxy's ID.
+
+        IgniteCompute compute = proxy.localJvmGrid().compute(grp);
+        
+        compute.broadcast(new MyClos(proxy.getId(), proxy.name(), cacheName), 
keys); // The keys are the closure argument; the returned result collection is not used.
+    }
+    
+    public static class MyClos extends IgniteClosureX<Set<?>, String> { // Closure that removes the given keys from the named cache of an Ignite instance.
+        private final UUID id; // Passed to Ignition.ignite(UUID) to look up the Ignite instance; final like the sibling fields.
+        private final String gridName; // Stored by the constructor but not read by applyx().
+        private final String cacheName; // Name of the cache whose keys are removed.
+
+        public MyClos(UUID id, String gridName, String cacheName) {
+            this.id = id;
+            this.gridName = gridName;
+            this.cacheName = cacheName;
+        }
+
+        @Override public String applyx(Set<?> ks) throws 
IgniteCheckedException {
+            X.println(">>>>> Cache. Removing keys=" + ks);
+            
+            Ignite ignite = Ignition.ignite(id);
+
+            X.println(">>>>> Cache. Ignite=" + ignite);
+
+            IgniteCache<Object, Object> cache = ignite.cache(cacheName);
+
+            X.println(">>>>> Cache. Cache=" + cache);
+
+            cache.removeAll(ks);
+
+            return ""; // Placeholder result; CacheProxy.removeAll(Set) does not use it.
+        }
+    }
+
+    @Override public void removeAll() {
+        // TODO: CODE: implement.
+    }
+
+    @Override public void clear() {
+        // TODO: CODE: implement.
+    }
+
+    @Override public void clear(K key) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public void clearAll(Set<? extends K> keys) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public void localClear(K key) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public void localClearAll(Set<? extends K> keys) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public <T> T invoke(K key, EntryProcessor<K, V, T> 
entryProcessor, Object... arguments) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> 
entryProcessor, Object... arguments) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override
+    public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> 
keys, EntryProcessor<K, V, T> entryProcessor,
+        Object... args) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public String getName() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public CacheManager getCacheManager() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public void close() {
+        // TODO: CODE: implement.
+    }
+
+    @Override public boolean isClosed() {
+        return false; // TODO: CODE: implement.
+    }
+
+    @Override public <T> T unwrap(Class<T> clazz) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override
+    public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, 
V> cacheEntryListenerConfiguration) {
+        // TODO: CODE: implement.
+    }
+
+    @Override
+    public void 
deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> 
cacheEntryListenerConfiguration) {
+        // TODO: CODE: implement.
+    }
+
+    @Override public Iterator<Entry<K, V>> iterator() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? 
extends K> keys,
+        CacheEntryProcessor<K, V, T> entryProcessor, Object... args) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public IgniteFuture<?> rebalance() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public CacheMetrics metrics() {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public CacheMetrics metrics(ClusterGroup grp) {
+        return null; // TODO: CODE: implement.
+    }
+
+    @Override public CacheMetricsMXBean mxBean() {
+        return null; // TODO: CODE: implement.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java
index 4f40122..9395b54 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java
@@ -22,8 +22,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.resource.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
 
 import java.util.*;
 
@@ -67,13 +66,11 @@ public class GridCachePartitionedMultiJvmFullApiSelfTest 
extends GridCachePartit
 
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
-//        IgniteConfiguration cfg = super.getConfiguration(gridName);
-//
-//        
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IgniteNodeRunner.ipFinder);
-//
-//        return cfg;
-        
-        return IgniteNodeRunner.theSameConf(gridName);
+        IgniteConfiguration cfg = super.getConfiguration(gridName); // Start from the parent class's configuration.
+
+        
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IgniteNodeRunner.ipFinder); // Use the IP finder declared on IgniteNodeRunner; the cast requires a TcpDiscoverySpi.
+
+        return cfg;
     }
 
     /** {@inheritDoc} */
@@ -150,92 +147,15 @@ public class GridCachePartitionedMultiJvmFullApiSelfTest 
extends GridCachePartit
 
         IgniteCache<Object, Object> c1 = grid1.cache(null);
 
-//        c0.putAll(putMap);
-//
-//        atomicClockModeDelay(c0);
-//
-//        c1.removeAll(putMap.keySet());
-//
-//        for (int i = 0; i < size; i++) {
-//            assertNull(c0.get(i));
-//            assertNull(c1.get(i));
-//        }
-//
-//        for (IgniteExProxy ignite : ignites.values())
-//            ignite.getProcess().kill();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testBroadcast() throws Exception {
-        IgniteEx grid0 = grid(0);
-        IgniteEx grid1 = grid(1);
-
-        Thread.sleep(10_000);
-
-//        ClusterGroup grp = grid0.cluster().forNode(grid1.localNode());
-
-        grid0.compute().broadcast(new IgniteRunnable() {
-            @Override public void run() {
-                System.out.println(">>>>> trololo");
-            }
-        });
-        
-        Thread.sleep(10_000);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSimpleBroadcast_worked() throws Exception {
-        Ignite rmt = null;
-
-        try(Ignite ignite = 
Ignition.start(IgniteNodeRunner.theSameConf("SomeLocGrid"))) {
-            Thread.sleep(5_000);
-
-            rmt = new 
IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode"), ignite.log(), ignite);
-
-            Thread.sleep(5_000);
-
-            ignite.compute().broadcast(new IgniteRunnable() {
-                @Override public void run() {
-                    System.out.println(">>>>> trololo");
-                }
-            });
-
-            Collection<String> res = ignite.compute().broadcast(new 
C1<Integer, String>() {
-                @Override public String apply(Integer integer) {
-                    return String.valueOf(integer + 12);
-                }
-            }, 100500);
-
-            System.out.println(">>>>> " + res);
-        }
-        finally {
-            rmt.close();
-        }
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testRmt2RmtBroadcast_worked() throws Exception {
-        Ignite rmt1 = null;
-        Ignite rmt2 = null;
-
-        try {
-            rmt1 = new 
IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode1"), log, null);
+        c0.putAll(putMap);
 
-            Thread.sleep(5_000);
+        atomicClockModeDelay(c0);
 
-            rmt2 = new 
IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode2"), log, null);
+        c1.removeAll(putMap.keySet());
 
-            Thread.sleep(5_000);
-        }
-        finally {
-            rmt1.close();
-            rmt2.close();
+        for (int i = 0; i < size; i++) {
+            assertNull(c0.get(i));
+            assertNull(c1.get(i));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java
index 8a273b6..074529f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java
@@ -86,6 +86,14 @@ public class IgniteExProxy implements IgniteEx {
         gridProxies.put(cfg.getGridName(), this);
     }
 
+    public Ignite localJvmGrid() { // Exposes the locJvmGrid field; CacheProxy uses it to obtain cluster() and compute().
+        return locJvmGrid;
+    }
+
+    public UUID getId() { // Exposes the id field; CacheProxy passes it to forNodeId() and to MyClos.
+        return id;
+    }
+
     @Override public String name() {
         return cfg.getGridName();
     }
@@ -248,29 +256,7 @@ public class IgniteExProxy implements IgniteEx {
     }
 
     @Override public <K, V> IgniteCache<K, V> cache(@Nullable final String 
name) {
-        ClusterGroup grp = locJvmGrid.cluster().forNodeId(id);
-
-        locJvmGrid.compute(grp).broadcast(new IgniteRunnable() {
-            @Override public void run() {
-                System.out.println(">>>>> trololo");
-            }
-        });
-        
-//        locJvmGrid.compute(grp).run(new IgniteRunnable() {
-//            @Override public void run() {
-//                X.println(">>>>> trololo");
-//            }
-//        });
-
-//        return locJvmGrid.compute(grp).apply(new C1<Set<String>, 
IgniteCache<K,V>>() {
-//            @Override public IgniteCache<K,V> apply(Set<String> objects) {
-//                X.println(">>>>> Cache");
-//
-//                return Ignition.ignite().cache(name);
-//            }
-//        }, Collections.<String>emptySet());
-//
-        return null;
+        return new CacheProxy<K, V>(name, this); // Explicit type arguments avoid a raw-type (unchecked) conversion to IgniteCache<K, V>.
     }
 
     @Override public IgniteTransactions transactions() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java
index be200a6..8ee0abf 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java
@@ -111,8 +111,7 @@ public class IgniteNodeRunner {
 
         cfg.setNodeId(nodeId);
 
-//        return cfg;
-        return theSameConf(gridName);
+        return cfg;
     }
 
     private static boolean isDebug() {
@@ -142,24 +141,4 @@ public class IgniteNodeRunner {
 //        }
         return new CacheConfiguration();
     }
-
-    public static IgniteConfiguration theSameConf(String gridName) {
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        TcpDiscoverySpi disco = new TcpDiscoverySpi();
-        disco.setIpFinder(ipFinder);
-        cfg.setDiscoverySpi(disco);
-
-        cfg.setMarshaller(new OptimizedMarshaller(false));
-        
-        cfg.setCacheConfiguration(new CacheConfiguration());
-        
-        cfg.setGridName(gridName);
-
-        cfg.setLocalHost("127.0.0.1");
-
-        cfg.setIncludeProperties();
-
-        return cfg;
-    }
 }

Reply via email to