http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java new file mode 100644 index 0000000..3584200 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import org.gridgain.grid.cache.*; + +import java.io.*; + +/** + * Affinity mapper which maps cache key to an affinity key. Affinity key is a key which will be + * used to determine a node on which this key will be cached. Every cache key will first be passed + * through {@link #affinityKey(Object)} method, and the returned value of this method + * will be given to {@link CacheAffinityFunction} implementation to find out key-to-node affinity. 
+ * <p> + * The default implementation, which will be used if no explicit affinity mapper is specified + * in cache configuration, will first look for any field or method annotated with + * {@link CacheAffinityKeyMapped @GridCacheAffinityKeyMapped} annotation. If such field or method + * is not found, then the cache key itself will be returned from {@link #affinityKey(Object) affinityKey(Object)} + * method (this means that all objects with the same cache key will always be routed to the same node). + * If such field or method is found, then the value of this field or method will be returned from + * {@link #affinityKey(Object) affinityKey(Object)} method. This allows to specify alternate affinity key, other + * than the cache key itself, whenever needed. + * <p> + * A custom (other than default) affinity mapper can be provided + * via {@link GridCacheConfiguration#getAffinityMapper()} configuration property. + * <p> + * For more information on affinity mapping and examples refer to {@link CacheAffinityFunction} and + * {@link CacheAffinityKeyMapped @GridCacheAffinityKeyMapped} documentation. + * @see CacheAffinityFunction + * @see CacheAffinityKeyMapped + */ +public interface CacheAffinityKeyMapper extends Serializable { + /** + * Maps passed in key to an alternate key which will be used for node affinity. + * + * @param key Key to map. + * @return Key to be used for node-to-affinity mapping (may be the same + * key as passed in). + */ + public Object affinityKey(Object key); + + /** + * Resets cache affinity mapper to its initial state. This method will be called by + * the system any time the affinity mapper has been sent to remote node where + * it has to be reinitialized. If your implementation of affinity mapper + * has no initialization logic, leave this method empty. + */ + public void reset(); +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java new file mode 100644 index 0000000..732ebf6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Node hash resolver which uses {@link org.apache.ignite.cluster.ClusterNode#consistentId()} as alternate hash value. 
+ */ +public class CacheAffinityNodeAddressHashResolver implements CacheAffinityNodeHashResolver { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Object resolve(ClusterNode node) { + return node.consistentId(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheAffinityNodeAddressHashResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java new file mode 100644 index 0000000..79489cf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.affinity; + +import org.apache.ignite.cluster.*; + +import java.io.*; + +/** + * Resolver which is used to provide node hash value for affinity function. + * <p> + * Node IDs constantly change when nodes get restarted, which causes affinity mapping to change between restarts, + * and hence causing redundant repartitioning. Providing an alternate node hash value, which survives node restarts, + * will help to map keys to the same nodes whenever possible. + * <p> + * Note that on case clients exist they will query this object from the server and use it for affinity calculation. + * Therefore you must ensure that server and clients can marshal and unmarshal this object in portable format, + * i.e. all parties have object class(es) configured as portable. + */ +public interface CacheAffinityNodeHashResolver extends Serializable { + /** + * Resolve alternate hash value for the given Grid node. + * + * @param node Grid node. + * @return Resolved hash ID. + */ + public Object resolve(ClusterNode node); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java new file mode 100644 index 0000000..9604d5a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Node hash resolver which uses generated node ID as node hash value. As new node ID is generated + * on each node start, this resolver do not provide ability to map keys to the same nodes after restart. + */ +public class CacheAffinityNodeIdHashResolver implements CacheAffinityNodeHashResolver { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public Object resolve(ClusterNode node) { + return node.id(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheAffinityNodeIdHashResolver.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java new file mode 100644 index 0000000..0df8ffe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import java.lang.annotation.*; + +/** + * Annotation marker which identifies affinity function that must be calculated on one centralized node + * instead of independently on each node. In many cases it happens because it requires previous affinity state + * in order to calculate new one. 
+ */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface CacheCentralizedAffinityFunction { + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java new file mode 100644 index 0000000..17b53fb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/CacheConsistentHashAffinityFunction.java @@ -0,0 +1,703 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.affinity.consistenthash; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.gridgain.grid.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.gridgain.grid.util.*; +import org.gridgain.grid.util.tostring.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Affinity function for partitioned cache. This function supports the following + * configuration: + * <ul> + * <li> + * {@code backups} - Use this flag to control how many back up nodes will be + * assigned to every key. The default value is {@code 0}. + * </li> + * <li> + * {@code replicas} - Generally the more replicas a node gets, the more key assignments + * it will receive. You can configure different number of replicas for a node by + * setting user attribute with name {@link #getReplicaCountAttributeName()} to some + * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant. + * </li> + * <li> + * {@code backupFilter} - Optional filter for back up nodes. If provided, then only + * nodes that pass this filter will be selected as backup nodes. If not provided, then + * primary and backup nodes will be selected out of all nodes available for this cache. + * </li> + * </ul> + * <p> + * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method. + */ +public class CacheConsistentHashAffinityFunction implements CacheAffinityFunction { + /** */ + private static final long serialVersionUID = 0L; + + /** Flag to enable/disable consistency check (for internal use only). 
*/ + private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("GRIDGAIN_AFFINITY_CONSISTENCY_CHECK"); + + /** Default number of partitions. */ + public static final int DFLT_PARTITION_COUNT = 10000; + + /** Default replica count for partitioned caches. */ + public static final int DFLT_REPLICA_COUNT = 128; + + /** + * Name of node attribute to specify number of replicas for a node. + * Default value is {@code gg:affinity:node:replicas}. + */ + public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas"; + + /** Node hash. */ + private transient GridConsistentHash<NodeInfo> nodeHash; + + /** Total number of partitions. */ + private int parts = DFLT_PARTITION_COUNT; + + /** */ + private int replicas = DFLT_REPLICA_COUNT; + + /** */ + private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME; + + /** */ + private boolean exclNeighbors; + + /** + * Optional backup filter. First node passed to this filter is primary node, + * and second node is a node being tested. + */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + + /** */ + private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver(); + + /** Injected grid. */ + @IgniteInstanceResource + private Ignite ignite; + + /** Injected cache name. */ + @IgniteCacheNameResource + private String cacheName; + + /** Injected logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** Initialization flag. */ + @SuppressWarnings("TransientFieldNotInitialized") + private transient AtomicBoolean init = new AtomicBoolean(); + + /** Latch for initializing. */ + @SuppressWarnings({"TransientFieldNotInitialized"}) + private transient CountDownLatch initLatch = new CountDownLatch(1); + + /** Nodes IDs. */ + @GridToStringInclude + @SuppressWarnings({"TransientFieldNotInitialized"}) + private transient ConcurrentMap<UUID, NodeInfo> addedNodes = new ConcurrentHashMap<>(); + + /** Optional backup filter. 
*/ + @GridToStringExclude + private final IgniteBiPredicate<NodeInfo, NodeInfo> backupIdFilter = new IgniteBiPredicate<NodeInfo, NodeInfo>() { + @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) { + return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node()); + } + }; + + /** Map of neighbors. */ + @SuppressWarnings("TransientFieldNotInitialized") + private transient ConcurrentMap<UUID, Collection<UUID>> neighbors = + new ConcurrentHashMap8<>(); + + /** + * Empty constructor with all defaults. + */ + public CacheConsistentHashAffinityFunction() { + // No-op. + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and specified number of backups. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + */ + public CacheConsistentHashAffinityFunction(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of backups and partitions. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public CacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) { + A.ensure(parts != 0, "parts != 0"); + + this.exclNeighbors = exclNeighbors; + this.parts = parts; + } + + /** + * Initializes optional counts for replicas and backups. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @param parts Total number of partitions. 
+ * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + */ + public CacheConsistentHashAffinityFunction(int parts, + @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts != 0, "parts != 0"); + + this.parts = parts; + this.backupFilter = backupFilter; + } + + /** + * Gets default count of virtual replicas in consistent hash ring. + * <p> + * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()} + * name will be checked first. If it is absent, then this value will be used. + * + * @return Count of virtual replicas in consistent hash ring. + */ + public int getDefaultReplicas() { + return replicas; + } + + /** + * Sets default count of virtual replicas in consistent hash ring. + * <p> + * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name + * will be checked first. If it is absent, then this value will be used. + * + * @param replicas Count of virtual replicas in consistent hash ring.s + */ + public void setDefaultReplicas(int replicas) { + this.replicas = replicas; + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. 
+ */ + public void setPartitions(int parts) { + this.parts = parts; + } + + /** + * Gets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @return Hash ID resolver. + */ + public CacheAffinityNodeHashResolver getHashIdResolver() { + return hashIdRslvr; + } + + /** + * Sets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @param hashIdRslvr Hash ID resolver. + */ + public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) { + this.hashIdRslvr = hashIdRslvr; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. 
+ * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. + * + * @param backupFilter Optional backup filter. + */ + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Gets optional attribute name for replica count. If not provided, the + * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. + * + * @return User attribute name for replica count for a node. + */ + public String getReplicaCountAttributeName() { + return attrName; + } + + /** + * Sets optional attribute name for replica count. If not provided, the + * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. + * + * @param attrName User attribute name for replica count for a node. + */ + public void setReplicaCountAttributeName(String attrName) { + this.attrName = attrName; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Gets neighbors for a node. + * + * @param node Node. + * @return Neighbors. 
+ */ + private Collection<UUID> neighbors(final ClusterNode node) { + Collection<UUID> ns = neighbors.get(node.id()); + + if (ns == null) { + Collection<ClusterNode> nodes = ignite.cluster().forHost(node).nodes(); + + ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes))); + } + + return ns; + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) { + List<List<ClusterNode>> res = new ArrayList<>(parts); + + Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); + + for (int part = 0; part < parts; part++) { + res.add(F.isEmpty(topSnapshot) ? + Collections.<ClusterNode>emptyList() : + // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection + // doesn't provide equals and hashCode implementations. + U.sealList(nodes(part, topSnapshot, ctx.backups()))); + } + + return res; + } + + /** + * Assigns nodes to one partition. + * + * @param part Partition to assign nodes for. + * @param nodes Cache topology nodes. + * @return Assigned nodes, first node is primary, others are backups. + */ + public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes, int backups) { + if (nodes == null) + return Collections.emptyList(); + + int nodesSize = nodes.size(); + + if (nodesSize == 0) + return Collections.emptyList(); + + if (nodesSize == 1) // Minor optimization. + return nodes; + + initialize(); + + final Map<NodeInfo, ClusterNode> lookup = new GridLeanMap<>(nodesSize); + + // Store nodes in map for fast lookup. + for (ClusterNode n : nodes) + // Add nodes into hash circle, if absent. 
+ lookup.put(resolveNodeInfo(n), n); + + Collection<NodeInfo> selected; + + if (backupFilter != null) { + final IgnitePredicate<NodeInfo> p = new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + return lookup.containsKey(id); + } + }; + + final NodeInfo primaryId = nodeHash.node(part, p); + + IgnitePredicate<NodeInfo> backupPrimaryIdFilter = new IgnitePredicate<NodeInfo>() { + @Override public boolean apply(NodeInfo node) { + return backupIdFilter.apply(primaryId, node); + } + }; + + Collection<NodeInfo> backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter); + + if (F.isEmpty(backupIds) && primaryId != null) { + ClusterNode n = lookup.get(primaryId); + + assert n != null; + + return Collections.singletonList(n); + } + + selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds; + } + else { + if (!exclNeighbors) { + selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + return lookup.containsKey(id); + } + }); + + if (selected.size() == 1) { + NodeInfo id = F.first(selected); + + assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected; + + ClusterNode n = lookup.get(id); + + assert n != null; + + return Collections.singletonList(n); + } + } + else { + int primaryAndBackups = backups + 1; + + selected = new ArrayList<>(primaryAndBackups); + + final Collection<NodeInfo> selected0 = selected; + + List<NodeInfo> ids = nodeHash.nodes(part, primaryAndBackups, new P1<NodeInfo>() { + @Override public boolean apply(NodeInfo id) { + ClusterNode n = lookup.get(id); + + if (n == null) + return false; + + Collection<UUID> neighbors = neighbors(n); + + for (NodeInfo id0 : selected0) { + ClusterNode n0 = lookup.get(id0); + + if (n0 == null) + return false; + + Collection<UUID> neighbors0 = neighbors(n0); + + if (F.containsAny(neighbors0, neighbors)) + return false; + } + + selected0.add(id); + 
+ return true; + } + }); + + if (AFFINITY_CONSISTENCY_CHECK) + assert F.eqOrdered(ids, selected); + } + } + + Collection<ClusterNode> ret = new ArrayList<>(selected.size()); + + for (NodeInfo id : selected) { + ClusterNode n = lookup.get(id); + + assert n != null; + + ret.add(n); + } + + return ret; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + initialize(); + + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + initialize(); + + return parts; + } + + /** {@inheritDoc} */ + @Override public void reset() { + addedNodes = new ConcurrentHashMap<>(); + neighbors = new ConcurrentHashMap8<>(); + + initLatch = new CountDownLatch(1); + + init = new AtomicBoolean(); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + NodeInfo info = addedNodes.remove(nodeId); + + if (info == null) + return; + + nodeHash.removeNode(info); + + neighbors.clear(); + } + + /** + * Resolve node info for specified node. + * Add node to hash circle if this is the first node invocation. + * + * @param n Node to get info for. + * @return Node info. 
     */
    private NodeInfo resolveNodeInfo(ClusterNode n) {
        UUID nodeId = n.id();
        NodeInfo nodeInfo = addedNodes.get(nodeId);

        if (nodeInfo != null)
            return nodeInfo;

        assert hashIdRslvr != null;

        // NOTE(review): check-then-act on addedNodes is not atomic - two threads resolving the same
        // node concurrently could both add it to the hash ring. TODO confirm callers serialize this.
        nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n);

        // Neighbor cache is keyed by the current ring contents, so drop it whenever the ring changes.
        neighbors.clear();

        nodeHash.addNode(nodeInfo, replicas(n));

        addedNodes.put(nodeId, nodeInfo);

        return nodeInfo;
    }

    /**
     * Performs one-time lazy initialization. The first caller wins the CAS, logs the configuration
     * and creates the consistent hash ring; all other callers block on the latch until it is ready.
     */
    private void initialize() {
        if (!init.get() && init.compareAndSet(false, true)) {
            if (log.isInfoEnabled())
                log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts +
                    ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas +
                    ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']');

            nodeHash = new GridConsistentHash<>();

            initLatch.countDown();
        }
        else {
            // Another thread won the CAS - wait for it to finish initialization.
            if (initLatch.getCount() > 0) {
                try {
                    U.await(initLatch);
                }
                catch (GridInterruptedException ignored) {
                    // Recover interrupted state flag.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Gets replica count for a node: the per-node attribute value if set, otherwise the configured default.
     *
     * @param n Node.
     * @return Replicas.
     */
    private int replicas(ClusterNode n) {
        Integer nodeReplicas = n.attribute(attrName);

        if (nodeReplicas == null)
            nodeReplicas = replicas;

        return nodeReplicas;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(CacheConsistentHashAffinityFunction.class, this);
    }

    /**
     * Node hash ID. Wraps a cluster node together with its ring hash ID so that ring placement
     * is derived from the (restart-stable) hash ID rather than the node ID.
     */
    private static final class NodeInfo implements Comparable<NodeInfo> {
        /** Node ID. */
        private UUID nodeId;

        /** Hash ID. */
        private Object hashId;

        /** Grid node. */
        private ClusterNode node;

        /**
         * @param nodeId Node ID.
         * @param hashId Hash ID.
         * @param node Rich node.
         */
        private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) {
            assert nodeId != null;
            assert hashId != null;

            this.hashId = hashId;
            this.nodeId = nodeId;
            this.node = node;
        }

        /**
         * @return Node ID.
         */
        public UUID nodeId() {
            return nodeId;
        }

        /**
         * @return Hash ID.
         */
        public Object hashId() {
            return hashId;
        }

        /**
         * @return Node.
         */
        public ClusterNode node() {
            return node;
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            // Hash derives from hash ID only, so ring position survives node restarts.
            return hashId.hashCode();
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object obj) {
            if (!(obj instanceof NodeInfo))
                return false;

            NodeInfo that = (NodeInfo)obj;

            // If objects are equal, hash codes should be the same.
            // Cannot use that.hashId.equals(hashId) due to Comparable<N> interface restrictions.
            return that.nodeId.equals(nodeId) && that.hashCode() == hashCode();
        }

        /** {@inheritDoc} */
        @Override public int compareTo(NodeInfo o) {
            // Order by node ID first, then by hash code; compareTo == 0 iff equals() is true.
            int diff = nodeId.compareTo(o.nodeId);

            if (diff == 0) {
                int h1 = hashCode();
                int h2 = o.hashCode();

                diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1);
            }

            return diff;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(NodeInfo.class, this);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html ----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html
new file mode 100644
index 0000000..f5d5e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/consistenthash/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains consistent hash based cache affinity for partitioned cache. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java new file mode 100644 index 0000000..3b34413 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.affinity.fair;

import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
import org.apache.ignite.cache.affinity.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;

import java.io.*;
import java.util.*;

/**
 * Fair affinity function which tries to ensure that all nodes get equal number of partitions with
 * minimum amount of reassignments between existing nodes.
 * <p>
 * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method.
 */
@CacheCentralizedAffinityFunction
public class CachePartitionFairAffinity implements CacheAffinityFunction {
    /** Default partition count. */
    public static final int DFLT_PART_CNT = 256;

    /** */
    private static final long serialVersionUID = 0L;

    /** Ascending comparator. */
    private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(false);

    /** Descending comparator. */
    private static final Comparator<PartitionSet> DESC_CMP = new PartitionSetComparator(true);

    /** Number of partitions. */
    private int parts;

    /**
     * Creates fair affinity with default partition count.
     */
    public CachePartitionFairAffinity() {
        this(DFLT_PART_CNT);
    }

    /**
     * @param parts Number of partitions.
     */
    public CachePartitionFairAffinity(int parts) {
        this.parts = parts;
    }

    /** {@inheritDoc} */
    @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) {
        List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();

        // Single-node topology: every partition maps to that node, nothing to balance.
        if (topSnapshot.size() == 1) {
            ClusterNode primary = topSnapshot.get(0);

            List<List<ClusterNode>> assignments = new ArrayList<>(parts);

            for (int i = 0; i < parts; i++)
                assignments.add(Collections.singletonList(primary));

            return assignments;
        }

        // Copy previous assignment, dropping the node that left (if any).
        IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> cp = createCopy(ctx, topSnapshot);

        List<List<ClusterNode>> assignment = cp.get1();

        // Tier 0 holds primaries, tiers 1..backups hold backups; cannot exceed node count.
        int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());

        // Per tier pending partitions.
        Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();

        FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot);

        for (int tier = 0; tier < tiers; tier++) {
            // Check if this is a new tier and add pending partitions.
            Queue<Integer> pending = pendingParts.get(tier);

            for (int part = 0; part < parts; part++) {
                if (fullMap.assignments.get(part).size() < tier + 1) {
                    if (pending == null) {
                        pending = new LinkedList<>();

                        pendingParts.put(tier, pending);
                    }

                    if (!pending.contains(part))
                        pending.add(part);

                }
            }

            // Assign pending partitions, if any.
            assignPending(tier, pendingParts, fullMap, topSnapshot);

            // Balance assignments.
            balance(tier, pendingParts, fullMap, topSnapshot);
        }

        return fullMap.assignments;
    }

    /** {@inheritDoc} */
    @Override public void reset() {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public int partitions() {
        return parts;
    }

    /** {@inheritDoc} */
    @Override public int partition(Object key) {
        // Extra bit-spreading via hash() guards against poor user hashCode() implementations.
        return U.safeAbs(hash(key.hashCode())) % parts;
    }

    /** {@inheritDoc} */
    @Override public void removeNode(UUID nodeId) {
        // No-op.
    }

    /**
     * Assigns pending (unassigned) partitions to nodes.
     *
     * @param tier Tier to assign (0 is primary, 1 - 1st backup,...).
     * @param pendingMap Pending partitions per tier.
     * @param fullMap Full assignment map to modify.
     * @param topSnapshot Topology snapshot.
     */
    private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap,
        List<ClusterNode> topSnapshot) {
        Queue<Integer> pending = pendingMap.get(tier);

        if (F.isEmpty(pending))
            return;

        int idealPartCnt = parts / topSnapshot.size();

        Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier);

        PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false);

        // First iterate over underloaded nodes.
        assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false);

        if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
            // Same, forcing updates.
            assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true);
        }

        // Fall back to spreading leftovers across the whole topology.
        if (!pending.isEmpty())
            assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);

        assert pending.isEmpty();

        pendingMap.remove(tier);
    }

    /**
     * Assigns pending partitions to underloaded nodes.
     *
     * @param tier Tier to assign.
     * @param pendingMap Pending partitions per tier.
     * @param fullMap Full assignment map to modify.
     * @param underloadedNodes Underloaded nodes.
     * @param topSnapshot Topology snapshot.
     * @param force {@code True} if partitions should be moved.
     */
    private void assignPendingToUnderloaded(
        int tier,
        Map<Integer, Queue<Integer>> pendingMap,
        FullAssignmentMap fullMap,
        PrioritizedPartitionMap underloadedNodes,
        Collection<ClusterNode> topSnapshot,
        boolean force) {
        Iterator<Integer> it = pendingMap.get(tier).iterator();

        int ideal = parts / topSnapshot.size();

        while (it.hasNext()) {
            int part = it.next();

            // Try candidate nodes in priority order (least loaded first).
            for (PartitionSet set : underloadedNodes.assignments()) {
                ClusterNode node = set.node();

                assert node != null;

                if (fullMap.assign(part, tier, node, force, pendingMap)) {
                    // We could add partition to partition map without forcing, remove partition from pending.
                    it.remove();

                    if (set.size() <= ideal)
                        underloadedNodes.remove(set.nodeId());
                    else
                        underloadedNodes.update();

                    break; // for, continue to the next partition.
                }
            }

            if (underloadedNodes.isEmpty())
                return;
        }
    }

    /**
     * Spreads pending partitions equally to all nodes in topology snapshot.
     *
     * @param tier Tier to assign.
     * @param pendingMap Pending partitions per tier.
     * @param fullMap Full assignment map to modify.
     * @param topSnapshot Topology snapshot.
     */
    private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap,
        FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
        Iterator<Integer> it = pendingMap.get(tier).iterator();

        // Round-robin start index, carried across partitions for even spreading.
        int idx = 0;

        while (it.hasNext()) {
            int part = it.next();

            int i = idx;

            boolean assigned = false;

            // First pass: non-forcing assignment, one full cycle over the topology.
            do {
                ClusterNode node = topSnapshot.get(i);

                if (fullMap.assign(part, tier, node, false, pendingMap)) {
                    it.remove();

                    assigned = true;
                }

                i = (i + 1) % topSnapshot.size();

                if (assigned)
                    idx = i;
            } while (i != idx);

            if (!assigned) {
                // Second pass: forcing assignment (may displace a copy from another tier).
                do {
                    ClusterNode node = topSnapshot.get(i);

                    if (fullMap.assign(part, tier, node, true, pendingMap)) {
                        it.remove();

                        assigned = true;
                    }

                    i = (i + 1) % topSnapshot.size();

                    if (assigned)
                        idx = i;
                } while (i != idx);
            }

            if (!assigned)
                throw new IllegalStateException("Failed to find assignable node for partition.");
        }
    }

    /**
     * Tries to balance assignments between existing nodes in topology.
     *
     * @param tier Tier to assign.
     * @param pendingParts Pending partitions per tier.
     * @param fullMap Full assignment map to modify.
     * @param topSnapshot Topology snapshot.
     */
    private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap,
        Collection<ClusterNode> topSnapshot) {
        int idealPartCnt = parts / topSnapshot.size();

        Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);

        PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false);
        PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true);

        // Move partitions from overloaded to underloaded nodes. Iteration restarts after every
        // successful move because both priority maps are mutated by the move.
        do {
            boolean retry = false;

            for (PartitionSet overloaded : overloadedNodes.assignments()) {
                for (Integer part : overloaded.partitions()) {
                    boolean assigned = false;

                    for (PartitionSet underloaded : underloadedNodes.assignments()) {
                        if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) {
                            // Size of partition sets has changed.
                            if (overloaded.size() <= idealPartCnt)
                                overloadedNodes.remove(overloaded.nodeId());
                            else
                                overloadedNodes.update();

                            if (underloaded.size() >= idealPartCnt)
                                underloadedNodes.remove(underloaded.nodeId());
                            else
                                underloadedNodes.update();

                            assigned = true;

                            retry = true;

                            break;
                        }
                    }

                    if (!assigned) {
                        // Retry the same partition with forcing enabled.
                        for (PartitionSet underloaded : underloadedNodes.assignments()) {
                            if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) {
                                // Size of partition sets has changed.
                                if (overloaded.size() <= idealPartCnt)
                                    overloadedNodes.remove(overloaded.nodeId());
                                else
                                    overloadedNodes.update();

                                if (underloaded.size() >= idealPartCnt)
                                    underloadedNodes.remove(underloaded.nodeId());
                                else
                                    underloadedNodes.update();

                                retry = true;

                                break;
                            }
                        }
                    }

                    if (retry)
                        break; // for part.
                }

                if (retry)
                    break; // for overloaded.
            }

            if (!retry)
                break;
        }
        while (true);
    }

    /**
     * Constructs underloaded or overloaded partition map.
     *
     * @param mapping Mapping to filter.
     * @param idealPartCnt Ideal number of partitions per node.
     * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded.
     * @return Prioritized partition map.
     */
    private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) {
        assert mapping != null;

        PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP);

        for (PartitionSet set : mapping.values()) {
            if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt))
                res.add(set);
        }

        return res;
    }

    /**
     * Creates copy of previous partition assignment.
     *
     * @param ctx Affinity function context.
     * @param topSnapshot Topology snapshot.
     * @return Assignment copy and per node partition map.
     */
    private IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> createCopy(
        CacheAffinityFunctionContext ctx, Iterable<ClusterNode> topSnapshot) {
        IgniteDiscoveryEvent discoEvt = ctx.discoveryEvent();

        // Null means a node joined; otherwise this is the ID of the node that left or failed.
        UUID leftNodeId = discoEvt.type() == IgniteEventType.EVT_NODE_JOINED ? null : discoEvt.eventNode().id();

        List<List<ClusterNode>> cp = new ArrayList<>(parts);

        Map<UUID, PartitionSet> parts = new HashMap<>();

        for (int part = 0; part < this.parts; part++) {
            List<ClusterNode> partNodes = ctx.previousAssignment(part);

            List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size());

            for (ClusterNode affNode : partNodes) {
                // Copy every owner except the departed node.
                if (!affNode.id().equals(leftNodeId)) {
                    partNodesCp.add(affNode);

                    PartitionSet partSet = parts.get(affNode.id());

                    if (partSet == null) {
                        partSet = new PartitionSet(affNode);

                        parts.put(affNode.id(), partSet);
                    }

                    partSet.add(part);
                }
            }

            cp.add(partNodesCp);
        }

        if (leftNodeId == null) {
            // Node joined, find it and add empty set to mapping.
            ClusterNode joinedNode = null;

            for (ClusterNode node : topSnapshot) {
                if (node.id().equals(discoEvt.eventNode().id())) {
                    joinedNode = node;

                    break;
                }
            }

            assert joinedNode != null;

            parts.put(joinedNode.id(), new PartitionSet(joinedNode));
        }

        return F.t(cp, parts);
    }

    /**
     * Orders partition sets by size, ascending or descending.
     */
    private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable {
        /** */
        private static final long serialVersionUID = 0L;

        /** Descending order flag. */
        private boolean descending;

        /**
         * @param descending {@code True} if comparator should be descending.
         */
        private PartitionSetComparator(boolean descending) {
            this.descending = descending;
        }

        /** {@inheritDoc} */
        @Override public int compare(PartitionSet o1, PartitionSet o2) {
            int res = o1.parts.size() < o2.parts.size() ? -1 : o1.parts.size() > o2.parts.size() ? 1 : 0;

            return descending ? -res : res;
        }
    }

    /**
     * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order
     * by number of partitions assigned to a node.
     */
    private static class PrioritizedPartitionMap {
        /** Comparator. */
        private Comparator<PartitionSet> cmp;

        /** Assignment map. */
        private Map<UUID, PartitionSet> assignmentMap = new HashMap<>();

        /** Assignment list, ordered according to comparator. */
        private List<PartitionSet> assignmentList = new ArrayList<>();

        /**
         * @param cmp Comparator.
         */
        private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) {
            this.cmp = cmp;
        }

        /**
         * @param set Partition set to add.
         */
        public void add(PartitionSet set) {
            PartitionSet old = assignmentMap.put(set.nodeId(), set);

            if (old == null) {
                assignmentList.add(set);

                update();
            }
        }

        /**
         * Sorts assignment list.
         */
        public void update() {
            Collections.sort(assignmentList, cmp);
        }

        /**
         * @return Sorted assignment list.
         */
        public List<PartitionSet> assignments() {
            return assignmentList;
        }

        /**
         * Removes node's partition set from the map and the ordered list.
         *
         * @param uuid Node ID to remove.
         */
        public void remove(UUID uuid) {
            PartitionSet rmv = assignmentMap.remove(uuid);

            assignmentList.remove(rmv);
        }

        /**
         * @return {@code True} if no partition sets are tracked.
         */
        public boolean isEmpty() {
            return assignmentList.isEmpty();
        }
    }

    /**
     * Constructs assignment map for specified tier.
     *
     * @param tier Tier number, -1 for all tiers altogether.
     * @param assignment Assignment to construct map from.
     * @param topSnapshot Topology snapshot.
     * @return Assignment map.
     */
    private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment,
        Collection<ClusterNode> topSnapshot) {
        Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();

        for (int part = 0; part < assignment.size(); part++) {
            List<ClusterNode> nodes = assignment.get(part);

            // Positional access below relies on O(1) indexed lookup.
            assert nodes instanceof RandomAccess;

            if (nodes.size() <= tier)
                continue;

            int start = tier < 0 ? 0 : tier;
            int end = tier < 0 ? nodes.size() : tier + 1;

            for (int i = start; i < end; i++) {
                ClusterNode n = nodes.get(i);

                PartitionSet set = tmp.get(n.id());

                if (set == null) {
                    set = new PartitionSet(n);

                    tmp.put(n.id(), set);
                }

                set.add(part);
            }
        }

        // Ensure every node in topology has an entry, even if it owns no partitions yet.
        if (tmp.size() < topSnapshot.size()) {
            for (ClusterNode node : topSnapshot) {
                if (!tmp.containsKey(node.id()))
                    tmp.put(node.id(), new PartitionSet(node));
            }
        }

        return tmp;
    }

    /**
     * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary
     * maps consistent.
     */
    @SuppressWarnings("unchecked")
    private static class FullAssignmentMap {
        /** Per-tier assignment maps. */
        private Map<UUID, PartitionSet>[] tierMaps;

        /** Full assignment map. */
        private Map<UUID, PartitionSet> fullMap;

        /** Resulting assignment. */
        private List<List<ClusterNode>> assignments;

        /**
         * @param tiers Number of tiers.
         * @param assignments Assignments to modify.
         * @param topSnapshot Topology snapshot.
         */
        private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) {
            this.assignments = assignments;

            tierMaps = new Map[tiers];

            for (int tier = 0; tier < tiers; tier++)
                tierMaps[tier] = assignments(tier, assignments, topSnapshot);

            fullMap = assignments(-1, assignments, topSnapshot);
        }

        /**
         * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed
         * only if this partition is not already assigned to a node. If force is true, then assignment will succeed
         * only if partition is not assigned to a tier with number less than passed in. Assigned partition from
         * greater tier will be moved to pending queue.
         *
         * @param part Partition to assign.
         * @param tier Tier number to assign.
         * @param node Node to move partition to.
         * @param force Force flag.
         * @param pendingParts per tier pending partitions map.
         * @return {@code True} if assignment succeeded.
         */
        boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) {
            UUID nodeId = node.id();

            if (!fullMap.get(nodeId).contains(part)) {
                // Node does not own this partition on any tier - plain assignment.
                tierMaps[tier].get(nodeId).add(part);

                fullMap.get(nodeId).add(part);

                List<ClusterNode> assignment = assignments.get(part);

                if (assignment.size() <= tier)
                    assignment.add(node);
                else {
                    ClusterNode oldNode = assignment.set(tier, node);

                    if (oldNode != null) {
                        UUID oldNodeId = oldNode.id();

                        tierMaps[tier].get(oldNodeId).remove(part);
                        fullMap.get(oldNodeId).remove(part);
                    }
                }

                return true;
            }
            else if (force) {
                assert !tierMaps[tier].get(nodeId).contains(part);

                // Check previous tiers first.
                for (int t = 0; t < tier; t++) {
                    if (tierMaps[t].get(nodeId).contains(part))
                        return false;
                }

                // Partition is on some lower tier, switch it.
                for (int t = tier + 1; t < tierMaps.length; t++) {
                    if (tierMaps[t].get(nodeId).contains(part)) {
                        ClusterNode oldNode = assignments.get(part).get(tier);

                        // Move partition from level t to tier.
                        assignments.get(part).set(tier, node);
                        assignments.get(part).set(t, null);

                        if (oldNode != null) {
                            tierMaps[tier].get(oldNode.id()).remove(part);
                            fullMap.get(oldNode.id()).remove(part);
                        }

                        tierMaps[tier].get(nodeId).add(part);
                        tierMaps[t].get(nodeId).remove(part);

                        // The displaced tier-t slot must be re-filled later.
                        Queue<Integer> pending = pendingParts.get(t);

                        if (pending == null) {
                            pending = new LinkedList<>();

                            pendingParts.put(t, pending);
                        }

                        pending.add(part);

                        return true;
                    }
                }

                throw new IllegalStateException("Unable to assign partition to node while force is true.");
            }

            // !force.
            return false;
        }

        /**
         * Gets tier mapping.
         *
         * @param tier Tier to get mapping.
         * @return Per node map.
         */
        public Map<UUID, PartitionSet> tierMapping(int tier) {
            return tierMaps[tier];
        }
    }

    /**
     * Applies a supplemental hash function to a given hashCode, which
     * defends against poor quality hash functions.
     *
     * @param h Hash code.
     * @return Enhanced hash code.
     */
    private static int hash(int h) {
        // Spread bits to regularize both segment and index locations,
        // using variant of single-word Wang/Jenkins hash.
        h += (h << 15) ^ 0xffffcd7d;
        h ^= (h >>> 10);
        h += (h << 3);
        h ^= (h >>> 6);
        h += (h << 2) + (h << 14);
        return h ^ (h >>> 16);
    }

    /**
     * Per-node set of assigned partitions.
     * NOTE(review): backed by a LinkedList, so add()/contains() are O(n) - acceptable for moderate
     * partition counts, but worth revisiting for large ones.
     */
    @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
    private static class PartitionSet {
        /** Node. */
        private ClusterNode node;

        /** Partitions. */
        private Collection<Integer> parts = new LinkedList<>();

        /**
         * @param node Node.
         */
        private PartitionSet(ClusterNode node) {
            this.node = node;
        }

        /**
         * @return Node.
         */
        private ClusterNode node() {
            return node;
        }

        /**
         * @return Node ID.
         */
        private UUID nodeId() {
            return node.id();
        }

        /**
         * @return Partition set size.
         */
        private int size() {
            return parts.size();
        }

        /**
         * Adds partition to partition set.
         *
         * @param part Partition to add.
         * @return {@code True} if partition was added, {@code false} if partition already exists.
         */
        private boolean add(int part) {
            if (!parts.contains(part)) {
                parts.add(part);

                return true;
            }

            return false;
        }

        /**
         * @param part Partition to remove.
         */
        private void remove(Integer part) {
            parts.remove(part); // Remove object, not index.
        }

        /**
         * @return Partitions.
         */
        @SuppressWarnings("TypeMayBeWeakened")
        private Collection<Integer> partitions() {
            return parts;
        }

        /**
         * Checks if partition set contains given partition.
         *
         * @param part Partition to check.
         * @return {@code True} if partition set contains given partition.
         */
        private boolean contains(int part) {
            return parts.contains(part);
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']';
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html ----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html
new file mode 100644
index 0000000..f71f3c2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/package.html
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains fair cache affinity for partitioned cache. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html new file mode 100644 index 0000000..defee90 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/package.html @@ -0,0 +1,24 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + Contains cache node affinity implementations. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java new file mode 100644 index 0000000..1a7a6ff --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity.rendezvous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.gridgain.grid.cache.*; +import org.apache.ignite.cache.affinity.*; +import org.gridgain.grid.kernal.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.security.*; +import java.util.*; + +/** + * Affinity function for partitioned cache based on Highest Random Weight algorithm. + * This function supports the following configuration: + * <ul> + * <li> + * {@code partitions} - Number of partitions to spread across nodes. + * </li> + * <li> + * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors + * from being backups of each other. Note that {@code backupFilter} is ignored if + * {@code excludeNeighbors} is set to {@code true}. + * </li> + * <li> + * {@code backupFilter} - Optional filter for back up nodes. If provided, then only + * nodes that pass this filter will be selected as backup nodes. If not provided, then + * primary and backup nodes will be selected out of all nodes available for this cache. + * </li> + * </ul> + * <p> + * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method. 
 */
public class CacheRendezvousAffinityFunction implements CacheAffinityFunction, Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Default number of partitions. */
    public static final int DFLT_PARTITION_COUNT = 10000;

    /** Comparator. */
    private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR =
        new HashComparator();

    /** Thread local message digest. */
    private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() {
        @Override protected MessageDigest initialValue() {
            try {
                // MD5 is used here as a mixing function for rendezvous hashing, not for security.
                return MessageDigest.getInstance("MD5");
            }
            catch (NoSuchAlgorithmException e) {
                assert false : "Should have failed in constructor";

                throw new IgniteException("Failed to obtain message digest (digest was available in constructor)",
                    e);
            }
        }
    };

    /** Number of partitions. */
    private int parts;

    /** Exclude neighbors flag. */
    private boolean exclNeighbors;

    /** Optional backup filter. First node is primary, second node is a node being tested. */
    private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter;

    /** Hash ID resolver. */
    private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver();

    /** Marshaller. */
    private IgniteMarshaller marshaller = new IgniteOptimizedMarshaller(false);

    /**
     * Empty constructor with all defaults.
     */
    public CacheRendezvousAffinityFunction() {
        this(false);
    }

    /**
     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other
     * and specified number of backups.
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
     *
     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
     *      of each other.
     */
    public CacheRendezvousAffinityFunction(boolean exclNeighbors) {
        this(exclNeighbors, DFLT_PARTITION_COUNT);
    }

    /**
     * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other,
     * and specified number of backups and partitions.
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
     *
     * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups
     *      of each other.
     * @param parts Total number of partitions.
     */
    public CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts) {
        this(exclNeighbors, parts, null);
    }

    /**
     * Initializes optional counts for replicas and backups.
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
     *
     * @param parts Total number of partitions.
     * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected
     *      from all nodes that pass this filter. First argument for this filter is primary node, and second
     *      argument is node being tested.
     */
    public CacheRendezvousAffinityFunction(int parts,
        @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
        this(false, parts, backupFilter);
    }

    /**
     * Private constructor.
     *
     * @param exclNeighbors Exclude neighbors flag.
     * @param parts Partitions count.
     * @param backupFilter Backup filter.
     */
    private CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts,
        IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
        // NOTE(review): only rejects zero - negative partition counts slip through. TODO confirm intended.
        A.ensure(parts != 0, "parts != 0");

        this.exclNeighbors = exclNeighbors;
        this.parts = parts;
        this.backupFilter = backupFilter;

        try {
            // Fail fast at construction time if MD5 is unavailable on this JVM.
            MessageDigest.getInstance("MD5");
        }
        catch (NoSuchAlgorithmException e) {
            throw new IgniteException("Failed to obtain MD5 message digest instance.", e);
        }
    }

    /**
     * Gets total number of key partitions. To ensure that all partitions are
     * equally distributed across all nodes, please make sure that this
     * number is significantly larger than a number of nodes. Also, partition
     * size should be relatively small. Try to avoid having partitions with more
     * than quarter million keys.
     * <p>
     * Note that for fully replicated caches this method should always
     * return {@code 1}.
     *
     * @return Total partition count.
     */
    public int getPartitions() {
        return parts;
    }

    /**
     * Sets total number of partitions.
     *
     * @param parts Total number of partitions.
     */
    public void setPartitions(int parts) {
        this.parts = parts;
    }

    /**
     * Gets hash ID resolver for nodes. This resolver is used to provide
     * alternate hash ID, other than node ID.
     * <p>
     * Node IDs constantly change when nodes get restarted, which causes them to
     * be placed on different locations in the hash ring, and hence causing
     * repartitioning. Providing an alternate hash ID, which survives node restarts,
     * puts node on the same location on the hash ring, hence minimizing required
     * repartitioning.
     *
     * @return Hash ID resolver.
     */
    public CacheAffinityNodeHashResolver getHashIdResolver() {
        return hashIdRslvr;
    }

    /**
     * Sets hash ID resolver for nodes. This resolver is used to provide
     * alternate hash ID, other than node ID.
     * <p>
     * Node IDs constantly change when nodes get restarted, which causes them to
     * be placed on different locations in the hash ring, and hence causing
     * repartitioning. Providing an alternate hash ID, which survives node restarts,
     * puts node on the same location on the hash ring, hence minimizing required
     * repartitioning.
     *
     * @param hashIdRslvr Hash ID resolver.
     */
    public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) {
        this.hashIdRslvr = hashIdRslvr;
    }

    /**
     * Gets optional backup filter. If not {@code null}, backups will be selected
     * from all nodes that pass this filter. First node passed to this filter is primary node,
     * and second node is a node being tested.
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
     *
     * @return Optional backup filter.
     */
    @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() {
        return backupFilter;
    }

    /**
     * Sets optional backup filter. If provided, then backups will be selected from all
     * nodes that pass this filter. First node being passed to this filter is primary node,
     * and second node is a node being tested.
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set.
     *
     * @param backupFilter Optional backup filter.
     */
    public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) {
        this.backupFilter = backupFilter;
    }

    /**
     * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}).
     * <p>
     * Note that {@code excludeNeighbors} parameter is ignored if {@link #getBackupFilter()} is set.
     *
     * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Returns collection of nodes (primary first) for specified partition. + */ + public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, + @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { + if (nodes.size() <= 1) + return nodes; + + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); + + MessageDigest d = digest.get(); + + for (ClusterNode node : nodes) { + Object nodeHash = hashIdRslvr.resolve(node); + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + byte[] nodeHashBytes = marshaller.marshal(nodeHash); + + out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException. + out.write(U.intToBytes(part), 0, 4); // Avoid IOException. 
+ + d.reset(); + + byte[] bytes = d.digest(out.toByteArray()); + + long hash = + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); + + lst.add(F.t(hash, node)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + Collections.sort(lst, COMPARATOR); + + int primaryAndBackups; + + List<ClusterNode> res; + + if (backups == Integer.MAX_VALUE) { + primaryAndBackups = Integer.MAX_VALUE; + + res = new ArrayList<>(); + } + else { + primaryAndBackups = backups + 1; + + res = new ArrayList<>(primaryAndBackups); + } + + ClusterNode primary = lst.get(0).get2(); + + res.add(primary); + + // Select backups. + if (backups > 0) { + for (int i = 1; i < lst.size(); i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (exclNeighbors) { + Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res); + + if (!allNeighbors.contains(node)) + res.add(node); + } + else { + if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node))) + res.add(next.get2()); + } + + if (res.size() == primaryAndBackups) + break; + } + } + + if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { + // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria. + for (int i = 1; i < lst.size(); i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (!res.contains(node)) + res.add(next.get2()); + + if (res.size() == primaryAndBackups) + break; + } + } + + assert res.size() <= primaryAndBackups; + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) { + List<List<ClusterNode>> assignments = new ArrayList<>(parts); + + Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? + neighbors(affCtx.currentTopologySnapshot()) : null; + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), + neighborhoodCache); + + assignments.add(partAssignment); + } + + return assignments; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(parts); + out.writeBoolean(exclNeighbors); + out.writeObject(hashIdRslvr); + out.writeObject(backupFilter); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + parts = in.readInt(); + exclNeighbors = in.readBoolean(); + hashIdRslvr = (CacheAffinityNodeHashResolver)in.readObject(); + backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); + } + + /** + * Builds neighborhood map for all nodes in snapshot. + * + * @param topSnapshot Topology snapshot. + * @return Neighbors map. + */ + private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { + Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); + + // Group by mac addresses. 
+ for (ClusterNode node : topSnapshot) { + String macs = node.attribute(GridNodeAttributes.ATTR_MACS); + + Collection<ClusterNode> nodes = macMap.get(macs); + + if (nodes == null) { + nodes = new HashSet<>(); + + macMap.put(macs, nodes); + } + + nodes.add(node); + } + + Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); + + for (Collection<ClusterNode> group : macMap.values()) { + for (ClusterNode node : group) + neighbors.put(node.id(), group); + } + + return neighbors; + } + + /** + * @param neighborhoodCache Neighborhood cache. + * @param nodes Nodes. + * @return All neighbors for given nodes. + */ + private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache, + Iterable<ClusterNode> nodes) { + Collection<ClusterNode> res = new HashSet<>(); + + for (ClusterNode node : nodes) { + if (!res.contains(node)) + res.addAll(neighborhoodCache.get(node.id())); + } + + return res; + } + + /** + * + */ + private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { + return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 
1 : + o1.get2().id().compareTo(o2.get2().id()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html new file mode 100644 index 0000000..780cabc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> +<!-- Package description. --> +Contains HRW-based cache affinity for partitioned cache. +</body> +</html>