#IGNITE-99: merge

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/52033a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/52033a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/52033a95

Branch: refs/heads/sprint-1
Commit: 52033a9523fdfe4a6e4340849526b1fc38541268
Parents: cef4e37
Author: ivasilinets <ivasilin...@gridgain.com>
Authored: Fri Jan 23 17:57:54 2015 +0300
Committer: ivasilinets <ivasilin...@gridgain.com>
Committed: Fri Jan 23 17:57:54 2015 +0300

----------------------------------------------------------------------
 .../src/main/java/org/apache/ignite/Ignite.java |  10 +
 .../org/apache/ignite/internal/GridKernal.java  |  11 +
 .../affinity/GridAffinityProcessor.java         | 203 ++++++++++++++++++-
 .../testframework/junits/GridTestIgnite.java    |   6 +
 .../org/apache/ignite/IgniteSpringBean.java     |   6 +
 5 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/Ignite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/Ignite.java b/modules/core/src/main/java/org/apache/ignite/Ignite.java
index 1565130..18c7062 100644
--- a/modules/core/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/core/src/main/java/org/apache/ignite/Ignite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite;
 
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.fs.IgniteFsConfiguration;
@@ -319,4 +320,13 @@ public interface Ignite extends AutoCloseable {
      * @throws IgniteCheckedException If failed to stop grid.
      */
     @Override public void close() throws IgniteCheckedException;
+
+    /**
+     * Gets affinity service to provide information about data partitioning
+     * and distribution.
+     * @param cacheName Cache name.
+     * @param <K> Cache key type.
+     * @return Affinity.
+     */
+    public <K> CacheAffinity<K> affinity(String cacheName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
index 681009d..77032da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernal.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.*;
@@ -3225,6 +3226,16 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe
         Ignition.stop(gridName, true);
     }
 
+    /** {@inheritDoc} */
+    @Override public <K> CacheAffinity<K> affinity(String cacheName) {
+        GridCacheAdapter<K, ?> cache = ctx.cache().internalCache(cacheName);
+
+        if (cache != null)
+            return cache.affinity();
+
+        return ctx.affinity().affinityProxy(cacheName);
+    }
+
     /**
      * Creates optional component.
      *

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
index 7eed5be..879895a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityProcessor.java
@@ -207,6 +207,14 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name.
+     * @return Cache affinity.
+     */
+    public <K> GridCacheAffinityProxy<K> affinityProxy(String cacheName) {
+        return new GridCacheAffinityProxy<K>(cacheName);
+    }
+
+    /**
+     * @param cacheName Cache name.
      * @return Non-null cache name.
      */
     private String maskNull(@Nullable String cacheName) {
@@ -525,4 +533,197 @@ public class GridAffinityProcessor extends GridProcessorAdapter {
             return res;
         }
     }
-}
+    /**
+     * Grid cache affinity.
+     */
+    private class GridCacheAffinityProxy<K> implements CacheAffinity<K> {
+        private final String cacheName;
+
+        /**
+         * @param cacheName Cache name.
+         */
+        public GridCacheAffinityProxy(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partitions() {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partitions();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int partition(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion()).affFunc.partition(key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} Membership is tested by the key's partition ID, not by the key itself. */
+        @Override public boolean isPrimary(ClusterNode n, K key) {
+            try {
+                AffinityInfo aff = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion());
+                return aff.assignment.primaryPartitions(n.id()).contains(aff.affFunc.partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} Membership is tested by the key's partition ID, not by the key itself. */
+        @Override public boolean isBackup(ClusterNode n, K key) {
+            try {
+                AffinityInfo aff = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion());
+                return aff.assignment.backupPartitions(n.id()).contains(aff.affFunc.partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isPrimaryOrBackup(ClusterNode n, K key) {
+            return isPrimary(n, key) || isBackup(n, key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] primaryPartitions(ClusterNode n) {
+            try {
+                Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.primaryPartitions(n.id());
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] backupPartitions(ClusterNode n) {
+            try {
+                Set<Integer> parts = GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.backupPartitions(n.id());
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public int[] allPartitions(ClusterNode n) {
+            try {
+                Collection<Integer> parts = new HashSet<>();
+
+                AffinityInfo affInfo= GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion());
+
+                for (int partsCnt = affInfo.affFunc.partitions(), part = 0; part < partsCnt; part++) {
+                    for (ClusterNode affNode : affInfo.assignment.get(part)) {
+                        if (n.id().equals(affNode.id())) {
+                            parts.add(part);
+
+                            break;
+                        }
+                    }
+                }
+
+                return U.toIntArray(parts);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object affinityKey(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .mapper.affinityKey(key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} Keys are mapped using the affinity of this proxy's cache ({@code cacheName}). */
+        @Override public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys) {
+            try {
+                return GridAffinityProcessor.this.mapKeysToNodes(cacheName, keys);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} The key is mapped using the affinity of this proxy's cache ({@code cacheName}). */
+        @Nullable @Override public ClusterNode mapKeyToNode(K key) {
+            try {
+                return GridAffinityProcessor.this.mapKeyToNode(cacheName, key);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(partition(key));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public ClusterNode mapPartitionToNode(int part) {
+            try {
+                return F.first(GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(part));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts) {
+            Map<Integer, ClusterNode> map = new HashMap<>();
+
+            if (!F.isEmpty(parts)) {
+                for (int p : parts)
+                    map.put(p, mapPartitionToNode(p));
+            }
+
+            return map;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part) {
+            try {
+                return GridAffinityProcessor.this.affinityCache(cacheName, topologyVersion())
+                    .assignment.get(part);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /**
+         * @return Topology version.
+         */
+        private long topologyVersion() {
+            return GridAffinityProcessor.this.ctx.discovery().topologyVersion();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java
index 8f36350..bbb800c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestIgnite.java
@@ -11,6 +11,7 @@ package org.apache.ignite.testframework.junits;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.hadoop.*;
@@ -221,4 +222,9 @@ public class GridTestIgnite implements Ignite {
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {}
+
+    /** {@inheritDoc} */
+    @Override public <K> CacheAffinity<K> affinity(String cacheName) {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/52033a95/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
index 94d7116..1b7f985 100644
--- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
+++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java
@@ -18,6 +18,7 @@
 package org.apache.ignite;
 
 import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.plugin.*;
@@ -324,6 +325,11 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea
     }
 
     /** {@inheritDoc} */
+    @Override public <K> CacheAffinity<K> affinity(String cacheName) {
+        return g.affinity(cacheName);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteSpringBean.class, this);
     }

Reply via email to