# ignite-648: AffinityProcessProxy and fix another process stopping
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9cdd165b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9cdd165b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9cdd165b Branch: refs/heads/ignite-648 Commit: 9cdd165ba3d4e372f4ec17c68e9a951b892000e4 Parents: 9ed8a4e Author: ashutak <ashu...@gridgain.com> Authored: Mon Jun 22 16:52:12 2015 +0300 Committer: ashutak <ashu...@gridgain.com> Committed: Mon Jun 22 16:52:12 2015 +0300 ---------------------------------------------------------------------- .../framework/AffinityProcessProxy.java | 197 +++++++++++++++++++ .../multijvm/framework/IgniteProcessProxy.java | 9 +- .../testframework/junits/GridAbstractTest.java | 4 +- 3 files changed, 204 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9cdd165b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java new file mode 100644 index 0000000..07a5a5f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/AffinityProcessProxy.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.multijvm.framework; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Proxy class for affinity at another jvm. + */ +public class AffinityProcessProxy<K> implements Affinity<K> { + /** Compute. */ + private final transient IgniteCompute compute; + + /** Cache name. */ + private final String cacheName; + + /** Grid id. */ + private final UUID gridId; + + /** + * @param cacheName Cache name. + * @param proxy Ignite ptocess proxy. + */ + public AffinityProcessProxy(String cacheName, IgniteProcessProxy proxy) { + this.cacheName = cacheName; + gridId = proxy.getId(); + + ClusterGroup grp = proxy.localJvmGrid().cluster().forNodeId(proxy.getId()); + + compute = proxy.localJvmGrid().compute(grp); + } + + /** + * Returns cache instance. Method to be called from closure at another JVM. + * + * @return Cache. + */ + private Affinity<Object> affinity() { + return Ignition.ignite(gridId).affinity(cacheName); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return (int)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().partitions(); + } + }); + } + + /** {@inheritDoc} */ + @Override public int partition(final K key) { + return (int)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().partition(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean isPrimary(final ClusterNode n, final K key) { + return (boolean)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().isPrimary(n, key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean isBackup(final ClusterNode n, final K key) { + return (boolean)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().isBackup(n, key); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean isPrimaryOrBackup(final ClusterNode n, final K key) { + return (boolean)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().isPrimaryOrBackup(n, key); + } + }); + } + + /** {@inheritDoc} */ + @Override public int[] primaryPartitions(final ClusterNode n) { + return (int[])compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().primaryPartitions(n); + } + }); + } + + /** {@inheritDoc} */ + @Override public int[] backupPartitions(final ClusterNode n) { + return (int[])compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().backupPartitions(n); + } + }); + } + + /** {@inheritDoc} */ + @Override public int[] allPartitions(final ClusterNode n) { + return (int[])compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().allPartitions(n); + } + }); + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(final K key) { + return compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().affinityKey(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(final Collection<? extends K> keys) { + return (Map<ClusterNode, Collection<K>>)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapKeysToNodes(keys); + } + }); + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode mapKeyToNode(final K key) { + return (ClusterNode)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapKeyToNode(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(final K key) { + return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapKeyToPrimaryAndBackups(key); + } + }); + } + + /** {@inheritDoc} */ + @Override public ClusterNode mapPartitionToNode(final int part) { + return (ClusterNode)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapPartitionToNode(part); + } + }); + } + + /** {@inheritDoc} */ + @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(final Collection<Integer> parts) { + return (Map<Integer, ClusterNode>)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapPartitionsToNodes(parts); + } + }); + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(final int part) { + return (Collection<ClusterNode>)compute.call(new IgniteCallable<Object>() { + @Override public Object call() throws Exception { + return affinity().mapPartitionToPrimaryAndBackups(part); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9cdd165b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java index 71c6689..95ce583 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/multijvm/framework/IgniteProcessProxy.java @@ -137,17 +137,16 @@ public class IgniteProcessProxy implements IgniteEx { /** * Kill all running processes. - * - * @throws Exception if failed. */ - public static void killAll() throws Exception { - for (IgniteProcessProxy ignite : gridProxies.values()) + public static void killAll() { + for (IgniteProcessProxy ignite : gridProxies.values()) { try { ignite.getProcess().kill(); } catch (Exception e) { U.error(ignite.log, "Killing failed.", e); } + } gridProxies.clear(); } @@ -450,7 +449,7 @@ public class IgniteProcessProxy implements IgniteEx { /** {@inheritDoc} */ @Override public <K> Affinity<K> affinity(String cacheName) { - return null; // TODO: CODE: implement. + return new AffinityProcessProxy(cacheName, this); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9cdd165b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index 990ea76..b67f5b8 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -441,7 +441,7 @@ public abstract class GridAbstractTest extends TestCase { * @throws Exception If failed. */ protected void afterTestsStopped() throws Exception { - IgniteProcessProxy.killAll(); + // No-op. } /** {@inheritDoc} */ @@ -775,6 +775,8 @@ public abstract class GridAbstractTest extends TestCase { * @param cancel Cancel flag. */ protected void stopAllGrids(boolean cancel) { + IgniteProcessProxy.killAll(); // In multi jvm case. + Collection<Ignite> clients = new ArrayList<>(); Collection<Ignite> srvs = new ArrayList<>();