# ignite-648: cache proxy; add events proxy; delete transaction support for multiJvm; skip some tests for multi jvm
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/470e48b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/470e48b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/470e48b5 Branch: refs/heads/ignite-648 Commit: 470e48b54debab29b7fd1fc39e306876ae379e50 Parents: 2e6fd9b Author: ashutak <ashu...@gridgain.com> Authored: Mon Jun 22 20:02:33 2015 +0300 Committer: ashutak <ashu...@gridgain.com> Committed: Mon Jun 22 20:02:33 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCacheAbstractFullApiSelfTest.java | 64 +++++--- .../framework/AffinityProcessProxy.java | 5 +- .../framework/IgniteCacheProcessProxy.java | 43 ++++-- .../framework/IgniteEventsProcessProxy.java | 147 +++++++++++++++++++ .../multijvm/framework/IgniteProcessProxy.java | 16 +- .../IgniteTransactionsProcessProxy.java | 78 ---------- 6 files changed, 237 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 75cba54..85eb08e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -105,7 +105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract }; /** Dflt grid. */ - protected Ignite dfltIgnite; + protected transient Ignite dfltIgnite; /** */ private Map<String, CacheConfiguration[]> cacheCfgMap; @@ -291,15 +291,20 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract } for (int i = 0; i < gridCount(); i++) { - GridCacheContext<String, Integer> ctx = context(i); + if (!isMultiJvmAndNodeIsRemote(i)) { + GridCacheContext<String, Integer> ctx = context(i); - int sum = 0; + int sum = 0; - for (String key : map.keySet()) - if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()))) - sum++; + for (String key : map.keySet()) + if (ctx.affinity().localNode(key, new AffinityTopologyVersion(ctx.discovery().topologyVersion()))) + sum++; - assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL)); + assertEquals("Incorrect key size on cache #" + i, sum, jcache(i).localSize(ALL)); + } + else { + // TODO add multi jvm support. + } } for (int i = 0; i < gridCount(); i++) { @@ -2949,14 +2954,16 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testPeekTxRemoveOptimistic() throws Exception { - checkPeekTxRemove(OPTIMISTIC); + if (!isMultiJvm()) // Transactions are not supported in multi JVM mode. + checkPeekTxRemove(OPTIMISTIC); } /** * @throws Exception If failed. */ public void testPeekTxRemovePessimistic() throws Exception { - checkPeekTxRemove(PESSIMISTIC); + if (!isMultiJvm()) // Transactions are not supported in multi JVM mode. + checkPeekTxRemove(PESSIMISTIC); } /** @@ -3098,7 +3105,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testTtlTx() throws Exception { - if (txEnabled()) + if (txEnabled() && !isMultiJvm()) checkTtl(true, false); } @@ -3106,6 +3113,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testTtlNoTx() throws Exception { + if (isMultiJvm()) + fail("TODO implement multi jvm support."); + checkTtl(false, false); } @@ -3113,6 +3123,9 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract * @throws Exception If failed. */ public void testTtlNoTxOldEntry() throws Exception { + if (isMultiJvm()) + fail("TODO implement multi jvm support."); + checkTtl(false, true); } @@ -3805,26 +3818,31 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract assertEquals(keys.size(), jcache().localSize(CachePeekMode.ALL)); else { for (int i = 0; i < gridCount(); i++) { - GridCacheContext<String, Integer> ctx = context(i); + if (!isMultiJvmAndNodeIsRemote(i)) { + GridCacheContext<String, Integer> ctx = context(i); - if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED) - continue; + if (ctx.cache().configuration().getMemoryMode() == OFFHEAP_TIERED) + continue; - int size = 0; + int size = 0; - for (String key : keys) { - if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { - GridCacheEntryEx e = - ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); + for (String key : keys) { + if (ctx.affinity().localNode(key, ctx.discovery().topologyVersionEx())) { + GridCacheEntryEx e = + ctx.isNear() ? ctx.near().dht().peekEx(key) : ctx.cache().peekEx(key); - assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']'; - assert !e.deleted() : "Entry is deleted: " + e; + assert e != null : "Entry is null [idx=" + i + ", key=" + key + ", ctx=" + ctx + ']'; + assert !e.deleted() : "Entry is deleted: " + e; - size++; + size++; + } } - } - assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL)); + assertEquals("Incorrect size on cache #" + i, size, jcache(i).localSize(ALL)); + } + else { + // TODO add multi jvm support. + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java index 07a5a5f..7168534 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java @@ -45,10 +45,7 @@ public class AffinityProcessProxy<K> implements Affinity<K> { public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) { this.cacheName = cacheName; gridId = proxy.getId(); - - ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); - - compute = proxy.localJvmGrid().compute(grp); + compute = proxy.remoteCompute(); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java index 61cbdc3..f8cd8a3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteCacheProcessProxy.java @@ -71,10 +71,7 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { isAsync = async; gridId = proxy.getId(); igniteProxy = proxy; - - ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); - - compute = proxy.localJvmGrid().compute(grp); + compute = proxy.remoteCompute(); } /** {@inheritDoc} */ @@ -89,6 +86,15 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { /** {@inheritDoc} */ @Override public <R> IgniteFuture<R> future() { + // TODO implement. +// R futureRes = (R)compute.call(new IgniteCallable<Object>() { +// @Override public Object call() throws Exception { +// return cache().future().get(); +// } +// }); +// +// return new IgniteFinishedFutureImpl<R>(futureRes); + throw new UnsupportedOperationException("Method should be supported."); } @@ -124,8 +130,14 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException { - throw new UnsupportedOperationException("Method should be supported."); + @Override public void localLoadCache(@Nullable final IgniteBiPredicate<K, V> p, @Nullable final Object... args) throws CacheException { + final IgniteBiPredicate pCopy = p; + + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localLoadCache(pCopy, args); + } + }); } /** {@inheritDoc} */ @@ -178,8 +190,12 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ - @Override public void localEvict(Collection<? extends K> keys) { - throw new UnsupportedOperationException("Method should be supported."); + @Override public void localEvict(final Collection<? extends K> keys) { + compute.run(new IgniteRunnable() { + @Override public void run() { + cache().localEvict(keys); + } + }); } /** {@inheritDoc} */ @@ -501,7 +517,16 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { /** {@inheritDoc} */ @Override public <T> T unwrap(final Class<T> clazz) { - throw new UnsupportedOperationException("Method cannot be supported because T can be unmarshallable."); + try { + return (T)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return cache().unwrap(clazz); + } + }); + } + catch (Exception e) { + throw new IllegalArgumentException("Looks like class " + clazz + " is unmarshallable. Exception type:" + e.getClass(), e); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java new file mode 100644 index 0000000..b9a260b --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteEventsProcessProxy.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.multijvm.framework; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Ignite events proxy for ignite instance at another JVM. + */ +public class IgniteEventsProcessProxy implements IgniteEvents { + /** Ignite proxy. */ + private final transient IgniteProcessProxy igniteProxy; + + /** Grid id. */ + private final UUID gridId; + + /** + * @param igniteProxy Ignite proxy. + */ + public IgniteEventsProcessProxy(IgniteProcessProxy igniteProxy) { + this.igniteProxy = igniteProxy; + + gridId = igniteProxy.getId(); + } + + /** + * @return Events instance. + */ + private IgniteEvents events() { + return Ignition.ignite(gridId).events(); + } + + /** {@inheritDoc} */ + @Override public ClusterGroup clusterGroup() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> List<T> remoteQuery(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe, + @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void stopRemoteListen(UUID opId) throws IgniteException { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... types) { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void recordLocal(Event evt) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void localListen(final IgnitePredicate<? extends Event> lsnr, final int... types) { + igniteProxy.remoteCompute().run(new IgniteRunnable() { + @Override public void run() { + events().localListen(lsnr, types); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean stopLocalListen(IgnitePredicate<? extends Event> lsnr, @Nullable int... types) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void enableLocal(int... types) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public void disableLocal(int... types) { + // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public int[] enabledEvents() { + return new int[0]; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isEnabled(int type) { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public IgniteEvents withAsync() { + return null; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public boolean isAsync() { + return false; // TODO: CODE: implement. + } + + /** {@inheritDoc} */ + @Override public <R> IgniteFuture<R> future() { + return null; // TODO: CODE: implement. + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java index 1d11dc3..e534478 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java @@ -277,7 +277,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public IgniteEvents events() { - return null; // TODO: CODE: implement. + return new IgniteEventsProcessProxy(this); } /** {@inheritDoc} */ @@ -375,7 +375,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public IgniteTransactions transactions() { - return new IgniteTransactionsProcessProxy(this); + throw new UnsupportedOperationException("Transactions are not supported in multi JVM mode."); } /** {@inheritDoc} */ @@ -459,6 +459,18 @@ public class IgniteProcessProxy implements IgniteEx { return proc; } + /** + * @return {@link IgniteCompute} instance to communicate with remote node. + */ + public IgniteCompute remoteCompute() { + ClusterGroup grp = localJvmGrid().cluster().forNodeId(id); + + if (grp.nodes().isEmpty()) + throw new IllegalStateException("Could not found node with id=" + id + "."); + + return locJvmGrid.compute(grp); + } + // TODO delete or use. // public <K, V> GridCacheAdapter<K, V> remoteInternalCache() { // return (GridCacheAdapter<K, V>)compute.call(new MyCallable(id)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/470e48b5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java deleted file mode 100644 index 6464838..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteTransactionsProcessProxy.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.multijvm.framework; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.transactions.*; - -import java.util.*; - -/** - * Ignite transactions proxy for ignite instance at another JVM. - */ -public class IgniteTransactionsProcessProxy implements IgniteTransactions { - /** Compute. */ - private final transient IgniteCompute compute; - - /** Grid id. */ - private final UUID gridId; - - /** - * @param proxy Ignite process proxy. - */ - public IgniteTransactionsProcessProxy(IgniteProcessProxy proxy) { - gridId = proxy.getId(); - - ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); - - compute = proxy.localJvmGrid().compute(grp); - } - - /** {@inheritDoc} */ - @Override public Transaction txStart() throws IllegalStateException { - return null; // TODO: CODE: implement. - } - - /** {@inheritDoc} */ - @Override public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) { - return null; // TODO: CODE: implement. - } - - /** {@inheritDoc} */ - @Override - public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation, long timeout, - int txSize) { - return null; // TODO: CODE: implement. - } - - /** {@inheritDoc} */ - @Override public Transaction tx() { - return null; // TODO: CODE: implement. - } - - /** {@inheritDoc} */ - @Override public TransactionMetrics metrics() { - return null; // TODO: CODE: implement. - } - - /** {@inheritDoc} */ - @Override public void resetMetrics() { - // TODO: CODE: implement. - } -}