# ignite-648: callback works good (implement Cache.removeAll)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/c797886a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/c797886a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/c797886a Branch: refs/heads/ignite-648 Commit: c797886a0793d0b8300b06cb192859a8a117e833 Parents: ae06c33 Author: Artem Shutak <ashu...@gridgain.com> Authored: Tue Apr 21 14:01:11 2015 +0300 Committer: Artem Shutak <ashu...@gridgain.com> Committed: Tue Apr 21 14:01:11 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/multijvm/CacheProxy.java | 327 +++++++++++++++++++ ...CachePartitionedMultiJvmFullApiSelfTest.java | 104 +----- .../cache/multijvm/IgniteExProxy.java | 32 +- .../cache/multijvm/IgniteNodeRunner.java | 23 +- 4 files changed, 349 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java new file mode 100644 index 0000000..52f9aea --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/CacheProxy.java @@ -0,0 +1,327 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.internal.processors.cache.multijvm; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.mxbean.*; +import org.jetbrains.annotations.*; + +import javax.cache.*; +import javax.cache.CacheManager; +import javax.cache.configuration.*; +import javax.cache.expiry.*; +import javax.cache.integration.*; +import javax.cache.processor.*; +import java.util.*; +import java.util.concurrent.locks.*; + +/** + * TODO: Add class description. + * + * @author @java.author + * @version @java.version + */ +public class CacheProxy<K, V> implements IgniteCache<K, V> { + private transient IgniteExProxy proxy; + private final String cacheName; + + public CacheProxy(String name, IgniteExProxy proxy) { + this.proxy = proxy; + cacheName = name; + } + + @Override public IgniteCache<K, V> withAsync() { + return null; // TODO: CODE: implement. + } + + @Override public boolean isAsync() { + return false; // TODO: CODE: implement. + } + + @Override public <R> IgniteFuture<R> future() { + return null; // TODO: CODE: implement. + } + + @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) { + return null; // TODO: CODE: implement. + } + + @Override public Entry<K, V> randomEntry() { + return null; // TODO: CODE: implement. + } + + @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) { + return null; // TODO: CODE: implement. + } + + @Override public IgniteCache<K, V> withSkipStore() { + return null; // TODO: CODE: implement. + } + + @Override + public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + // TODO: CODE: implement. + } + + @Override + public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { + // TODO: CODE: implement. + } + + @Override public V getAndPutIfAbsent(K key, V val) throws CacheException { + return null; // TODO: CODE: implement. + } + + @Override public Lock lock(K key) { + return null; // TODO: CODE: implement. + } + + @Override public Lock lockAll(Collection<? extends K> keys) { + return null; // TODO: CODE: implement. + } + + @Override public boolean isLocalLocked(K key, boolean byCurrThread) { + return false; // TODO: CODE: implement. + } + + @Override public <R> QueryCursor<R> query(Query<R> qry) { + return null; // TODO: CODE: implement. + } + + @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { + return null; // TODO: CODE: implement. + } + + @Override public QueryMetrics queryMetrics() { + return null; // TODO: CODE: implement. + } + + @Override public void localEvict(Collection<? extends K> keys) { + // TODO: CODE: implement. + } + + @Override public V localPeek(K key, CachePeekMode... peekModes) { + return null; // TODO: CODE: implement. + } + + @Override public void localPromote(Set<? extends K> keys) throws CacheException { + // TODO: CODE: implement. + } + + @Override public int size(CachePeekMode... peekModes) throws CacheException { + return 0; // TODO: CODE: implement. + } + + @Override public int localSize(CachePeekMode... peekModes) { + return 0; // TODO: CODE: implement. + } + + @Override + public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, + Object... args) { + return null; // TODO: CODE: implement. + } + + @Override public V get(K key) { + return null; // TODO: CODE: implement. + } + + @Override public Map<K, V> getAll(Set<? extends K> keys) { + return null; // TODO: CODE: implement. + } + + @Override public boolean containsKey(K key) { + return false; // TODO: CODE: implement. + } + + @Override + public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) { + // TODO: CODE: implement. + } + + @Override public boolean containsKeys(Set<? extends K> keys) { + return false; // TODO: CODE: implement. + } + + @Override public void put(K key, V val) { + // TODO: CODE: implement. + } + + @Override public V getAndPut(K key, V val) { + return null; // TODO: CODE: implement. + } + + @Override public void putAll(Map<? extends K, ? extends V> map) { + // TODO: CODE: implement. + } + + @Override public boolean putIfAbsent(K key, V val) { + return false; // TODO: CODE: implement. + } + + @Override public boolean remove(K key) { + return false; // TODO: CODE: implement. + } + + @Override public boolean remove(K key, V oldVal) { + return false; // TODO: CODE: implement. + } + + @Override public V getAndRemove(K key) { + return null; // TODO: CODE: implement. + } + + @Override public boolean replace(K key, V oldVal, V newVal) { + return false; // TODO: CODE: implement. + } + + @Override public boolean replace(K key, V val) { + return false; // TODO: CODE: implement. + } + + @Override public V getAndReplace(K key, V val) { + return null; // TODO: CODE: implement. + } + + @Override public void removeAll(final Set<? extends K> keys) { + ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); + + IgniteCompute compute = proxy.localJvmGrid().compute(grp); + + compute.broadcast(new MyClos(proxy.getId(), proxy.name(), cacheName), keys); + } + + public static class MyClos extends IgniteClosureX<Set<?>, String> { + private UUID id; + private final String gridName; + private final String cacheName; + + public MyClos(UUID id, String gridName, String cacheName) { + this.id = id; + this.gridName = gridName; + this.cacheName = cacheName; + } + + @Override public String applyx(Set<?> ks) throws IgniteCheckedException { + X.println(">>>>> Cache. Removing keys=" + ks); + + Ignite ignite = Ignition.ignite(id); + + X.println(">>>>> Cache. Ignite=" + ignite); + + IgniteCache<Object, Object> cache = ignite.cache(cacheName); + + X.println(">>>>> Cache. Cache=" + cache); + + cache.removeAll(ks); + + return ""; + } + } + + @Override public void removeAll() { + // TODO: CODE: implement. + } + + @Override public void clear() { + // TODO: CODE: implement. + } + + @Override public void clear(K key) { + // TODO: CODE: implement. + } + + @Override public void clearAll(Set<? extends K> keys) { + // TODO: CODE: implement. + } + + @Override public void localClear(K key) { + // TODO: CODE: implement. + } + + @Override public void localClearAll(Set<? extends K> keys) { + // TODO: CODE: implement. + } + + @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments) { + return null; // TODO: CODE: implement. + } + + @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments) { + return null; // TODO: CODE: implement. + } + + @Override + public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, + Object... args) { + return null; // TODO: CODE: implement. + } + + @Override public String getName() { + return null; // TODO: CODE: implement. + } + + @Override public CacheManager getCacheManager() { + return null; // TODO: CODE: implement. + } + + @Override public void close() { + // TODO: CODE: implement. + } + + @Override public boolean isClosed() { + return false; // TODO: CODE: implement. + } + + @Override public <T> T unwrap(Class<T> clazz) { + return null; // TODO: CODE: implement. + } + + @Override + public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + // TODO: CODE: implement. + } + + @Override + public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cacheEntryListenerConfiguration) { + // TODO: CODE: implement. + } + + @Override public Iterator<Entry<K, V>> iterator() { + return null; // TODO: CODE: implement. + } + + @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, + CacheEntryProcessor<K, V, T> entryProcessor, Object... args) { + return null; // TODO: CODE: implement. + } + + @Override public IgniteFuture<?> rebalance() { + return null; // TODO: CODE: implement. + } + + @Override public CacheMetrics metrics() { + return null; // TODO: CODE: implement. + } + + @Override public CacheMetrics metrics(ClusterGroup grp) { + return null; // TODO: CODE: implement. + } + + @Override public CacheMetricsMXBean mxBean() { + return null; // TODO: CODE: implement. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java index 4f40122..9395b54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCachePartitionedMultiJvmFullApiSelfTest.java @@ -22,8 +22,7 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.distributed.near.*; import org.apache.ignite.internal.processors.resource.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; import java.util.*; @@ -67,13 +66,11 @@ public class GridCachePartitionedMultiJvmFullApiSelfTest extends GridCachePartit /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { -// IgniteConfiguration cfg = super.getConfiguration(gridName); -// -// ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IgniteNodeRunner.ipFinder); -// -// return cfg; - - return IgniteNodeRunner.theSameConf(gridName); + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IgniteNodeRunner.ipFinder); + + return cfg; } /** {@inheritDoc} */ @@ -150,92 +147,15 @@ public class GridCachePartitionedMultiJvmFullApiSelfTest extends GridCachePartit IgniteCache<Object, Object> c1 = grid1.cache(null); -// c0.putAll(putMap); -// -// atomicClockModeDelay(c0); -// -// c1.removeAll(putMap.keySet()); -// -// for (int i = 0; i < size; i++) { -// assertNull(c0.get(i)); -// assertNull(c1.get(i)); -// } -// -// for (IgniteExProxy ignite : ignites.values()) -// ignite.getProcess().kill(); - } - - /** - * @throws Exception If failed. - */ - public void testBroadcast() throws Exception { - IgniteEx grid0 = grid(0); - IgniteEx grid1 = grid(1); - - Thread.sleep(10_000); - -// ClusterGroup grp = grid0.cluster().forNode(grid1.localNode()); - - grid0.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - System.out.println(">>>>> trololo"); - } - }); - - Thread.sleep(10_000); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleBroadcast_worked() throws Exception { - Ignite rmt = null; - - try(Ignite ignite = Ignition.start(IgniteNodeRunner.theSameConf("SomeLocGrid"))) { - Thread.sleep(5_000); - - rmt = new IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode"), ignite.log(), ignite); - - Thread.sleep(5_000); - - ignite.compute().broadcast(new IgniteRunnable() { - @Override public void run() { - System.out.println(">>>>> trololo"); - } - }); - - Collection<String> res = ignite.compute().broadcast(new C1<Integer, String>() { - @Override public String apply(Integer integer) { - return String.valueOf(integer + 12); - } - }, 100500); - - System.out.println(">>>>> " + res); - } - finally { - rmt.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testRmt2RmtBroadcast_worked() throws Exception { - Ignite rmt1 = null; - Ignite rmt2 = null; - - try { - rmt1 = new IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode1"), log, null); + c0.putAll(putMap); - Thread.sleep(5_000); + atomicClockModeDelay(c0); - rmt2 = new IgniteExProxy(IgniteNodeRunner.theSameConf("remoteNode2"), log, null); + c1.removeAll(putMap.keySet()); - Thread.sleep(5_000); - } - finally { - rmt1.close(); - rmt2.close(); + for (int i = 0; i < size; i++) { + assertNull(c0.get(i)); + assertNull(c1.get(i)); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java index 8a273b6..074529f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteExProxy.java @@ -86,6 +86,14 @@ public class IgniteExProxy implements IgniteEx { gridProxies.put(cfg.getGridName(), this); } + public Ignite localJvmGrid() { + return locJvmGrid; + } + + public UUID getId() { + return id; + } + @Override public String name() { return cfg.getGridName(); } @@ -248,29 +256,7 @@ public class IgniteExProxy implements IgniteEx { } @Override public <K, V> IgniteCache<K, V> cache(@Nullable final String name) { - ClusterGroup grp = locJvmGrid.cluster().forNodeId(id); - - locJvmGrid.compute(grp).broadcast(new IgniteRunnable() { - @Override public void run() { - System.out.println(">>>>> trololo"); - } - }); - -// locJvmGrid.compute(grp).run(new IgniteRunnable() { -// @Override public void run() { -// X.println(">>>>> trololo"); -// } -// }); - -// return locJvmGrid.compute(grp).apply(new C1<Set<String>, IgniteCache<K,V>>() { -// @Override public IgniteCache<K,V> apply(Set<String> objects) { -// X.println(">>>>> Cache"); -// -// return Ignition.ignite().cache(name); -// } -// }, Collections.<String>emptySet()); -// - return null; + return new CacheProxy(name, this); } @Override public IgniteTransactions transactions() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c797886a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java index be200a6..8ee0abf 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/IgniteNodeRunner.java @@ -111,8 +111,7 @@ public class IgniteNodeRunner { cfg.setNodeId(nodeId); -// return cfg; - return theSameConf(gridName); + return cfg; } private static boolean isDebug() { @@ -142,24 +141,4 @@ public class IgniteNodeRunner { // } return new CacheConfiguration(); } - - public static IgniteConfiguration theSameConf(String gridName) { - IgniteConfiguration cfg = new IgniteConfiguration(); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - disco.setIpFinder(ipFinder); - cfg.setDiscoverySpi(disco); - - cfg.setMarshaller(new OptimizedMarshaller(false)); - - cfg.setCacheConfiguration(new CacheConfiguration()); - - cfg.setGridName(gridName); - - cfg.setLocalHost("127.0.0.1"); - - cfg.setIncludeProperties(); - - return cfg; - } }