http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java index 682120a..469f454 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorAbstractSelfTest.java @@ -22,7 +22,6 @@ import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.util.typedef.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; @@ -1380,7 +1379,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst private List<String> primaryKeys(int idx, int cnt) { assert cnt > 0; - GridCacheAffinity aff = cache(0).affinity(); + IgniteCacheAffinity aff = affinity(0); List<String> keys = new ArrayList<>(cnt); @@ -1405,7 +1404,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst * @return Primary key for grid. */ private String backupKey(int idx) { - GridCacheAffinity aff = cache(0).affinity(); + IgniteCacheAffinity aff = affinity(0); String key = null; @@ -1427,7 +1426,7 @@ public abstract class GridCacheInterceptorAbstractSelfTest extends GridCacheAbst * @return Key which does not belong to the grid. */ private String nearKey(int idx) { - GridCacheAffinity aff = cache(0).affinity(); + IgniteCacheAffinity aff = affinity(0); String key = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLeakTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLeakTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLeakTest.java index 5b6a0d9..255fa7e 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLeakTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLeakTest.java @@ -19,7 +19,7 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.configuration.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.rendezvous.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.gridgain.grid.kernal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -71,7 +71,7 @@ public class GridCacheLeakTest extends GridCommonAbstractTest { cfg.setName(CACHE_NAME); - cfg.setAffinity(new GridCacheRendezvousAffinityFunction(false, 128)); + cfg.setAffinity(new CacheRendezvousAffinityFunction(false, 128)); cfg.setCacheMode(PARTITIONED); cfg.setBackups(1); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLifecycleAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLifecycleAwareSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLifecycleAwareSelfTest.java index 0d11f58..4589dd1 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLifecycleAwareSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheLifecycleAwareSelfTest.java @@ -25,7 +25,7 @@ import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.cache.cloner.*; import org.gridgain.grid.cache.eviction.*; import org.gridgain.grid.cache.store.*; @@ -105,7 +105,7 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS /** */ - private static class TestAffinityFunction extends TestLifecycleAware implements GridCacheAffinityFunction { + private static class TestAffinityFunction extends TestLifecycleAware implements CacheAffinityFunction { /** */ TestAffinityFunction() { @@ -128,7 +128,7 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS } /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext affCtx) { + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) { List<List<ClusterNode>> res = new ArrayList<>(); res.add(nodes(0, affCtx.currentTopologySnapshot())); @@ -194,7 +194,7 @@ public class GridCacheLifecycleAwareSelfTest extends GridAbstractLifecycleAwareS /** */ - private static class TestAffinityKeyMapper extends TestLifecycleAware implements GridCacheAffinityKeyMapper { + private static class TestAffinityKeyMapper extends TestLifecycleAware implements CacheAffinityKeyMapper { /** */ TestAffinityKeyMapper() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java index f6a8bc0..27f7d0f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest.java @@ -330,7 +330,7 @@ public abstract class GridCacheOffHeapMultiThreadedUpdateAbstractSelfTest extend GridCache<Integer, Integer> cache = grid(0).cache(null); for (int i = 0; i < 10_000; i++) { - if (cache.affinity().isPrimary(grid(idx).localNode(), i)) { + if (grid(0).affinity(cache.name()).isPrimary(grid(idx).localNode(), i)) { key0 = i; break; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java index 73eb85e..4941d67 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheOffheapUpdateSelfTest.java @@ -61,7 +61,7 @@ public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest { int key = 0; - while (!rmtCache.affinity().isPrimary(grid(1).localNode(), key)) + while (!ignite.affinity(rmtCache.name()).isPrimary(grid(1).localNode(), key)) key++; GridCache<Object, Object> locCache = grid(1).cache(null); @@ -102,6 +102,8 @@ public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest { GridCache<Object, Object> cache = grid.cache(null); + IgniteCacheAffinity<Object> affinity = grid.affinity(cache.name()); + for (int i = 0; i < 30; i++) cache.put(i, 0); @@ -118,7 +120,7 @@ public class GridCacheOffheapUpdateSelfTest extends GridCommonAbstractTest { ClusterNode locNode = grid.cluster().localNode(); for (;key < 30; key++) { - if (!cache.affinity().isPrimary(locNode, key) && !cache.affinity().isBackup(locNode, key)) + if (!affinity.isPrimary(locNode, key) && !affinity.isBackup(locNode, key)) break; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java index b08b6df..23a9ed2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheP2PUndeploySelfTest.java @@ -20,7 +20,6 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.marshaller.jdk.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.distributed.*; @@ -97,7 +96,7 @@ public class GridCacheP2PUndeploySelfTest extends GridCommonAbstractTest { partCacheCfg.setName("partitioned"); partCacheCfg.setCacheMode(PARTITIONED); partCacheCfg.setPreloadMode(mode); - partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(11, 1)); + partCacheCfg.setAffinity(new CacheModuloAffinityFunction(11, 1)); partCacheCfg.setWriteSynchronizationMode(GridCacheWriteSynchronizationMode.FULL_SYNC); partCacheCfg.setEvictNearSynchronized(false); partCacheCfg.setQueryIndexEnabled(false); @@ -114,7 +113,7 @@ public class GridCacheP2PUndeploySelfTest extends GridCommonAbstractTest { cfg.setDeploymentMode(SHARED); cfg.setPeerClassLoadingLocalClassPathExclude(GridCacheP2PUndeploySelfTest.class.getName()); - cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); + cfg.setUserAttributes(F.asMap(CacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java index 64f6841..5fa89b2 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -18,14 +18,14 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.cluster.*; -import org.gridgain.grid.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.consistenthash.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.testframework.*; import org.gridgain.testframework.junits.common.*; import java.util.*; -import static org.gridgain.grid.cache.affinity.consistenthash.GridCacheConsistentHashAffinityFunction.*; +import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; /** * @@ -44,7 +44,7 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe for (int replicas = 128; replicas <= 4096; replicas*=2) { Collection<ClusterNode> nodes = createNodes(i, replicas); - GridCacheConsistentHashAffinityFunction aff = new GridCacheConsistentHashAffinityFunction(false, 10000); + CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(false, 10000); checkDistribution(aff, nodes); } @@ -71,7 +71,7 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe * @param aff Affinity to check. * @param nodes Collection of nodes to test on. */ - private void checkDistribution(GridCacheConsistentHashAffinityFunction aff, Collection<ClusterNode> nodes) { + private void checkDistribution(CacheConsistentHashAffinityFunction aff, Collection<ClusterNode> nodes) { Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); for (int part = 0; part < aff.getPartitions(); part++) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java index 469360e..29a315d 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCachePreloadingEvictionsSelfTest.java @@ -70,7 +70,7 @@ public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest GridCacheConfiguration partCacheCfg = defaultCacheConfiguration(); partCacheCfg.setCacheMode(PARTITIONED); - partCacheCfg.setAffinity(new GridCacheModuloAffinityFunction(1, 1)); + partCacheCfg.setAffinity(new CacheModuloAffinityFunction(1, 1)); partCacheCfg.setWriteSynchronizationMode(FULL_SYNC); partCacheCfg.setDistributionMode(PARTITIONED_ONLY); partCacheCfg.setEvictSynchronized(true); @@ -86,7 +86,7 @@ public class GridCachePreloadingEvictionsSelfTest extends GridCommonAbstractTest cfg.setCacheConfiguration(partCacheCfg); - cfg.setUserAttributes(F.asMap(GridCacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); + cfg.setUserAttributes(F.asMap(CacheModuloAffinityFunction.IDX_ATTR, idxGen.getAndIncrement())); cfg.setNetworkTimeout(60000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryInternalKeysSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryInternalKeysSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryInternalKeysSelfTest.java index 5f658f6..99b8e40 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryInternalKeysSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryInternalKeysSelfTest.java @@ -77,7 +77,7 @@ public class GridCacheQueryInternalKeysSelfTest extends GridCacheAbstractSelfTes for (int i = 0; i < ENTRY_CNT; i++) { GridCacheQueueHeaderKey internalKey = new GridCacheQueueHeaderKey("queue" + i); - Collection<ClusterNode> nodes = cache.affinity().mapKeyToPrimaryAndBackups(internalKey); + Collection<ClusterNode> nodes = grid(0).affinity(cache.name()).mapKeyToPrimaryAndBackups(internalKey); for (ClusterNode n : nodes) { Ignite g = findGridForNodeId(n.id()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java index 1dc2e4f..8a2ec09 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java @@ -123,7 +123,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach for (int i = 0; i < keyCnt; i++) { String key = "key" + i; - if (cache.affinity().mapKeyToPrimaryAndBackups(key).contains(locNode)) { + if (affinity(0).mapKeyToPrimaryAndBackups(key).contains(locNode)) { info("Node is reported as affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); assertEquals((Integer)i, cache0.peek(key)); @@ -183,7 +183,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach for (int i = 0; i < keyCnt; i++) { String key = "key" + i; - if (cache.affinity().mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode())) { + if (affinity(0).mapKeyToPrimaryAndBackups(key).contains(grid(g).localNode())) { info("Node is reported as affinity node for key [key=" + key + ", nodeId=" + locNode.id() + ']'); assertEquals((Integer)i, cache0.peek(key)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityApiSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityApiSelfTest.java new file mode 100644 index 0000000..c0ddf82 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityApiSelfTest.java @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.kernal.processors.affinity.*; +import org.gridgain.grid.util.typedef.*; + +import java.util.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Affinity API tests. + */ +public class IgniteCacheAffinityApiSelfTest extends GridCacheAbstractSelfTest { + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final Random RND = new Random(); + + /** {@inheritDoc} */ + @Override protected GridCacheConfiguration cacheConfiguration(String gridName) throws Exception { + GridCacheConfiguration cfg = super.cacheConfiguration(gridName); + + cfg.setCacheMode(PARTITIONED); + cfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setBackups(1); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int gridCount() { + return GRID_CNT; + } + + /** + * @return Affinity. + */ + private CacheAffinityFunction affinity() { + return ((GridKernal)grid(0)).internalCache().configuration().getAffinity(); + } + + /** + * @return Affinity mapper. + */ + private CacheAffinityKeyMapper affinityMapper() { + return ((GridKernal)grid(0)).internalCache().configuration().getAffinityMapper(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPartitions() throws Exception { + assertEquals(affinity().partitions(), affinity(0).partitions()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPartition() throws Exception { + String key = "key"; + + assertEquals(affinity().partition(key), affinity(0).partition(key)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPrimaryPartitionsOneNode() throws Exception { + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (ClusterNode node : grid(0).nodes()) { + int[] parts = affinity(0).primaryPartitions(node); + + assert !F.isEmpty(parts); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + ClusterNode primary = F.first(owners); + + assert F.eqNodes(node, primary); + } + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testPrimaryPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + int[] parts = affinity(0).primaryPartitions(n0); + + info("Primary partitions count: " + parts.length); + + assert parts.length > 1 : "Invalid partitions: " + Arrays.toString(parts); + + for (int part : parts) + assert part >= 0; + + assert !F.isEmpty(parts); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + ClusterNode primary = F.first(owners); + + assert F.eqNodes(n0, primary); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testBackupPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + // Get backup partitions without explicitly specified levels. + int[] parts = affinity(0).backupPartitions(n0); + + assert !F.isEmpty(parts); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = new ArrayList<>(nodes(assignment, p)); + + assert !F.isEmpty(owners); + + // Remove primary. + Iterator<ClusterNode> iter = owners.iterator(); + + iter.next(); + iter.remove(); + + assert owners.contains(n0); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAllPartitions() throws Exception { + // Pick 2 nodes and create a projection over them. + ClusterNode n0 = grid(0).localNode(); + + int[] parts = affinity(0).allPartitions(n0); + + assert !F.isEmpty(parts); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + List<List<ClusterNode>> assignment = affinity().assignPartitions(ctx); + + for (int p : parts) { + Collection<ClusterNode> owners = nodes(assignment, p); + + assert !F.isEmpty(owners); + + assert owners.contains(n0); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionToNode() throws Exception { + int part = RND.nextInt(affinity().partitions()); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + CacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + assertEquals(F.first(nodes(assignment, aff, part)), affinity(0).mapPartitionToNode(part)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNode() throws Exception { + Map<Integer, ClusterNode> map = affinity(0).mapPartitionsToNodes(F.asList(0, 1, 5, 19, 12)); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + CacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNodeArray() throws Exception { + Map<Integer, ClusterNode> map = affinity(0).mapPartitionsToNodes(F.asList(0, 1, 5, 19, 12)); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + CacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMapPartitionsToNodeCollection() throws Exception { + Collection<Integer> parts = new LinkedList<>(); + + for (int p = 0; p < affinity().partitions(); p++) + parts.add(p); + + Map<Integer, ClusterNode> map = affinity(0).mapPartitionsToNodes(parts); + + CacheAffinityFunctionContext ctx = + new CacheAffinityFunctionContextImpl(new ArrayList<>(grid(0).nodes()), null, null, 1, 1); + + CacheAffinityFunction aff = affinity(); + + List<List<ClusterNode>> assignment = aff.assignPartitions(ctx); + + for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) + assert F.eqNodes(F.first(nodes(assignment, aff, e.getKey())), e.getValue()); + } + + /** + * Gets affinity nodes for partition. + * + * @param assignments Assignments. + * @param part Partition to get affinity nodes for. + * @return Affinity nodes. + */ + private List<ClusterNode> nodes(List<List<ClusterNode>> assignments, int part) { + return assignments.get(part); + } + + /** + * Gets affinity nodes for partition. + * + * @param key Affinity key. + * @return Affinity nodes. + */ + private Iterable<ClusterNode> nodes(List<List<ClusterNode>> assignment, CacheAffinityFunction aff, Object key) { + return assignment.get(aff.partition(key)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testEntryPartition() throws Exception { + int keyCnt = 100; + + for (int kv = 0; kv < keyCnt; kv++) + cache().put(String.valueOf(kv), kv); + + for (int kv = 0; kv < keyCnt; kv++) { + String key = String.valueOf(kv); + + GridCacheEntry<String, Integer> entry = cache().entry(key); + + assert entry != null; + + assertEquals(affinity().partition(key), entry.partition()); + } + } + + /** + * @throws Exception If failed. + */ + public void testPartitionWithAffinityMapper() throws Exception { + CacheAffinityKey<Integer> key = new CacheAffinityKey<>(1, 2); + + int expPart = affinity().partition(affinityMapper().affinityKey(key)); + + for (int i = 0; i < gridCount(); i++) { + assertEquals(expPart, grid(i).affinity(grid(i).cache(null).name()).partition(key)); + assertEquals(expPart, grid(i).cache(null).entry(key).partition()); + } + + assertTrue(grid(0).cache(null).putx(key, 1)); + + for (int i = 0; i < gridCount(); i++) { + assertEquals(expPart, grid(i).affinity(grid(i).cache(null).name()).partition(key)); + assertEquals(expPart, grid(i).cache(null).entry(key).partition()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityMapperSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityMapperSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityMapperSelfTest.java new file mode 100644 index 0000000..3e9aaaa --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityMapperSelfTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.cache.affinity.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +/** + * Test affinity mapper. + */ +public class IgniteCacheAffinityMapperSelfTest extends GridCommonAbstractTest { + /** + * + */ + public void testMethodAffinityMapper() { + CacheAffinityKeyMapper mapper = + new CacheDefaultAffinityKeyMapper(); + + List<CacheAffinityKey<Integer>> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(new CacheAffinityKey<>(i, Integer.toString(i))); + + for (int i = 1; i <= 10; i++) { + CacheAffinityKey<Integer> key = keys.get(i - 1); + + Object mapped = mapper.affinityKey(key); + + info("Mapped key: " + mapped); + + assertNotNull(mapped); + assertSame(key.affinityKey(), mapped); + } + } + + /** + * + */ + public void testFieldAffinityMapper() { + CacheAffinityKeyMapper mapper = + new CacheDefaultAffinityKeyMapper(); + + List<FieldAffinityKey<Integer>> keys = new ArrayList<>(); + + for (int i = 1; i <= 10; i++) + keys.add(new FieldAffinityKey<>(i, Integer.toString(i))); + + for (int i = 1; i <= 10; i++) { + FieldAffinityKey<Integer> key = keys.get(i - 1); + + Object mapped = mapper.affinityKey(key); + + info("Mapped key: " + mapped); + + assertNotNull(mapped); + assertSame(key.affinityKey(), mapped); + } + } + + /** + * + */ + public void testFieldAffinityMapperWithWrongClass() { + CacheAffinityKeyMapper mapper = + new CacheDefaultAffinityKeyMapper(); + + FieldNoAffinityKey key = new FieldNoAffinityKey(); + Object mapped = mapper.affinityKey(key); + assertEquals(key, mapped); + } + + /** + * Test key for field annotation. + */ + private static class FieldNoAffinityKey { + // No-op. + } + + /** + * Test key for field annotation. + */ + private static class FieldAffinityKey<K> { + /** Key. */ + private K key; + + /** Affinity key. */ + @CacheAffinityKeyMapped + private Object affKey; + + /** + * Initializes key together with its affinity key counter-part. + * + * @param key Key. + * @param affKey Affinity key. + */ + FieldAffinityKey(K key, Object affKey) { + this.key = key; + this.affKey = affKey; + } + + /** + * @return Key. + */ + public K key() { + return key; + } + + /** + * @return Affinity key. + */ + public Object affinityKey() { + return affKey; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityRoutingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityRoutingSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityRoutingSelfTest.java new file mode 100644 index 0000000..987b297 --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteCacheAffinityRoutingSelfTest.java @@ -0,0 +1,689 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.resources.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.testframework.junits.common.*; + +import java.util.*; + +import static org.gridgain.grid.cache.GridCacheMode.*; +import static org.gridgain.grid.cache.GridCacheWriteSynchronizationMode.*; + +/** + * Affinity routing tests. + */ +public class IgniteCacheAffinityRoutingSelfTest extends GridCommonAbstractTest { + /** */ + private static final int GRID_CNT = 4; + + /** */ + private static final String NON_DFLT_CACHE_NAME = "myCache"; + + /** */ + private static final int KEY_CNT = 50; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** + * Constructs test. + */ + public IgniteCacheAffinityRoutingSelfTest() { + super(/* don't start grid */ false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + + if (!gridName.equals(getTestGridName(GRID_CNT))) { + // Default cache configuration. + GridCacheConfiguration dfltCacheCfg = defaultCacheConfiguration(); + + dfltCacheCfg.setCacheMode(PARTITIONED); + dfltCacheCfg.setBackups(1); + dfltCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + // Non-default cache configuration. + GridCacheConfiguration namedCacheCfg = defaultCacheConfiguration(); + + namedCacheCfg.setCacheMode(PARTITIONED); + namedCacheCfg.setBackups(1); + namedCacheCfg.setWriteSynchronizationMode(FULL_SYNC); + namedCacheCfg.setName(NON_DFLT_CACHE_NAME); + + cfg.setCacheConfiguration(dfltCacheCfg, namedCacheCfg); + } + else { + // No cache should be configured for extra node. + cfg.setCacheConfiguration(); + } + + cfg.setMarshaller(new IgniteOptimizedMarshaller(false)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (int i = 0; i < GRID_CNT; i++) + startGrid(i); + + assert G.allGrids().size() == GRID_CNT; + + for (int i = 0; i < KEY_CNT; i++) { + grid(0).cache(null).put(i, i); + + grid(0).cache(NON_DFLT_CACHE_NAME).put(i, i); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + for (int i = 0; i < GRID_CNT; i++) + stopGrid(i); + + assert G.allGrids().isEmpty(); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityRun() throws Exception { + for (int i = 0; i < KEY_CNT; i++) + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, i, new CheckRunnable(i, i)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityRunComplexKey() throws Exception { + for (int i = 0; i < KEY_CNT; i++) { + AffinityTestKey key = new AffinityTestKey(i); + + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, i, new CheckRunnable(i, key)); + grid(0).compute().affinityRun(NON_DFLT_CACHE_NAME, key, new CheckRunnable(i, key)); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityCall() throws Exception { + for (int i = 0; i < KEY_CNT; i++) + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, i, new CheckCallable(i, i)); + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testAffinityCallComplexKey() throws Exception { + for (int i = 0; i < KEY_CNT; i++) { + final AffinityTestKey key = new AffinityTestKey(i); + + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, i, new CheckCallable(i, key)); + grid(0).compute().affinityCall(NON_DFLT_CACHE_NAME, key, new CheckCallable(i, key)); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testField() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new FieldAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMethod() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new MethodAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testFiledCacheName() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new FieldCacheNameAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMethodCacheName() throws Exception { + // Jobs should be routed correctly in case of using load balancer. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().call(new MethodCacheNameAffinityJob(i)) : + "Job was routed to a wrong node [i=" + i + "]"; + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testMultipleAnnotationsJob() throws Exception { + try { + grid(0).compute().call(new MultipleAnnotationsJob(0)); + + fail(); + } + catch (IgniteCheckedException e) { + info("Caught expected exception: " + e); + } + } + + /** + * JUnit. + * + * @throws Exception If failed. + */ + public void testTask() throws Exception { + // Jobs should be routed correctly. + for (int i = 0; i < KEY_CNT; i++) + assert grid(0).compute().execute(new OneJobTask(i), i) : + "Job was routed to a wrong node [i=" + i + "]"; + + info("Starting extra node without configured caches..."); + + assertEquals(GRID_CNT, G.allGrids().size()); + + Ignite g = startGrid(GRID_CNT); + + try { + assertEquals(GRID_CNT + 1, g.cluster().nodes().size()); + + for (int i = 0; i < KEY_CNT; i++) + assert grid(GRID_CNT).compute().execute(new OneJobTask(i), i) : + "Job was routed to a wrong node [i=" + i + "]"; + } + finally { + stopGrid(GRID_CNT); + } + } + + /** + * Test job with field annotation. + */ + private static class FieldAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @CacheAffinityKeyMapped + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + FieldAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName() == null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + IgniteCacheAffinity<Object> aff = ignite.affinity(ignite.cache(null).name()); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(FieldAffinityJob.class, this); + } + } + + /** + * Test job with method annotation. + */ + private static class MethodAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + MethodAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @CacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affinityKey()); + assert jobCtx.cacheName() == null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + IgniteCacheAffinity<Object> aff = ignite.affinity(ignite.cache(null).name()); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MethodAffinityJob.class, this); + } + } + + /** + * Test job with field cache name annotation. + */ + private static class FieldCacheNameAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** Cache name to use affinity from. */ + @GridCacheName + private String cacheName = NON_DFLT_CACHE_NAME; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + FieldCacheNameAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @CacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(cacheName); + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + IgniteCacheAffinity<Object> aff = ignite.affinity(cacheName); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(FieldCacheNameAffinityJob.class, this); + } + } + + /** + * Test job with method cache name annotation. + */ + private static class MethodCacheNameAffinityJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + private Object affKey; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + */ + MethodCacheNameAffinityJob(Object affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + @CacheAffinityKeyMapped + public Object affinityKey() { + return affKey; + } + + /** + * @return Cache name for affinity routing. + */ + @GridCacheName + public String cacheName() { + return NON_DFLT_CACHE_NAME; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(cacheName()); + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + IgniteCacheAffinity<Object> aff = ignite.affinity(cacheName()); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MethodCacheNameAffinityJob.class, this); + } + } + + /** + * Test job with method cache name annotation. + */ + private static class MultipleAnnotationsJob implements IgniteCallable<Boolean> { + /** Affinity key. */ + @GridToStringInclude + @CacheAffinityKeyMapped + private Object affKey; + + /** Duplicated affinity key. */ + @SuppressWarnings({"UnusedDeclaration"}) + @CacheAffinityKeyMapped + private Object affKeyDup; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param affKey Affinity key. + */ + MultipleAnnotationsJob(Object affKey) { + this.affKey = affKey; + affKeyDup = affKey; + } + + /** {@inheritDoc} */ + @Override public Boolean call() { + assert ignite != null; + + if (log.isDebugEnabled()) + log.debug("Running job [node=" + ignite.cluster().localNode().id() + ", job=" + this + "]"); + + IgniteCacheAffinity<Object> aff = ignite.affinity(ignite.cache(null).name()); + + return F.eqNodes(ignite.cluster().localNode(), aff.mapKeyToNode(affKey)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MultipleAnnotationsJob.class, this); + } + } + + /** + * Test task that produces a single job. + */ + private static class OneJobTask extends ComputeTaskSplitAdapter<Integer, Boolean> { + /** Affinity key. */ + @GridToStringInclude + @CacheAffinityKeyMapped + private Object affKey; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * @param affKey Affinity key. + */ + private OneJobTask(Integer affKey) { + this.affKey = affKey; + } + + /** {@inheritDoc} */ + @Override protected Collection<? extends ComputeJob> split(int gridSize, Integer arg) throws IgniteCheckedException { + return F.asList(new ComputeJobAdapter() { + @Override public Object execute() { + IgniteCacheAffinity<Object> aff = ignite.affinity(ignite.cache(null).name()); + + ClusterNode primary = aff.mapKeyToNode(affKey); + + if (log.isInfoEnabled()) + log.info("Primary node for the job key [affKey=" + affKey + ", primary=" + primary.id() + "]"); + + return F.eqNodes(ignite.cluster().localNode(), primary); + } + }); + } + + /** {@inheritDoc} */ + @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + return results.get(0).getData(); + } + } + + /** + * Test key. + */ + private static class AffinityTestKey { + /** Affinity key. */ + @CacheAffinityKeyMapped + private final int affKey; + + /** + * @param affKey Affinity key. + */ + private AffinityTestKey(int affKey) { + this.affKey = affKey; + } + + /** + * @return Affinity key. + */ + public int affinityKey() { + return affKey; + } + } + + /** + * Test runnable. + */ + private static class CheckRunnable extends CAX { + /** Affinity key. */ + private final Object affKey; + + /** Key. */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + * @param key Key. + */ + private CheckRunnable(Object affKey, Object key) { + this.affKey = affKey; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public void applyx() throws IgniteCheckedException { + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id()); + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id()); + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(NON_DFLT_CACHE_NAME); + } + } + + /** + * Test callable. + */ + private static class CheckCallable implements IgniteCallable<Object> { + /** Affinity key. */ + private final Object affKey; + + /** Key. */ + private final Object key; + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + @IgniteJobContextResource + private ComputeJobContext jobCtx; + + /** + * @param affKey Affinity key. + * @param key Key. + */ + private CheckCallable(Object affKey, Object key) { + this.affKey = affKey; + this.key = key; + } + + /** {@inheritDoc} */ + @Override public Object call() throws IgniteCheckedException { + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, affKey).id()); + assert ignite.cluster().localNode().id().equals(ignite.cluster().mapKeyToNode(null, key).id()); + assert jobCtx.affinityKey().equals(affKey); + assert jobCtx.cacheName().equals(NON_DFLT_CACHE_NAME); + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java index cc63503..af21c6f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxAbstractTest.java @@ -21,7 +21,7 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.apache.ignite.spi.discovery.tcp.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; @@ -185,12 +185,12 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { assert key >= prevKey : "key: " + key + ", prevKey: " + prevKey; if (isTestDebug()) { - GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + CacheAffinityFunction aff = cache.configuration().getAffinity(); int part = aff.partition(key); debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + - U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + U.toShortString(affinity(gridIdx).mapPartitionToPrimaryAndBackups(part)) + ']'); } String val = Integer.toString(key); @@ -317,12 +317,12 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { try { for (Integer key : getKeys()) { if (isTestDebug()) { - GridCacheAffinityFunction aff = cache.configuration().getAffinity(); + CacheAffinityFunction aff = cache.configuration().getAffinity(); int part = aff.partition(key); debug("Key affinity [key=" + key + ", partition=" + part + ", affinity=" + - U.toShortString(cache.affinity().mapPartitionToPrimaryAndBackups(part)) + ']'); + U.toShortString(affinity(gridIdx).mapPartitionToPrimaryAndBackups(part)) + ']'); } String val = Integer.toString(key); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java index 4775d41..8247ec7 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxExceptionAbstractSelfTest.java @@ -19,11 +19,11 @@ package org.gridgain.grid.kernal.processors.cache; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; -import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.indexing.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; +import org.apache.ignite.IgniteCacheAffinity; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.distributed.near.*; import org.gridgain.testframework.*; @@ -536,6 +536,7 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac */ private Integer keyForNode(ClusterNode node, int type) { GridCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCacheAffinity<Integer> affinity = grid(0).affinity(cache.name()); if (cache.configuration().getCacheMode() == LOCAL) return ++lastKey; @@ -546,7 +547,7 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { switch (type) { case NOT_PRIMARY_AND_BACKUP: { - if (!cache.affinity().isPrimaryOrBackup(node, key)) { + if (!grid(0).affinity(null).isPrimaryOrBackup(node, key)) { lastKey = key; return key; @@ -556,7 +557,7 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac } case PRIMARY: { - if (cache.affinity().isPrimary(node, key)) { + if (affinity.isPrimary(node, key)) { lastKey = key; return key; @@ -566,7 +567,7 @@ public abstract class IgniteTxExceptionAbstractSelfTest extends GridCacheAbstrac } case BACKUP: { - if (cache.affinity().isBackup(node, key)) { + if (affinity.isBackup(node, key)) { lastKey = key; return key; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java index 83d7a9b..3a52e77 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxMultiNodeAbstractTest.java @@ -24,7 +24,6 @@ import org.apache.ignite.lang.*; import org.apache.ignite.resources.*; import org.apache.ignite.transactions.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.query.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.*; @@ -109,7 +108,7 @@ public abstract class IgniteTxMultiNodeAbstractTest extends GridCommonAbstractTe */ @SuppressWarnings("unchecked") private static UUID primaryId(Ignite ignite, Object key) { - GridCacheAffinity aff = ignite.cache(null).cache().affinity(); + IgniteCacheAffinity aff = ignite.affinity(null); Collection<ClusterNode> affNodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(key)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java index 5731a86..2b0c916 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/IgniteTxStoreExceptionAbstractSelfTest.java @@ -551,7 +551,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb for (int key = lastKey + 1; key < (lastKey + 10_000); key++) { switch (type) { case NOT_PRIMARY_AND_BACKUP: { - if (!cache.affinity().isPrimaryOrBackup(node, key)) { + if (!grid(0).affinity(null).isPrimaryOrBackup(node, key)) { lastKey = key; return key; @@ -561,7 +561,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb } case PRIMARY: { - if (cache.affinity().isPrimary(node, key)) { + if (grid(0).affinity(null).isPrimary(node, key)) { lastKey = key; return key; @@ -571,7 +571,7 @@ public abstract class IgniteTxStoreExceptionAbstractSelfTest extends GridCacheAb } case BACKUP: { - if (cache.affinity().isBackup(node, key)) { + if (grid(0).affinity(null).isBackup(node, key)) { lastKey = key; return key; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java index 96501f4..e3cd046 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheAbstractQueueFailoverDataConsistencySelfTest.java @@ -17,10 +17,10 @@ package org.gridgain.grid.kernal.processors.cache.datastructures; +import org.apache.ignite.IgniteCacheAffinity; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.processors.cache.*; @@ -357,7 +357,7 @@ public abstract class GridCacheAbstractQueueFailoverDataConsistencySelfTest exte * @return Primary node for queue's header. */ private int primaryQueueNode() { - GridCacheAffinity<Object> aff = grid(0).cache(null).affinity(); + IgniteCacheAffinity<Object> aff = grid(0).affinity(null); for (int i = 0; i < gridCount(); i++) { for (GridCacheEntryEx e : ((GridKernal)grid(i)).context().cache().internalCache().map().allEntries0()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java index da4caff..4a68f75 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueCleanupSelfTest.java @@ -79,7 +79,7 @@ public class GridCacheQueueCleanupSelfTest extends GridCacheAbstractSelfTest { public void testCleanup() throws Exception { GridCacheQueue<Integer> queue = cache().dataStructures().queue(QUEUE_NAME1, 0, false, true); - ClusterNode node = grid(0).cache(null).affinity().mapKeyToNode(new GridCacheQueueHeaderKey(QUEUE_NAME1)); + ClusterNode node = grid(0).affinity(null).mapKeyToNode(new GridCacheQueueHeaderKey(QUEUE_NAME1)); final Ignite ignite = grid(0).localNode().equals(node) ? grid(1) : grid(0); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java index f9a5a27..b17515b 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/GridCacheQueueMultiNodeAbstractSelfTest.java @@ -182,7 +182,7 @@ public abstract class GridCacheQueueMultiNodeAbstractSelfTest extends GridCommon @Override public Object call() throws Exception { info(">>> Executing put callable [node=" + g.cluster().localNode().id() + ", thread=" + Thread.currentThread().getName() + ", aff=" + - F.nodeId8s(g.cache(null).affinity().mapKeyToPrimaryAndBackups( + F.nodeId8s(g.affinity(null).mapKeyToPrimaryAndBackups( new GridCacheInternalKeyImpl(queueName))) + ']'); GridCacheQueue<Integer> q = g.cache(null).dataStructures().queue(queueName, 5, true, true); @@ -215,7 +215,7 @@ public abstract class GridCacheQueueMultiNodeAbstractSelfTest extends GridCommon try { info(">>> Executing poll callable [node=" + g1.cluster().localNode().id() + ", thread=" + Thread.currentThread().getName() + ", aff=" + - F.nodeId8s(g1.cache(null).affinity().mapKeyToPrimaryAndBackups( + F.nodeId8s(g1.affinity(null).mapKeyToPrimaryAndBackups( new GridCacheInternalKeyImpl(queueName))) + ']'); GridCacheQueue<Integer> q = g1.cache(null).dataStructures().queue(queueName, 5, true, true); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java index 2859076..b6e2f30 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueCreateMultiNodeSelfTest.java @@ -157,7 +157,7 @@ public class GridCachePartitionedQueueCreateMultiNodeSelfTest extends GridCommon GridCache<Integer, String> cache = ignite.cache(null); - info("Partition: " + cache.affinity().partition(1)); + info("Partition: " + ignite.affinity(cache.name()).partition(1)); try (IgniteTx tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) { // info("Getting value for key 1"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java index 4e5ca09..eb197b9 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/datastructures/partitioned/GridCachePartitionedQueueEntryMoveSelfTest.java @@ -21,9 +21,8 @@ import org.apache.ignite.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.cache.datastructures.*; import org.gridgain.grid.kernal.processors.affinity.*; import org.apache.ignite.spi.discovery.tcp.*; @@ -193,8 +192,8 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends GridCommonAbstra * @throws Exception If failed. */ private void startAdditionalNodes(int cnt, String queueName) throws Exception { - GridCacheAffinityFunction aff = cache(0).configuration().getAffinity(); - GridCacheAffinityKeyMapper mapper = cache(0).configuration().getAffinityMapper(); + CacheAffinityFunction aff = cache(0).configuration().getAffinity(); + CacheAffinityKeyMapper mapper = cache(0).configuration().getAffinityMapper(); assertNotNull(aff); assertNotNull(mapper); @@ -203,7 +202,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends GridCommonAbstra Collection<ClusterNode> nodes = grid(0).nodes(); - Collection<ClusterNode> aff0 = cache(0).affinity().mapKeyToPrimaryAndBackups(queueName); + Collection<ClusterNode> aff0 = affinity(0).mapKeyToPrimaryAndBackups(queueName); Collection<ClusterNode> aff1 = nodes(aff, part, nodes); assertEquals(new ArrayList<>(aff0), new ArrayList<>(aff1)); @@ -235,7 +234,7 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends GridCommonAbstra startGrid(i++); } - aff2 = cache(0).affinity().mapKeyToPrimaryAndBackups(queueName); + aff2 = affinity(0).mapKeyToPrimaryAndBackups(queueName); assertFalse("Unexpected affinity [aff1=" + aff1 + ", aff2=" + aff2 + ']', F.containsAny(aff1, aff2)); } @@ -246,9 +245,9 @@ public class GridCachePartitionedQueueEntryMoveSelfTest extends GridCommonAbstra * @param nodes Topology nodes. * @return Affinity nodes for partition. */ - private Collection<ClusterNode> nodes(GridCacheAffinityFunction aff, int part, Collection<ClusterNode> nodes) { + private Collection<ClusterNode> nodes(CacheAffinityFunction aff, int part, Collection<ClusterNode> nodes) { List<List<ClusterNode>> assignment = aff.assignPartitions( - new GridCacheAffinityFunctionContextImpl(new ArrayList<>(nodes), null, null, 1, BACKUP_CNT)); + new CacheAffinityFunctionContextImpl(new ArrayList<>(nodes), null, null, 1, BACKUP_CNT)); return assignment.get(part); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/CacheModuloAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/CacheModuloAffinityFunction.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/CacheModuloAffinityFunction.java new file mode 100644 index 0000000..d04e87f --- /dev/null +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/CacheModuloAffinityFunction.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.grid.kernal.processors.cache.distributed; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.cache.affinity.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.util.*; + +/** + * Affinity which controls where nodes end up using mod operation. + */ +public class CacheModuloAffinityFunction implements CacheAffinityFunction { + /** Node attribute for index. */ + public static final String IDX_ATTR = "nodeIndex"; + + /** Number of backups. */ + private int backups = -1; + + /** Number of partitions. */ + private int parts = -1; + + /** + * Empty constructor. + */ + public CacheModuloAffinityFunction() { + // No-op. + } + + /** + * @param parts Number of partitions. + * @param backups Number of backups. + */ + public CacheModuloAffinityFunction(int parts, int backups) { + assert parts > 0; + assert backups >= 0; + + this.parts = parts; + this.backups = backups; + } + + /** + * @param parts Number of partitions. + */ + public void partitions(int parts) { + assert parts > 0; + + this.parts = parts; + } + + /** + * @param backups Number of backups. + */ + public void backups(int backups) { + assert backups >= 0; + + this.backups = backups; + } + + /** + * @return Number of backups. + */ + public int backups() { + return backups; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) { + List<List<ClusterNode>> res = new ArrayList<>(parts); + + Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); + + for (int part = 0; part < parts; part++) { + res.add(F.isEmpty(topSnapshot) ? + Collections.<ClusterNode>emptyList() : + // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection + // doesn't provide equals and hashCode implementations. + U.sealList(nodes(part, topSnapshot))); + } + + return Collections.unmodifiableList(res); + } + + /** {@inheritDoc} */ + public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes) { + List<ClusterNode> sorted = new ArrayList<>(nodes); + + Collections.sort(sorted, new Comparator<ClusterNode>() { + @Override public int compare(ClusterNode n1, ClusterNode n2) { + int idx1 = n1.<Integer>attribute(IDX_ATTR); + int idx2 = n2.<Integer>attribute(IDX_ATTR); + + return idx1 < idx2 ? -1 : idx1 == idx2 ? 0 : 1; + } + }); + + int max = 1 + backups; + + if (max > nodes.size()) + max = nodes.size(); + + Collection<ClusterNode> ret = new ArrayList<>(max); + + Iterator<ClusterNode> it = sorted.iterator(); + + for (int i = 0; i < max; i++) { + ClusterNode n = null; + + if (i == 0) { + while (it.hasNext()) { + n = it.next(); + + int nodeIdx = n.<Integer>attribute(IDX_ATTR); + + if (part <= nodeIdx) + break; + else + n = null; + } + } + else { + if (it.hasNext()) + n = it.next(); + else { + it = sorted.iterator(); + + assert it.hasNext(); + + n = it.next(); + } + } + + assert n != null || nodes.size() < parts; + + if (n == null) + n = (it = sorted.iterator()).next(); + + + ret.add(n); + } + + return ret; + } + + /** + * @param parts Number of partitions. + * @param backups Number of backups. + */ + public void reset(int parts, int backups) { + this.parts = parts; + this.backups = backups; + } + + /** {@inheritDoc} */ + @Override public void reset() { + parts = -1; + backups = -1; + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + if (key instanceof Number) + return ((Number)key).intValue() % parts; + + return key == null ? 0 : U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheModuloAffinityFunction.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java index bedf1c7..b09ae90 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheAtomicTimeoutSelfTest.java @@ -22,9 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.apache.ignite.spi.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.communication.*; import org.gridgain.grid.kernal.processors.cache.distributed.dht.atomic.*; @@ -250,7 +248,7 @@ public class GridCacheAtomicTimeoutSelfTest extends GridCommonAbstractTest { private int keyForTest() { int i = 0; - GridCacheAffinity<Object> aff = grid(0).cache(null).affinity(); + IgniteCacheAffinity<Object> aff = grid(0).affinity(null); while (!aff.isPrimary(grid(1).localNode(), i) || !aff.isBackup(grid(2).localNode(), i)) i++; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java index 27130de..842b78d 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/GridCacheClientModesAbstractSelfTest.java @@ -19,7 +19,7 @@ package org.gridgain.grid.kernal.processors.cache.distributed; import org.apache.ignite.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.consistenthash.*; import org.gridgain.grid.kernal.processors.cache.*; import org.gridgain.grid.util.typedef.*; @@ -61,7 +61,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst } cfg.setStore(null); - cfg.setAffinity(new GridCacheConsistentHashAffinityFunction(false, 32)); + cfg.setAffinity(new CacheConsistentHashAffinityFunction(false, 32)); cfg.setBackups(1); return cfg; @@ -97,7 +97,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst for (int key = 0; key < 10; key++) { for (int i = 1; i < gridCount(); i++) { - if (grid(i).cache(null).affinity().isPrimaryOrBackup(grid(i).localNode(), key)) + if (grid(i).affinity(null).isPrimaryOrBackup(grid(i).localNode(), key)) assertEquals(key, grid(i).cache(null).peek(key)); } @@ -151,7 +151,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst if (cacheMode() == PARTITIONED) assertFalse(cache.entry(key).primary() || cache.entry(key).backup()); - assertFalse(cache.affinity().mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode())); + assertFalse(g.affinity(cache.name()).mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode())); } } else { @@ -166,7 +166,7 @@ public abstract class GridCacheClientModesAbstractSelfTest extends GridCacheAbst if (cache.entry(key).primary() || cache.entry(key).backup()) foundEntry = true; - if (cache.affinity().mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode())) + if (g.affinity(cache.name()).mapKeyToPrimaryAndBackups(key).contains(g.cluster().localNode())) foundAffinityNode = true; }