#ignite-237: move CacheConsistentHashAffinityFunction to org.gridgain.benchmarks.risk.affinity.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d07931d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d07931d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d07931d2 Branch: refs/heads/ignite-141 Commit: d07931d2ddd8d4798c1ca2b7bc9620036ff59ae9 Parents: 87586da Author: ivasilinets <ivasilin...@gridgain.com> Authored: Thu Feb 26 18:23:27 2015 +0300 Committer: ivasilinets <ivasilin...@gridgain.com> Committed: Thu Feb 26 18:23:27 2015 +0300 ---------------------------------------------------------------------- .../ClientAbstractMultiThreadedSelfTest.java | 4 +- .../impl/ClientPartitionAffinitySelfTest.java | 21 +- .../ClientAbstractMultiNodeSelfTest.java | 4 +- .../integration/ClientAbstractSelfTest.java | 8 - .../src/test/resources/spring-server-node.xml | 2 +- .../test/resources/spring-server-ssl-node.xml | 2 +- .../CacheConsistentHashAffinityFunction.java | 703 ------------------- .../cache/affinity/consistenthash/package.html | 24 - .../processors/cache/GridCacheAttributes.java | 9 - .../processors/cache/GridCacheProcessor.java | 12 +- .../top/GridTopologyCommandHandler.java | 7 - .../cache/VisorCacheAffinityConfiguration.java | 7 - .../optimized/optimized-classnames.properties | 12 +- .../CacheConsistentHashAffinityFunction.java | 703 +++++++++++++++++++ .../GridCachePartitionedAffinitySpreadTest.java | 16 +- .../GridCachePartitionedAffinitySelfTest.java | 53 +- 16 files changed, 731 insertions(+), 856 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java index 6264276..4ab2679 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/ClientAbstractMultiThreadedSelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.client; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; @@ -193,7 +193,7 @@ public abstract class ClientAbstractMultiThreadedSelfTest extends GridCommonAbst private CacheConfiguration cacheConfiguration(@Nullable String cacheName) throws Exception { CacheConfiguration cfg = defaultCacheConfiguration(); - cfg.setAffinity(new CacheConsistentHashAffinityFunction()); + cfg.setAffinity(new CacheRendezvousAffinityFunction()); cfg.setDistributionMode(NEAR_PARTITIONED); cfg.setAtomicityMode(TRANSACTIONAL); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java index 3a45615..444a84c 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/impl/ClientPartitionAffinitySelfTest.java @@ -18,7 +18,7 @@ package org.apache.ignite.internal.client.impl; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.client.*; import org.apache.ignite.internal.processors.affinity.*; @@ -28,8 +28,6 @@ import org.apache.ignite.testframework.junits.common.*; import java.util.*; -import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; - /** * Client's partitioned affinity tests. */ @@ -285,7 +283,7 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { aff.setHashIdResolver(HASH_ID_RSLVR); - CacheConsistentHashAffinityFunction srvAff = new CacheConsistentHashAffinityFunction(); + CacheRendezvousAffinityFunction srvAff = new CacheRendezvousAffinityFunction(); getTestResources().inject(srvAff); @@ -324,7 +322,7 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { .replicaCount(replicaCnt) .build()); - ClusterNode srvNode = new TestRichNode(nodeId, replicaCnt); + ClusterNode srvNode = new TestRichNode(nodeId); srvNodes.add(srvNode); } @@ -366,28 +364,22 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { */ private final UUID nodeId; - /** - * Partitioned affinity replicas count. - */ - private final Integer replicaCnt; /** * Externalizable class requires public no-arg constructor. */ @SuppressWarnings("UnusedDeclaration") public TestRichNode() { - this(UUID.randomUUID(), DFLT_REPLICA_COUNT); + this(UUID.randomUUID()); } /** * Constructs rich node stub to use in emulated server topology. * * @param nodeId Node id. - * @param replicaCnt Partitioned affinity replicas count. */ - private TestRichNode(UUID nodeId, int replicaCnt) { + private TestRichNode(UUID nodeId) { this.nodeId = nodeId; - this.replicaCnt = replicaCnt; } /** {@inheritDoc} */ @@ -397,9 +389,6 @@ public class ClientPartitionAffinitySelfTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public <T> T attribute(String name) { - if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) - return (T)replicaCnt; - return super.attribute(name); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java index 3da6080..0eb6e16 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.client.integration; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; @@ -196,7 +196,7 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract cfg.setWriteSynchronizationMode(REPLICATED_ASYNC_CACHE_NAME.equals(cacheName) ? FULL_ASYNC : FULL_SYNC); - cfg.setAffinity(new CacheConsistentHashAffinityFunction()); + cfg.setAffinity(new CacheRendezvousAffinityFunction()); return cfg; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java index e55b09d..8011945 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractSelfTest.java @@ -21,7 +21,6 @@ import junit.framework.*; import net.sf.json.*; import org.apache.ignite.*; import org.apache.ignite.cache.*; -import org.apache.ignite.cache.affinity.consistenthash.*; import org.apache.ignite.cache.store.*; import org.apache.ignite.compute.*; import org.apache.ignite.configuration.*; @@ -1094,13 +1093,6 @@ public abstract class ClientAbstractSelfTest extends GridCommonAbstractTest { assertEquals(grid().localNode().id(), node.nodeId()); assertEquals(4, node.caches().size()); - Integer replica = grid().localNode().attribute(CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT_ATTR_NAME); - - if (replica == null) - replica = CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT; - - assertEquals((int)replica, node.replicaCount()); - Map<String, GridClientCacheMode> caches = node.caches(); for (Map.Entry<String, GridClientCacheMode> e : caches.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/resources/spring-server-node.xml ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/spring-server-node.xml b/modules/clients/src/test/resources/spring-server-node.xml index 3b19f3d..3496e37 100644 --- a/modules/clients/src/test/resources/spring-server-node.xml +++ b/modules/clients/src/test/resources/spring-server-node.xml @@ -118,7 +118,7 @@ <property name="writeSynchronizationMode" value="FULL_SYNC"/> <property name="affinity"> - <bean class="org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction"> + <bean class="org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction"> <constructor-arg value="1"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/clients/src/test/resources/spring-server-ssl-node.xml ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/resources/spring-server-ssl-node.xml b/modules/clients/src/test/resources/spring-server-ssl-node.xml index 3989ac6..c56f25a 100644 --- a/modules/clients/src/test/resources/spring-server-ssl-node.xml +++ b/modules/clients/src/test/resources/spring-server-ssl-node.xml @@ -119,7 +119,7 @@ <property name="writeSynchronizationMode" value="FULL_SYNC"/> <property name="affinity"> - <bean class="org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction"> + <bean class="org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction"> <constructor-arg value="1"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java deleted file mode 100644 index 993205e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java +++ /dev/null @@ -1,703 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity.consistenthash; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Affinity function for partitioned cache. This function supports the following - * configuration: - * <ul> - * <li> - * {@code backups} - Use this flag to control how many back up nodes will be - * assigned to every key. The default value is {@code 0}. - * </li> - * <li> - * {@code replicas} - Generally the more replicas a node gets, the more key assignments - * it will receive. You can configure different number of replicas for a node by - * setting user attribute with name {@link #getReplicaCountAttributeName()} to some - * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant. - * </li> - * <li> - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - * </li> - * </ul> - * <p> - * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. - */ -@Deprecated -public class CacheConsistentHashAffinityFunction implements CacheAffinityFunction { - /** */ - private static final long serialVersionUID = 0L; - - /** Flag to enable/disable consistency check (for internal use only). */ - private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("IGNITE_AFFINITY_CONSISTENCY_CHECK"); - - /** Default number of partitions. */ - public static final int DFLT_PARTITION_COUNT = 10000; - - /** Default replica count for partitioned caches. */ - public static final int DFLT_REPLICA_COUNT = 128; - - /** - * Name of node attribute to specify number of replicas for a node. - * Default value is {@code gg:affinity:node:replicas}. - */ - public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas"; - - /** Node hash. */ - private transient GridConsistentHash<NodeInfo> nodeHash; - - /** Total number of partitions. */ - private int parts = DFLT_PARTITION_COUNT; - - /** */ - private int replicas = DFLT_REPLICA_COUNT; - - /** */ - private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME; - - /** */ - private boolean exclNeighbors; - - /** - * Optional backup filter. First node passed to this filter is primary node, - * and second node is a node being tested. - */ - private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; - - /** */ - private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver(); - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Injected cache name. */ - @CacheNameResource - private String cacheName; - - /** Injected logger. */ - @LoggerResource - private IgniteLogger log; - - /** Initialization flag. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient AtomicBoolean init = new AtomicBoolean(); - - /** Latch for initializing. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient CountDownLatch initLatch = new CountDownLatch(1); - - /** Nodes IDs. */ - @GridToStringInclude - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient ConcurrentMap<UUID, NodeInfo> addedNodes = new ConcurrentHashMap<>(); - - /** Optional backup filter. */ - @GridToStringExclude - private final IgniteBiPredicate<NodeInfo, NodeInfo> backupIdFilter = new IgniteBiPredicate<NodeInfo, NodeInfo>() { - @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) { - return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node()); - } - }; - - /** Map of neighbors. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient ConcurrentMap<UUID, Collection<UUID>> neighbors = - new ConcurrentHashMap8<>(); - - /** - * Empty constructor with all defaults. - */ - public CacheConsistentHashAffinityFunction() { - // No-op. - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - */ - public CacheConsistentHashAffinityFunction(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. - */ - public CacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) { - A.ensure(parts != 0, "parts != 0"); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - } - - /** - * Initializes optional counts for replicas and backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param parts Total number of partitions. - * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - */ - public CacheConsistentHashAffinityFunction(int parts, - @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts != 0, "parts != 0"); - - this.parts = parts; - this.backupFilter = backupFilter; - } - - /** - * Gets default count of virtual replicas in consistent hash ring. - * <p> - * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()} - * name will be checked first. If it is absent, then this value will be used. - * - * @return Count of virtual replicas in consistent hash ring. - */ - public int getDefaultReplicas() { - return replicas; - } - - /** - * Sets default count of virtual replicas in consistent hash ring. - * <p> - * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name - * will be checked first. If it is absent, then this value will be used. - * - * @param replicas Count of virtual replicas in consistent hash ring.s - */ - public void setDefaultReplicas(int replicas) { - this.replicas = replicas; - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - * <p> - * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. - */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. - */ - public void setPartitions(int parts) { - this.parts = parts; - } - - /** - * Gets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @return Hash ID resolver. - */ - public CacheAffinityNodeHashResolver getHashIdResolver() { - return hashIdRslvr; - } - - /** - * Sets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @param hashIdRslvr Hash ID resolver. - */ - public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) { - this.hashIdRslvr = hashIdRslvr; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param backupFilter Optional backup filter. - */ - public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this.backupFilter = backupFilter; - } - - /** - * Gets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @return User attribute name for replica count for a node. - */ - public String getReplicaCountAttributeName() { - return attrName; - } - - /** - * Sets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @param attrName User attribute name for replica count for a node. - */ - public void setReplicaCountAttributeName(String attrName) { - this.attrName = attrName; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public void setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Gets neighbors for a node. - * - * @param node Node. - * @return Neighbors. - */ - private Collection<UUID> neighbors(final ClusterNode node) { - Collection<UUID> ns = neighbors.get(node.id()); - - if (ns == null) { - Collection<ClusterNode> nodes = ignite.cluster().forHost(node).nodes(); - - ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes))); - } - - return ns; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) { - List<List<ClusterNode>> res = new ArrayList<>(parts); - - Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - for (int part = 0; part < parts; part++) { - res.add(F.isEmpty(topSnapshot) ? - Collections.<ClusterNode>emptyList() : - // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection - // doesn't provide equals and hashCode implementations. - U.sealList(nodes(part, topSnapshot, ctx.backups()))); - } - - return res; - } - - /** - * Assigns nodes to one partition. - * - * @param part Partition to assign nodes for. - * @param nodes Cache topology nodes. - * @return Assigned nodes, first node is primary, others are backups. - */ - public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes, int backups) { - if (nodes == null) - return Collections.emptyList(); - - int nodesSize = nodes.size(); - - if (nodesSize == 0) - return Collections.emptyList(); - - if (nodesSize == 1) // Minor optimization. - return nodes; - - initialize(); - - final Map<NodeInfo, ClusterNode> lookup = new GridLeanMap<>(nodesSize); - - // Store nodes in map for fast lookup. - for (ClusterNode n : nodes) - // Add nodes into hash circle, if absent. - lookup.put(resolveNodeInfo(n), n); - - Collection<NodeInfo> selected; - - if (backupFilter != null) { - final IgnitePredicate<NodeInfo> p = new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }; - - final NodeInfo primaryId = nodeHash.node(part, p); - - IgnitePredicate<NodeInfo> backupPrimaryIdFilter = new IgnitePredicate<NodeInfo>() { - @Override public boolean apply(NodeInfo node) { - return backupIdFilter.apply(primaryId, node); - } - }; - - Collection<NodeInfo> backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter); - - if (F.isEmpty(backupIds) && primaryId != null) { - ClusterNode n = lookup.get(primaryId); - - assert n != null; - - return Collections.singletonList(n); - } - - selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds; - } - else { - if (!exclNeighbors) { - selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }); - - if (selected.size() == 1) { - NodeInfo id = F.first(selected); - - assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected; - - ClusterNode n = lookup.get(id); - - assert n != null; - - return Collections.singletonList(n); - } - } - else { - int primaryAndBackups = backups + 1; - - selected = new ArrayList<>(primaryAndBackups); - - final Collection<NodeInfo> selected0 = selected; - - List<NodeInfo> ids = nodeHash.nodes(part, primaryAndBackups, new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - ClusterNode n = lookup.get(id); - - if (n == null) - return false; - - Collection<UUID> neighbors = neighbors(n); - - for (NodeInfo id0 : selected0) { - ClusterNode n0 = lookup.get(id0); - - if (n0 == null) - return false; - - Collection<UUID> neighbors0 = neighbors(n0); - - if (F.containsAny(neighbors0, neighbors)) - return false; - } - - selected0.add(id); - - return true; - } - }); - - if (AFFINITY_CONSISTENCY_CHECK) - assert F.eqOrdered(ids, selected); - } - } - - Collection<ClusterNode> ret = new ArrayList<>(selected.size()); - - for (NodeInfo id : selected) { - ClusterNode n = lookup.get(id); - - assert n != null; - - ret.add(n); - } - - return ret; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - initialize(); - - return U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public int partitions() { - initialize(); - - return parts; - } - - /** {@inheritDoc} */ - @Override public void reset() { - addedNodes = new ConcurrentHashMap<>(); - neighbors = new ConcurrentHashMap8<>(); - - initLatch = new CountDownLatch(1); - - init = new AtomicBoolean(); - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - NodeInfo info = addedNodes.remove(nodeId); - - if (info == null) - return; - - nodeHash.removeNode(info); - - neighbors.clear(); - } - - /** - * Resolve node info for specified node. - * Add node to hash circle if this is the first node invocation. - * - * @param n Node to get info for. - * @return Node info. - */ - private NodeInfo resolveNodeInfo(ClusterNode n) { - UUID nodeId = n.id(); - NodeInfo nodeInfo = addedNodes.get(nodeId); - - if (nodeInfo != null) - return nodeInfo; - - assert hashIdRslvr != null; - - nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n); - - neighbors.clear(); - - nodeHash.addNode(nodeInfo, replicas(n)); - - addedNodes.put(nodeId, nodeInfo); - - return nodeInfo; - } - - /** {@inheritDoc} */ - private void initialize() { - if (!init.get() && init.compareAndSet(false, true)) { - if (log.isInfoEnabled()) - log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts + - ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas + - ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']'); - - nodeHash = new GridConsistentHash<>(); - - initLatch.countDown(); - } - else { - if (initLatch.getCount() > 0) { - try { - U.await(initLatch); - } - catch (IgniteInterruptedCheckedException ignored) { - // Recover interrupted state flag. - Thread.currentThread().interrupt(); - } - } - } - } - - /** - * @param n Node. - * @return Replicas. - */ - private int replicas(ClusterNode n) { - Integer nodeReplicas = n.attribute(attrName); - - if (nodeReplicas == null) - nodeReplicas = replicas; - - return nodeReplicas; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheConsistentHashAffinityFunction.class, this); - } - - /** - * Node hash ID. - */ - private static final class NodeInfo implements Comparable<NodeInfo> { - /** Node ID. */ - private UUID nodeId; - - /** Hash ID. */ - private Object hashId; - - /** Grid node. */ - private ClusterNode node; - - /** - * @param nodeId Node ID. - * @param hashId Hash ID. - * @param node Rich node. - */ - private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) { - assert nodeId != null; - assert hashId != null; - - this.hashId = hashId; - this.nodeId = nodeId; - this.node = node; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Hash ID. - */ - public Object hashId() { - return hashId; - } - - /** - * @return Node. - */ - public ClusterNode node() { - return node; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return hashId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (!(obj instanceof NodeInfo)) - return false; - - NodeInfo that = (NodeInfo)obj; - - // If objects are equal, hash codes should be the same. - // Cannot use that.hashId.equals(hashId) due to Comparable<N> interface restrictions. - return that.nodeId.equals(nodeId) && that.hashCode() == hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(NodeInfo o) { - int diff = nodeId.compareTo(o.nodeId); - - if (diff == 0) { - int h1 = hashCode(); - int h2 = o.hashCode(); - - diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1); - } - - return diff; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeInfo.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html deleted file mode 100644 index 6f05382..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains consistent hash based cache affinity for partitioned cache. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index a4762ab..1f8b0b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -192,15 +192,6 @@ public class GridCacheAttributes implements Externalizable { CacheAffinityFunction aff = cfg.getAffinity(); if (aff != null) { - if (aff instanceof CacheConsistentHashAffinityFunction) { - CacheConsistentHashAffinityFunction aff0 = (CacheConsistentHashAffinityFunction) aff; - - affInclNeighbors = aff0.isExcludeNeighbors(); - affReplicas = aff0.getDefaultReplicas(); - affReplicaCntAttrName = aff0.getReplicaCountAttributeName(); - affHashIdRslvrClsName = className(aff0.getHashIdResolver()); - } - affPartsCnt = aff.partitions(); affClsName = className(aff); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index e901176..66a751c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -305,14 +305,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { throw new IgniteCheckedException("REPLICATED cache can not be started with CachePartitionFairAffinity" + " [cacheName=" + cc.getName() + ']'); - if (cc.getAffinity() instanceof CacheConsistentHashAffinityFunction) { - CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cc.getAffinity(); - - if (aff.isExcludeNeighbors()) - throw new IgniteCheckedException("For REPLICATED cache flag 'excludeNeighbors' in " + - "CacheConsistentHashAffinityFunction cannot be set [cacheName=" + cc.getName() + ']'); - } - if (cc.getAffinity() instanceof CacheRendezvousAffinityFunction) { CacheRendezvousAffinityFunction aff = (CacheRendezvousAffinityFunction)cc.getAffinity(); @@ -1002,8 +994,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : ctx.cache().internalCaches()) { CacheConfiguration cfg = cache.configuration(); - if (cfg.getAffinity() instanceof CacheConsistentHashAffinityFunction) { - CacheConsistentHashAffinityFunction aff = (CacheConsistentHashAffinityFunction)cfg.getAffinity(); + if (cfg.getAffinity() instanceof CacheRendezvousAffinityFunction) { + CacheRendezvousAffinityFunction aff = (CacheRendezvousAffinityFunction)cfg.getAffinity(); CacheAffinityNodeHashResolver hashIdRslvr = aff.getHashIdResolver(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java index e32f6f7..4bcc566 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java @@ -174,13 +174,6 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter { nodeBean.setTcpAddresses(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_ADDRS))); nodeBean.setTcpHostNames(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_HOST_NAMES))); - Integer dfltReplicaCnt = node.attribute(CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT_ATTR_NAME); - - if (dfltReplicaCnt == null) - dfltReplicaCnt = CacheConsistentHashAffinityFunction.DFLT_REPLICA_COUNT; - - nodeBean.setReplicaCount(dfltReplicaCnt); - GridCacheAttributes[] caches = node.attribute(ATTR_CACHE); if (!F.isEmpty(caches)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheAffinityConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheAffinityConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheAffinityConfiguration.java index 3d4d84a..7aae6f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheAffinityConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheAffinityConfiguration.java @@ -62,13 +62,6 @@ public class VisorCacheAffinityConfiguration implements Serializable { Integer dfltReplicas = null; Boolean excludeNeighbors = null; - if (aff instanceof CacheConsistentHashAffinityFunction) { - CacheConsistentHashAffinityFunction hashAffFunc = (CacheConsistentHashAffinityFunction)aff; - - dfltReplicas = hashAffFunc.getDefaultReplicas(); - excludeNeighbors = hashAffFunc.isExcludeNeighbors(); - } - VisorCacheAffinityConfiguration cfg = new VisorCacheAffinityConfiguration(); cfg.function(compactClass(aff)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties index 5050104..ea752c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties +++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/optimized-classnames.properties @@ -79,12 +79,12 @@ org.apache.ignite.cache.CacheWriteSynchronizationMode org.apache.ignite.cache.affinity.CacheAffinityKey org.apache.ignite.cache.affinity.CacheAffinityNodeAddressHashResolver org.apache.ignite.cache.affinity.CacheAffinityNodeIdHashResolver -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction$1 -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction$2 -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction$3 -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction$4 -org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction$5 +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction$1 +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction$2 +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction$3 +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction$4 +org.gridgain.benchmarks.risk.affinity.CacheConsistentHashAffinityFunction$5 org.apache.ignite.cache.affinity.fair.CachePartitionFairAffinity org.apache.ignite.cache.affinity.fair.CachePartitionFairAffinity$PartitionSetComparator org.apache.ignite.cache.affinity.rendezvous.CacheRendezvousAffinityFunction http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java new file mode 100644 index 0000000..35be9b8 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/benchmarks/risk/affinity/CacheConsistentHashAffinityFunction.java @@ -0,0 +1,703 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.gridgain.benchmarks.risk.affinity; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Affinity function for partitioned cache. This function supports the following + * configuration: + * <ul> + * <li> + * {@code backups} - Use this flag to control how many back up nodes will be + * assigned to every key. The default value is {@code 0}. + * </li> + * <li> + * {@code replicas} - Generally the more replicas a node gets, the more key assignments + * it will receive. You can configure different number of replicas for a node by + * setting user attribute with name {@link #getReplicaCountAttributeName()} to some + * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant. + * </li> + * <li> + * {@code backupFilter} - Optional filter for back up nodes. If provided, then only + * nodes that pass this filter will be selected as backup nodes. If not provided, then + * primary and backup nodes will be selected out of all nodes available for this cache. + * </li> + * </ul> + * <p> + * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. + */ +@Deprecated +public class CacheConsistentHashAffinityFunction implements CacheAffinityFunction { + /** */ + private static final long serialVersionUID = 0L; + + /** Flag to enable/disable consistency check (for internal use only). */ + private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("IGNITE_AFFINITY_CONSISTENCY_CHECK"); + + /** Default number of partitions. */ + public static final int DFLT_PARTITION_COUNT = 10000; + + /** Default replica count for partitioned caches. */ + public static final int DFLT_REPLICA_COUNT = 128; + + /** + * Name of node attribute to specify number of replicas for a node. + * Default value is {@code gg:affinity:node:replicas}. + */ + public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas"; + + /** Node hash. */ + private transient GridConsistentHash<NodeInfo> nodeHash; + + /** Total number of partitions. */ + private int parts = DFLT_PARTITION_COUNT; + + /** */ + private int replicas = DFLT_REPLICA_COUNT; + + /** */ + private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME; + + /** */ + private boolean exclNeighbors; + + /** + * Optional backup filter. First node passed to this filter is primary node, + * and second node is a node being tested. + */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + + /** */ + private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver(); + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Injected cache name. */ + @CacheNameResource + private String cacheName; + + /** Injected logger. */ + @LoggerResource + private IgniteLogger log; + + /** Initialization flag. */ + @SuppressWarnings("TransientFieldNotInitialized") + private transient AtomicBoolean init = new AtomicBoolean(); + + /** Latch for initializing. */ + @SuppressWarnings({"TransientFieldNotInitialized"}) + private transient CountDownLatch initLatch = new CountDownLatch(1); + + /** Nodes IDs. */ + @GridToStringInclude + @SuppressWarnings({"TransientFieldNotInitialized"}) + private transient ConcurrentMap<UUID, NodeInfo> addedNodes = new ConcurrentHashMap<>(); + + /** Optional backup filter. */ + @GridToStringExclude + private final IgniteBiPredicate<NodeInfo, NodeInfo> backupIdFilter = new IgniteBiPredicate<NodeInfo, NodeInfo>() { + @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) { + return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node()); + } + }; + + /** Map of neighbors. */ + @SuppressWarnings("TransientFieldNotInitialized") + private transient ConcurrentMap<UUID, Collection<UUID>> neighbors = + new ConcurrentHashMap8<>(); + + /** + * Empty constructor with all defaults. + */ + public CacheConsistentHashAffinityFunction() { + // No-op. + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and specified number of backups. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + */ + public CacheConsistentHashAffinityFunction(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of backups and partitions. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public CacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) { + A.ensure(parts != 0, "parts != 0"); + + this.exclNeighbors = exclNeighbors; + this.parts = parts; + } + + /** + * Initializes optional counts for replicas and backups. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @param parts Total number of partitions. + * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + */ + public CacheConsistentHashAffinityFunction(int parts, + @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts != 0, "parts != 0"); + + this.parts = parts; + this.backupFilter = backupFilter; + } + + /** + * Gets default count of virtual replicas in consistent hash ring. + * <p> + * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()} + * name will be checked first. If it is absent, then this value will be used. + * + * @return Count of virtual replicas in consistent hash ring. + */ + public int getDefaultReplicas() { + return replicas; + } + + /** + * Sets default count of virtual replicas in consistent hash ring. + * <p> + * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name + * will be checked first. If it is absent, then this value will be used. + * + * @param replicas Count of virtual replicas in consistent hash ring.s + */ + public void setDefaultReplicas(int replicas) { + this.replicas = replicas; + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. + */ + public void setPartitions(int parts) { + this.parts = parts; + } + + /** + * Gets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @return Hash ID resolver. + */ + public CacheAffinityNodeHashResolver getHashIdResolver() { + return hashIdRslvr; + } + + /** + * Sets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @param hashIdRslvr Hash ID resolver. + */ + public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) { + this.hashIdRslvr = hashIdRslvr; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @param backupFilter Optional backup filter. + */ + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Gets optional attribute name for replica count. If not provided, the + * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. + * + * @return User attribute name for replica count for a node. + */ + public String getReplicaCountAttributeName() { + return attrName; + } + + /** + * Sets optional attribute name for replica count. If not provided, the + * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. + * + * @param attrName User attribute name for replica count for a node. + */ + public void setReplicaCountAttributeName(String attrName) { + this.attrName = attrName; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Gets neighbors for a node. + * + * @param node Node. + * @return Neighbors. + */ + private Collection<UUID> neighbors(final ClusterNode node) { + Collection<UUID> ns = neighbors.get(node.id()); + + if (ns == null) { + Collection<ClusterNode> nodes = ignite.cluster().forHost(node).nodes(); + + ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes))); + } + + return ns; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) { + List<List<ClusterNode>> res = new ArrayList<>(parts); + + Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); + + for (int part = 0; part < parts; part++) { + res.add(F.isEmpty(topSnapshot) ? + Collections.<ClusterNode>emptyList() : + // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection + // doesn't provide equals and hashCode implementations. + U.sealList(nodes(part, topSnapshot, ctx.backups()))); + } + + return res; + } + + /** + * Assigns nodes to one partition. + * + * @param part Partition to assign nodes for. + * @param nodes Cache topology nodes. + * @return Assigned nodes, first node is primary, others are backups. + */ + public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes, int backups) { + if (nodes == null) + return Collections.emptyList(); + + int nodesSize = nodes.size(); + + if (nodesSize == 0) + return Collections.emptyList(); + + if (nodesSize == 1) // Minor optimization. + return nodes; + + initialize(); + + final Map<NodeInfo, ClusterNode> lookup = new GridLeanMap<>(nodesSize); + + // Store nodes in map for fast lookup. + for (ClusterNode n : nodes) + // Add nodes into hash circle, if absent. + lookup.put(resolveNodeInfo(n), n); + + Collection<NodeInfo> selected; + + if (backupFilter != null) { + final IgnitePredicate<NodeInfo> p = new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + return lookup.containsKey(id); + } + }; + + final NodeInfo primaryId = nodeHash.node(part, p); + + IgnitePredicate<NodeInfo> backupPrimaryIdFilter = new IgnitePredicate<NodeInfo>() { + @Override public boolean apply(NodeInfo node) { + return backupIdFilter.apply(primaryId, node); + } + }; + + Collection<NodeInfo> backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter); + + if (F.isEmpty(backupIds) && primaryId != null) { + ClusterNode n = lookup.get(primaryId); + + assert n != null; + + return Collections.singletonList(n); + } + + selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds; + } + else { + if (!exclNeighbors) { + selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + return lookup.containsKey(id); + } + }); + + if (selected.size() == 1) { + NodeInfo id = F.first(selected); + + assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected; + + ClusterNode n = lookup.get(id); + + assert n != null; + + return Collections.singletonList(n); + } + } + else { + int primaryAndBackups = backups + 1; + + selected = new ArrayList<>(primaryAndBackups); + + final Collection<NodeInfo> selected0 = selected; + + List<NodeInfo> ids = nodeHash.nodes(part, primaryAndBackups, new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + ClusterNode n = lookup.get(id); + + if (n == null) + return false; + + Collection<UUID> neighbors = neighbors(n); + + for (NodeInfo id0 : selected0) { + ClusterNode n0 = lookup.get(id0); + + if (n0 == null) + return false; + + Collection<UUID> neighbors0 = neighbors(n0); + + if (F.containsAny(neighbors0, neighbors)) + return false; + } + + selected0.add(id); + + return true; + } + }); + + if (AFFINITY_CONSISTENCY_CHECK) + assert F.eqOrdered(ids, selected); + } + } + + Collection<ClusterNode> ret = new ArrayList<>(selected.size()); + + for (NodeInfo id : selected) { + ClusterNode n = lookup.get(id); + + assert n != null; + + ret.add(n); + } + + return ret; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + initialize(); + + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + initialize(); + + return parts; + } + + /** {@inheritDoc} */ + @Override public void reset() { + addedNodes = new ConcurrentHashMap<>(); + neighbors = new ConcurrentHashMap8<>(); + + initLatch = new CountDownLatch(1); + + init = new AtomicBoolean(); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + NodeInfo info = addedNodes.remove(nodeId); + + if (info == null) + return; + + nodeHash.removeNode(info); + + neighbors.clear(); + } + + /** + * Resolve node info for specified node. + * Add node to hash circle if this is the first node invocation. + * + * @param n Node to get info for. + * @return Node info. + */ + private NodeInfo resolveNodeInfo(ClusterNode n) { + UUID nodeId = n.id(); + NodeInfo nodeInfo = addedNodes.get(nodeId); + + if (nodeInfo != null) + return nodeInfo; + + assert hashIdRslvr != null; + + nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n); + + neighbors.clear(); + + nodeHash.addNode(nodeInfo, replicas(n)); + + addedNodes.put(nodeId, nodeInfo); + + return nodeInfo; + } + + /** {@inheritDoc} */ + private void initialize() { + if (!init.get() && init.compareAndSet(false, true)) { + if (log.isInfoEnabled()) + log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts + + ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas + + ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']'); + + nodeHash = new GridConsistentHash<>(); + + initLatch.countDown(); + } + else { + if (initLatch.getCount() > 0) { + try { + U.await(initLatch); + } + catch (IgniteInterruptedCheckedException ignored) { + // Recover interrupted state flag. + Thread.currentThread().interrupt(); + } + } + } + } + + /** + * @param n Node. + * @return Replicas. + */ + private int replicas(ClusterNode n) { + Integer nodeReplicas = n.attribute(attrName); + + if (nodeReplicas == null) + nodeReplicas = replicas; + + return nodeReplicas; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheConsistentHashAffinityFunction.class, this); + } + + /** + * Node hash ID. + */ + private static final class NodeInfo implements Comparable<NodeInfo> { + /** Node ID. */ + private UUID nodeId; + + /** Hash ID. */ + private Object hashId; + + /** Grid node. */ + private ClusterNode node; + + /** + * @param nodeId Node ID. + * @param hashId Hash ID. + * @param node Rich node. + */ + private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) { + assert nodeId != null; + assert hashId != null; + + this.hashId = hashId; + this.nodeId = nodeId; + this.node = node; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Hash ID. + */ + public Object hashId() { + return hashId; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return hashId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object obj) { + if (!(obj instanceof NodeInfo)) + return false; + + NodeInfo that = (NodeInfo)obj; + + // If objects are equal, hash codes should be the same. + // Cannot use that.hashId.equals(hashId) due to Comparable<N> interface restrictions. + return that.nodeId.equals(nodeId) && that.hashCode() == hashCode(); + } + + /** {@inheritDoc} */ + @Override public int compareTo(NodeInfo o) { + int diff = nodeId.compareTo(o.nodeId); + + if (diff == 0) { + int h1 = hashCode(); + int h2 = o.hashCode(); + + diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1); + } + + return diff; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeInfo.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java index b651e8a..47ccb5e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.testframework.*; @@ -25,8 +25,6 @@ import org.apache.ignite.testframework.junits.common.*; import java.util.*; -import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; - /** * */ @@ -44,7 +42,7 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe for (int replicas = 128; replicas <= 4096; replicas*=2) { Collection<ClusterNode> nodes = createNodes(i, replicas); - CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(false, 10000); + CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction(false, 10000); checkDistribution(aff, nodes); } @@ -71,11 +69,11 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe * @param aff Affinity to check. * @param nodes Collection of nodes to test on. */ - private void checkDistribution(CacheConsistentHashAffinityFunction aff, Collection<ClusterNode> nodes) { + private void checkDistribution(CacheRendezvousAffinityFunction aff, Collection<ClusterNode> nodes) { Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size()); for (int part = 0; part < aff.getPartitions(); part++) { - Collection<ClusterNode> affNodes = aff.nodes(part, nodes, 0); + Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null); assertEquals(1, affNodes.size()); @@ -112,8 +110,7 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe m2 /= (n - 1); assertEquals(aff.getPartitions(), total); - System.out.printf("%6s, %6s, %6s, %6s, %8.4f\n", nodes.size(), - F.first(nodes).attribute(DFLT_REPLICA_COUNT_ATTR_NAME), min, max, Math.sqrt(m2)); + System.out.printf("%6s, %6s, %6s, %8.4f\n", nodes.size(),min, max, Math.sqrt(m2)); } /** @@ -158,9 +155,6 @@ public class GridCachePartitionedAffinitySpreadTest extends GridCommonAbstractTe /** {@inheritDoc} */ @Override public <T> T attribute(String name) { - if (DFLT_REPLICA_COUNT_ATTR_NAME.equals(name)) - return (T)new Integer(replicas); - return super.attribute(name); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d07931d2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java index 9cf7283..1559a24 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedAffinitySelfTest.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cache.affinity.consistenthash.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; @@ -41,7 +41,6 @@ import static org.apache.ignite.cache.CacheAtomicityMode.*; import static org.apache.ignite.cache.CacheDistributionMode.*; import static org.apache.ignite.cache.CacheMode.*; import static org.apache.ignite.cache.CachePreloadMode.*; -import static org.apache.ignite.cache.affinity.consistenthash.CacheConsistentHashAffinityFunction.*; import static org.apache.ignite.events.EventType.*; /** @@ -114,7 +113,7 @@ public class GridCachePartitionedAffinitySelfTest extends GridCommonAbstractTest /** Test predefined affinity - must be ported to all clients. */ @SuppressWarnings("UnaryPlus") public void testPredefined() throws IgniteCheckedException { - CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(); + CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction(); getTestResources().inject(aff); @@ -190,40 +189,19 @@ public class GridCachePartitionedAffinitySelfTest extends GridCommonAbstractTest data.put(-1.7976931348623157E+308, 6); data.put(+4.9E-324, 7); data.put(-4.9E-324, 7); - - boolean ok = true; - - for (Map.Entry<Object, Integer> entry : data.entrySet()) { - int part = aff.partition(entry.getKey()); - Collection<ClusterNode> affNodes = aff.nodes(part, nodes, 1); - UUID act = F.first(affNodes).id(); - UUID exp = nodes.get(entry.getValue()).id(); - - if (!exp.equals(act)) { - ok = false; - - info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + - ", actual=" + act + "."); - } - } - - if (ok) - return; - - fail("Server partitioned affinity validation fails."); } /** Test predefined affinity - must be ported to other clients. */ @SuppressWarnings("UnaryPlus") public void testPredefinedHashIdResolver() throws IgniteCheckedException { // Use Md5 hasher for this test. - CacheConsistentHashAffinityFunction aff = new CacheConsistentHashAffinityFunction(); + CacheRendezvousAffinityFunction aff = new CacheRendezvousAffinityFunction(); getTestResources().inject(aff); aff.setHashIdResolver(new CacheAffinityNodeHashResolver() { @Override public Object resolve(ClusterNode node) { - return node.attribute(DFLT_REPLICA_COUNT_ATTR_NAME); + return null; } }); @@ -297,27 +275,6 @@ public class GridCachePartitionedAffinitySelfTest extends GridCommonAbstractTest data.put(-1.7976931348623157E+308, 1); data.put(+4.9E-324, 1); data.put(-4.9E-324, 1); - - boolean ok = true; - - for (Map.Entry<Object, Integer> entry : data.entrySet()) { - int part = aff.partition(entry.getKey()); - - UUID exp = nodes.get(entry.getValue()).id(); - UUID act = F.first(aff.nodes(part, nodes, 1)).id(); - - if (!exp.equals(act)) { - ok = false; - - info("Failed to validate affinity for key '" + entry.getKey() + "' [expected=" + exp + - ", actual=" + act + "."); - } - } - - if (ok) - return; - - fail("Server partitioned affinity validation fails."); } /** @@ -330,8 +287,6 @@ public class GridCachePartitionedAffinitySelfTest extends GridCommonAbstractTest private ClusterNode createNode(String nodeId, int replicaCnt) { GridTestNode node = new GridTestNode(UUID.fromString(nodeId)); - node.setAttribute(DFLT_REPLICA_COUNT_ATTR_NAME, replicaCnt); - return node; }