#IGNITE-99: merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/52033a95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/52033a95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/52033a95 Branch: refs/heads/sprint-1 Commit: 52033a9523fdfe4a6e4340849526b1fc38541268 Parents: cef4e37 Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jan 23 17:57:54 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jan 23 17:57:54 2015 +0300 ---------------------------------------------------------------------- .../src/main/java/org/apache/ignite/Ignite.java | 10 + .../org/apache/ignite/internal/GridKernal.java | 11 + .../affinity/GridAffinityProcessor.java | 203 ++++++++++++++++++- .../testframework/junits/GridTestIgnite.java | 6 + .../org/apache/ignite/IgniteSpringBean.java | 6 + 5 files changed, 235 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index 1565130..18c7062 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -18,6 +18,7 @@ package org.apache.ignite; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.fs.IgniteFsConfiguration; @@ -319,4 +320,13 @@ public interface Ignite extends AutoCloseable { * @throws IgniteCheckedException If failed to stop grid. */ @Override public void close() throws IgniteCheckedException; + + /** + * Gets affinity service to provide information about data partitioning + * and distribution. + * @param cacheName Cache name. + * @param <K> Cache key type. + * @return Affinity. + */ + public <K> CacheAffinity<K> affinity(String cacheName); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java index 681009d..77032da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.processors.*; @@ -3225,6 +3226,16 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe Ignition.stop(gridName, true); } + /** {@inheritDoc} */ + @Override public <K> CacheAffinity<K> affinity(String cacheName) { + GridCacheAdapter<K, ?> cache = ctx.cache().internalCache(cacheName); + + if (cache != null) + return cache.affinity(); + + return ctx.affinity().affinityProxy(cacheName); + } + /** * Creates optional component. * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java index 7eed5be..879895a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java @@ -207,6 +207,14 @@ public class GridAffinityProcessor extends GridProcessorAdapter { /** * @param cacheName Cache name. + * @return Cache affinity. + */ + public <K> GridCacheAffinityProxy<K> affinityProxy(String cacheName) { + return new GridCacheAffinityProxy(cacheName); + } + + /** + * @param cacheName Cache name. * @return Non-null cache name. */ private String maskNull(@Nullable String cacheName) { @@ -525,4 +533,197 @@ public class GridAffinityProcessor extends GridProcessorAdapter { return res; } } -} + /** + * Grid cache affinity. + */ + private class GridCacheAffinityProxy<K> implements CacheAffinity<K> { + private final String cacheName; + + /** + * @param cacheName Cache name. + */ + public GridCacheAffinityProxy(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public int partitions() { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partitions(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public int partition(K key) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partition(key); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isPrimary(ClusterNode n, K key) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.primaryPartitions(n.id()).contains(key); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isBackup(ClusterNode n, K key) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.backupPartitions(n.id()).contains(key); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) { + return isPrimary(n, key) || isBackup(n, key); + } + + /** {@inheritDoc} */ + @Override public int[] primaryPartitions(ClusterNode n) { + try { + Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.primaryPartitions(n.id()); + + return U.toIntArray(parts); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public int[] backupPartitions(ClusterNode n) { + try { + Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.backupPartitions(n.id()); + + return U.toIntArray(parts); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public int[] allPartitions(ClusterNode n) { + try { + Collection<Integer> parts = new HashSet<>(); + + AffinityInfo affInfo= GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()); + + for (int partsCnt = affInfo.affFunc.partitions(), part = 0; part < partsCnt; part++) { + for (ClusterNode affNode : affInfo.assignment.get(part)) { + if (n.id().equals(affNode.id())) { + parts.add(part); + + break; + } + } + } + + return U.toIntArray(parts); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object affinityKey(K key) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .mapper.affinityKey(key); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) { + try { + return GridAffinityProcessor.this.mapKeysToNodes(keys); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public ClusterNode mapKeyToNode(K key) { + try { + return GridAffinityProcessor.this.mapKeyToNode(key); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.get(partition(key)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode mapPartitionToNode(int part) { + try { + return F.first(GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.get(part)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) { + Map<Integer, ClusterNode> map = new HashMap<>(); + + if (!F.isEmpty(parts)) { + for (int p : parts) + map.put(p, mapPartitionToNode(p)); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) { + try { + return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()) + .assignment.get(part); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @return Topology version. + */ + private long topologyVersion() { + return GridAffinityProcessor.this.ctx.discovery().topologyVersion(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java index 8f36350..bbb800c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java @@ -11,6 +11,7 @@ package org.apache.ignite.testframework.junits; import org.apache.ignite.*; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.*; @@ -221,4 +222,9 @@ public class GridTestIgnite implements Ignite { /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException {} + + /** {@inheritDoc} */ + @Override public <K> CacheAffinity<K> affinity(String cacheName) { + return null; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 94d7116..1b7f985 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -18,6 +18,7 @@ package org.apache.ignite; import org.apache.ignite.cache.*; +import org.apache.ignite.cache.affinity.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.plugin.*; @@ -324,6 +325,11 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ + @Override public <K> CacheAffinity<K> affinity(String cacheName) { + return g.affinity(cacheName); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteSpringBean.class, this); }