Repository: incubator-ignite Updated Branches: refs/heads/sprint-1 c2363b6f5 -> 4264bf558
#IGNITE-99: Add GridAffinityProcessor.GridCacheAffinityProxy Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/dc7fc31b Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/dc7fc31b Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/dc7fc31b Branch: refs/heads/sprint-1 Commit: dc7fc31ba67a4d7ff703f7ec500d69243f23aeaa Parents: 8795b0f Author: ivasilinets <ivasilin...@gridgain.com> Authored: Fri Jan 23 12:41:11 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Fri Jan 23 12:41:11 2015 +0300 ---------------------------------------------------------------------- .../src/main/java/org/apache/ignite/Ignite.java | 13 +- .../org/gridgain/grid/kernal/GridKernal.java | 14 ++ .../affinity/GridAffinityProcessor.java | 194 +++++++++++++++++++ .../processors/cache/GridCacheAdapter.java | 1 - .../testframework/junits/GridTestIgnite.java | 6 + .../java/org/gridgain/grid/GridSpringBean.java | 6 + 6 files changed, 231 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java index d1e1a55..0682684 100644 --- a/modules/core/src/main/java/org/apache/ignite/Ignite.java +++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java @@ -19,11 +19,11 @@ package org.apache.ignite; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.fs.IgniteFsConfiguration; +import org.apache.ignite.fs.*; import org.apache.ignite.plugin.*; import org.apache.ignite.product.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; +import 
org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.hadoop.*;
 import org.apache.ignite.plugin.security.*;
 import org.gridgain.grid.util.typedef.*;
@@ -320,4 +320,13 @@ public interface Ignite extends AutoCloseable {
      * @throws IgniteCheckedException If failed to stop grid.
      */
     @Override public void close() throws IgniteCheckedException;
+
+    /**
+     * Gets affinity service to provide information about data partitioning
+     * and distribution.
+     * @param cacheName Name of the cache whose affinity is requested.
+     * @param <K> Cache key type.
+     * @return Affinity service for the given cache.
+     */
+    public <K> GridCacheAffinity<K> affinity(String cacheName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java
index d9202f3..695a19e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java
@@ -33,6 +33,7 @@ import org.apache.ignite.spi.authentication.*;
 import org.apache.ignite.spi.authentication.noop.*;
 import org.gridgain.grid.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.hadoop.*;
 import org.gridgain.grid.kernal.managers.*;
 import org.gridgain.grid.kernal.managers.checkpoint.*;
@@ -3228,6 +3229,19 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe
     }
 
     /**
+     * @param cacheName Name of the cache whose affinity is requested.
+     * @return Affinity of the internal cache when one is found for the name, otherwise an affinity-processor proxy.
+     */
+    @Override public <K> GridCacheAffinity<K> affinity(String cacheName) {
+        GridCacheAdapter<K, ?> cache = ctx.cache().internalCache(cacheName);
+
+        if (cache != null)
+            return cache.affinity();
+
+        return ctx.affinity().affinityProxy(cacheName);
+    }
+
+    /**
      * Creates optional component.
      *
      * @param cls Component interface.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
index c66176d..68cc5d2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/GridAffinityProcessor.java
@@ -207,6 +207,16 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
 
     /**
+     * Creates an affinity view for the given cache that is served by this processor.
+     *
      * @param cacheName Cache name.
+     * @return Cache affinity.
+     */
+    public <K> GridCacheAffinityProxy<K> affinityProxy(String cacheName) {
+        return new GridCacheAffinityProxy<>(cacheName);
+    }
+
+    /**
+     * @param cacheName Cache name.
      * @return Non-null cache name.
      */
     private String maskNull(@Nullable String cacheName) {
@@ -525,4 +535,204 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             return res;
         }
     }
+
+    /**
+     * Cache affinity implementation that answers every query through the enclosing
+     * {@link GridAffinityProcessor}, using the affinity information looked up for the
+     * cache name it was created with at the topology version reported by discovery at
+     * call time. Checked exceptions from the processor are rethrown as {@link IgniteException}.
+     */
+    private class GridCacheAffinityProxy<K> implements GridCacheAffinity<K> {
+        /** Name of the cache this proxy queries. */
+        private final String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public GridCacheAffinityProxy(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partitions();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partition(key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isPrimary(ClusterNode n, K key) {
+            try {
+                // The assignment holds partition numbers, so test the key's partition, not the key object.
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.primaryPartitions(n.id()).contains(partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isBackup(ClusterNode n, K key) {
+            try {
+                // The assignment holds partition numbers, so test the key's partition, not the key object.
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.backupPartitions(n.id()).contains(partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+            return isPrimary(n, key) || isBackup(n, key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] primaryPartitions(ClusterNode n) {
+            try {
+                Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.primaryPartitions(n.id());
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] backupPartitions(ClusterNode n) {
+            try {
+                Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.backupPartitions(n.id());
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] allPartitions(ClusterNode n) {
+            try {
+                // Read the topology version once so both lookups below use the same version.
+                long topVer = topologyVersion();
+
+                // Copy into a fresh set so the set returned by the assignment is never modified.
+                Set<Integer> parts = new HashSet<>(GridAffinityProcessor.this.affinityCache(cacheName, topVer)
+                    .assignment.backupPartitions(n.id()));
+
+                parts.addAll(GridAffinityProcessor.this.affinityCache(cacheName, topVer)
+                    .assignment.primaryPartitions(n.id()));
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object affinityKey(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .mapper.affinityKey(key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
+            try {
+                // Map against this proxy's cache instead of ignoring cacheName.
+                // NOTE(review): relies on the processor's (cacheName, keys) overload - confirm.
+                return GridAffinityProcessor.this.mapKeysToNodes(cacheName, keys);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public ClusterNode mapKeyToNode(K key) {
+            try {
+                // Map against this proxy's cache instead of ignoring cacheName.
+                // NOTE(review): relies on the processor's (cacheName, key) overload - confirm.
+                return GridAffinityProcessor.this.mapKeyToNode(cacheName, key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode mapPartitionToNode(int part) {
+            try {
+                return F.first(GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(part));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+            Map<Integer, ClusterNode> map = new HashMap<>();
+
+            if (!F.isEmpty(parts)) {
+                for (int p : parts)
+                    map.put(p, mapPartitionToNode(p));
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(part);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @return Topology version.
+         */
+        private long topologyVersion() {
+            return GridAffinityProcessor.this.ctx.discovery().topologyVersion();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 4fdfc19..695e87f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.dataload.*;
 import org.apache.ignite.fs.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/core/src/test/java/org/gridgain/testframework/junits/GridTestIgnite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/testframework/junits/GridTestIgnite.java
b/modules/core/src/test/java/org/gridgain/testframework/junits/GridTestIgnite.java
index d5415d6..211cb67 100644
--- a/modules/core/src/test/java/org/gridgain/testframework/junits/GridTestIgnite.java
+++ b/modules/core/src/test/java/org/gridgain/testframework/junits/GridTestIgnite.java
@@ -17,6 +17,7 @@ import org.apache.ignite.plugin.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.product.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.hadoop.*;
 
 import org.jetbrains.annotations.*;
@@ -221,4 +222,9 @@ public class GridTestIgnite implements Ignite {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {}
+
+    /** {@inheritDoc} This test stub always returns {@code null}. */
+    @Override public <K> GridCacheAffinity<K> affinity(String cacheName) {
+        return null;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/dc7fc31b/modules/spring/src/main/java/org/gridgain/grid/GridSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/gridgain/grid/GridSpringBean.java b/modules/spring/src/main/java/org/gridgain/grid/GridSpringBean.java
index d849235..51ddcf7 100644
--- a/modules/spring/src/main/java/org/gridgain/grid/GridSpringBean.java
+++ b/modules/spring/src/main/java/org/gridgain/grid/GridSpringBean.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.*;
 import org.apache.ignite.plugin.*;
 import org.apache.ignite.product.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.cache.affinity.*;
 import org.gridgain.grid.hadoop.*;
 import org.apache.ignite.plugin.security.*;
 import org.gridgain.grid.util.typedef.*;
@@ -325,6 +326,11 @@ public class GridSpringBean implements Ignite, DisposableBean, InitializingBean,
     }
 
     /** {@inheritDoc} */
+    @Override public <K> GridCacheAffinity<K> affinity(String cacheName) {
+        return g.affinity(cacheName);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSpringBean.class, this);
     }