# ignite-648: loadAll fix and IgniteProcessProxy.java refactoring (constructor, close)
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ec660e53 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ec660e53 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ec660e53 Branch: refs/heads/ignite-648-failover Commit: ec660e537e55b8f983fbe11b00cc6590125094a5 Parents: ae4e791 Author: ashutak <ashu...@gridgain.com> Authored: Wed Jul 1 17:44:15 2015 +0300 Committer: ashutak <ashu...@gridgain.com> Committed: Wed Jul 1 17:44:15 2015 +0300 ---------------------------------------------------------------------- ...heAtomicNearOnlyMultiJvmFullApiSelfTest.java | 2 +- .../junits/common/GridCommonAbstractTest.java | 35 +++++++- .../multijvm/IgniteCacheProcessProxy.java | 4 +- .../junits/multijvm/IgniteProcessProxy.java | 88 ++++++++++++++++---- 4 files changed, 109 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java index 4a2dcdd..ba01611 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest.java @@ -22,7 +22,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.*; /** * Multy Jvm tests. */ -public class GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest extends +public class GridCacheAtomicNearOnlyMultiJvmFullApiSelfTest extends GridCacheAtomicNearOnlyMultiNodeFullApiSelfTest { /** {@inheritDoc} */ protected boolean isMultiJvm() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 1fc4415..01cb240 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -254,7 +254,40 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @param replaceExistingValues Replace existing values. * @throws Exception If failed. */ - protected static <K> void loadAll(Cache<K, ?> cache, Set<K> keys, boolean replaceExistingValues) throws Exception { + protected static <K> void loadAll(Cache<K, ?> cache, final Set<K> keys, final boolean replaceExistingValues) throws Exception { + Ignite ignite = cache.unwrap(Ignite.class); + + if (!(ignite instanceof IgniteProcessProxy)) + loadAll0(cache, keys, replaceExistingValues); + else { + IgniteProcessProxy proxy = (IgniteProcessProxy)ignite; + + final UUID id = proxy.getId(); + + final String cacheName = cache.getName(); + + final Set<Object> keysCp = (Set<Object>)keys; + + proxy.remoteCompute().run(new CAX() { + @Override public void applyx() throws IgniteCheckedException { + try { + loadAll0(Ignition.ignite(id).cache(cacheName), keysCp, replaceExistingValues); + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } + }); + } + } + + /** + * @param cache Cache. + * @param keys Keys. + * @param replaceExistingValues Replace existing values. + * @throws Exception If failed. + */ + private static <K> void loadAll0(Cache<K, ?> cache, Set<K> keys, boolean replaceExistingValues) throws Exception { final AtomicReference<Exception> ex = new AtomicReference<>(); final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java index a478db0..e23739d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java @@ -280,8 +280,8 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionListener) { - throw new UnsupportedOperationException("Method should be supported."); + @Override public void loadAll(Set<? extends K> keys, boolean replaceExistingValues, CompletionListener completionLsnr) { + throw new UnsupportedOperationException("Oparetion can't be supported automatically."); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ec660e53/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java index b96fe5f..6da478d 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteProcessProxy.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*; import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.plugin.*; @@ -59,9 +60,6 @@ public class IgniteProcessProxy implements IgniteEx { /** Grid id. */ private final UUID id = UUID.randomUUID(); - /** Remote ignite instance started latch. */ - private final transient CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1); - /** * @param cfg Configuration. * @param log Logger. @@ -84,17 +82,9 @@ public class IgniteProcessProxy implements IgniteEx { filteredJvmArgs.add(arg); } - locJvmGrid.events().localListen(new IgnitePredicateX<Event>() { - @Override public boolean applyx(Event e) { - if (((DiscoveryEvent)e).eventNode().id().equals(id)) { - rmtNodeStartedLatch.countDown(); + final CountDownLatch rmtNodeStartedLatch = new CountDownLatch(1); - return false; - } - - return true; - } - }, EventType.EVT_NODE_JOINED); + locJvmGrid.events().localListen(new NodeStartedListener(id, rmtNodeStartedLatch), EventType.EVT_NODE_JOINED); proc = GridJavaProcess.exec( IgniteNodeRunner.class, @@ -117,6 +107,36 @@ public class IgniteProcessProxy implements IgniteEx { } /** + */ + private static class NodeStartedListener extends IgnitePredicateX<Event> { + /** Id. */ + private final UUID id; + + /** Remote node started latch. */ + private final CountDownLatch rmtNodeStartedLatch; + + /** + * @param id Id. + * @param rmtNodeStartedLatch Remotenode started latch. + */ + NodeStartedListener(UUID id, CountDownLatch rmtNodeStartedLatch) { + this.id = id; + this.rmtNodeStartedLatch = rmtNodeStartedLatch; + } + + /** {@inheritDoc} */ + @Override public boolean applyx(Event e) { + if (((DiscoveryEvent)e).eventNode().id().equals(id)) { + rmtNodeStartedLatch.countDown(); + + return false; + } + + return true; + } + } + + /** * @param gridName Grid name. * @return Instance by name or exception wiil be thrown. */ @@ -131,6 +151,15 @@ public class IgniteProcessProxy implements IgniteEx { } /** + * For usage in closures. + * + * @return Ignite instance. + */ + private Ignite igniteById() { + return Ignition.ignite(id); + } + + /** * @param locNodeId ID of local node the requested grid instance is managing. * @return An instance of named grid. This method never returns {@code null}. * @throws IgniteIllegalStateException Thrown if grid was not properly initialized or grid instance was stopped or @@ -378,7 +407,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public void destroyCache(String cacheName) { - // TODO: CODE: implement. + throw new UnsupportedOperationException("Operation isn't supported yet."); } /** {@inheritDoc} */ @@ -388,7 +417,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public IgniteTransactions transactions() { - throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode."); + throw new UnsupportedOperationException("Transactions can't be supported automatically in multi JVM mode."); } /** {@inheritDoc} */ @@ -452,11 +481,38 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public void close() throws IgniteException { + final CountDownLatch rmtNodeStoppedLatch = new CountDownLatch(1); + + locJvmGrid.events().localListen(new IgnitePredicateX<Event>() { + @Override public boolean applyx(Event e) { + if (((DiscoveryEvent)e).eventNode().id().equals(id)) { + rmtNodeStoppedLatch.countDown(); + + return false; + } + + return true; + } + }, EventType.EVT_NODE_LEFT, EventType.EVT_NODE_FAILED); + + compute().run(new IgniteRunnable() { + @Override public void run() { + igniteById().close(); + } + }); + + try { + assert U.await(rmtNodeStoppedLatch, 15, TimeUnit.SECONDS) : "NodeId=" + id; + } + catch (IgniteInterruptedCheckedException e) { + throw new IgniteException(e); + } + try { getProcess().kill(); } catch (Exception e) { - e.printStackTrace(); + X.printerr("Could not kill process after close.", e); } }