http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java new file mode 100644 index 0000000..0b9e33b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity.fair; + +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Fair affinity function which tries to ensure that all nodes get equal number of partitions with + * minimum amount of reassignments between existing nodes. 
+ * <p> + * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method. + */ +@CentralizedAffinityFunction +public class FairAffinityFunction implements AffinityFunction { + /** Default partition count. */ + public static final int DFLT_PART_CNT = 256; + + /** */ + private static final long serialVersionUID = 0L; + + /** Ascending comparator. */ + private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(); + + /** Descending comparator. */ + private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP); + + /** */ + private final int parts; + + /** + * Creates fair affinity with default partition count. + */ + public FairAffinityFunction() { + this(DFLT_PART_CNT); + } + + /** + * @param parts Number of partitions. + */ + public FairAffinityFunction(int parts) { + this.parts = parts; + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext ctx) { + List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); + + if (topSnapshot.size() == 1) { + ClusterNode primary = topSnapshot.get(0); + + return Collections.nCopies(parts, Collections.singletonList(primary)); + } + + List<List<ClusterNode>> assignment = createCopy(ctx); + + int tiers = Math.min(ctx.backups() + 1, topSnapshot.size()); + + // Per tier pending partitions. + Map<Integer, Queue<Integer>> pendingParts = new HashMap<>(); + + FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot); + + for (int tier = 0; tier < tiers; tier++) { + // Check if this is a new tier and add pending partitions. 
+ Queue<Integer> pending = pendingParts.get(tier); + + for (int part = 0; part < parts; part++) { + if (fullMap.assignments.get(part).size() < tier + 1) { + if (pending == null) { + pending = new LinkedList<>(); + + pendingParts.put(tier, pending); + } + + if (!pending.contains(part)) + pending.add(part); + + } + } + + // Assign pending partitions, if any. + assignPending(tier, pendingParts, fullMap, topSnapshot); + + // Balance assignments. + balance(tier, pendingParts, fullMap, topSnapshot); + } + + return fullMap.assignments; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return U.safeAbs(hash(key.hashCode())) % parts; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** + * Assigns pending (unassigned) partitions to nodes. + * + * @param tier Tier to assign (0 is primary, 1 - 1st backup,...). + * @param pendingMap Pending partitions per tier. + * @param fullMap Full assignment map to modify. + * @param topSnapshot Topology snapshot. + */ + private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap, + List<ClusterNode> topSnapshot) { + Queue<Integer> pending = pendingMap.get(tier); + + if (F.isEmpty(pending)) + return; + + int idealPartCnt = parts / topSnapshot.size(); + + Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier); + + PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false); + + // First iterate over underloaded nodes. + assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false); + + if (!pending.isEmpty() && !underloadedNodes.isEmpty()) { + // Same, forcing updates. 
+ assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true); + } + + if (!pending.isEmpty()) + assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot); + + assert pending.isEmpty(); + + pendingMap.remove(tier); + } + + /** + * Assigns pending partitions to underloaded nodes. + * + * @param tier Tier to assign. + * @param pendingMap Pending partitions per tier. + * @param fullMap Full assignment map to modify. + * @param underloadedNodes Underloaded nodes. + * @param topSnapshot Topology snapshot. + * @param force {@code True} if partitions should be moved. + */ + private void assignPendingToUnderloaded( + int tier, + Map<Integer, Queue<Integer>> pendingMap, + FullAssignmentMap fullMap, + PrioritizedPartitionMap underloadedNodes, + Collection<ClusterNode> topSnapshot, + boolean force) { + Iterator<Integer> it = pendingMap.get(tier).iterator(); + + int ideal = parts / topSnapshot.size(); + + while (it.hasNext()) { + int part = it.next(); + + for (PartitionSet set : underloadedNodes.assignments()) { + ClusterNode node = set.node(); + + assert node != null; + + if (fullMap.assign(part, tier, node, force, pendingMap)) { + // We could add partition to partition map without forcing, remove partition from pending. + it.remove(); + + if (set.size() <= ideal) + underloadedNodes.remove(set.nodeId()); + else + underloadedNodes.update(); + + break; // for, continue to the next partition. + } + } + + if (underloadedNodes.isEmpty()) + return; + } + } + + /** + * Spreads pending partitions equally to all nodes in topology snapshot. + * + * @param tier Tier to assign. + * @param pendingMap Pending partitions per tier. + * @param fullMap Full assignment map to modify. + * @param topSnapshot Topology snapshot. 
+ */ + private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap, + FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) { + Iterator<Integer> it = pendingMap.get(tier).iterator(); + + int idx = 0; + + while (it.hasNext()) { + int part = it.next(); + + int i = idx; + + boolean assigned = false; + + do { + ClusterNode node = topSnapshot.get(i); + + if (fullMap.assign(part, tier, node, false, pendingMap)) { + it.remove(); + + assigned = true; + } + + i = (i + 1) % topSnapshot.size(); + + if (assigned) + idx = i; + } while (i != idx); + + if (!assigned) { + do { + ClusterNode node = topSnapshot.get(i); + + if (fullMap.assign(part, tier, node, true, pendingMap)) { + it.remove(); + + assigned = true; + } + + i = (i + 1) % topSnapshot.size(); + + if (assigned) + idx = i; + } while (i != idx); + } + + if (!assigned) + throw new IllegalStateException("Failed to find assignable node for partition."); + } + } + + /** + * Tries to balance assignments between existing nodes in topology. + * + * @param tier Tier to assign. + * @param pendingParts Pending partitions per tier. + * @param fullMap Full assignment map to modify. + * @param topSnapshot Topology snapshot. 
+ */ + private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, + Collection<ClusterNode> topSnapshot) { + int idealPartCnt = parts / topSnapshot.size(); + + Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier); + + PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false); + PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true); + + do { + boolean retry = false; + + for (PartitionSet overloaded : overloadedNodes.assignments()) { + for (Integer part : overloaded.partitions()) { + boolean assigned = false; + + for (PartitionSet underloaded : underloadedNodes.assignments()) { + if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) { + // Size of partition sets has changed. + if (overloaded.size() <= idealPartCnt) + overloadedNodes.remove(overloaded.nodeId()); + else + overloadedNodes.update(); + + if (underloaded.size() >= idealPartCnt) + underloadedNodes.remove(underloaded.nodeId()); + else + underloadedNodes.update(); + + assigned = true; + + retry = true; + + break; + } + } + + if (!assigned) { + for (PartitionSet underloaded : underloadedNodes.assignments()) { + if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) { + // Size of partition sets has changed. + if (overloaded.size() <= idealPartCnt) + overloadedNodes.remove(overloaded.nodeId()); + else + overloadedNodes.update(); + + if (underloaded.size() >= idealPartCnt) + underloadedNodes.remove(underloaded.nodeId()); + else + underloadedNodes.update(); + + retry = true; + + break; + } + } + } + + if (retry) + break; // for part. + } + + if (retry) + break; // for overloaded. + } + + if (!retry) + break; + } + while (true); + } + + /** + * Constructs underloaded or overloaded partition map. + * + * @param mapping Mapping to filter. + * @param idealPartCnt Ideal number of partitions per node. 
+ * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded. + * @return Prioritized partition map. + */ + private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) { + assert mapping != null; + + PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP); + + for (PartitionSet set : mapping.values()) { + if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt)) + res.add(set); + } + + return res; + } + + /** + * Creates copy of previous partition assignment. + * + * @param ctx Affinity function context. + * @return Assignment copy and per node partition map. + */ + private List<List<ClusterNode>> createCopy(AffinityFunctionContext ctx) { + DiscoveryEvent discoEvt = ctx.discoveryEvent(); + + UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED) + ? null + : discoEvt.eventNode().id(); + + List<List<ClusterNode>> cp = new ArrayList<>(parts); + + for (int part = 0; part < parts; part++) { + List<ClusterNode> partNodes = ctx.previousAssignment(part); + + List<ClusterNode> partNodesCp; + + if (partNodes == null) + partNodesCp = new ArrayList<>(); + else { + if (leftNodeId == null) { + partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined. 
+ + partNodesCp.addAll(partNodes); + } + else { + partNodesCp = new ArrayList<>(partNodes.size()); + + for (ClusterNode affNode : partNodes) { + if (!affNode.id().equals(leftNodeId)) + partNodesCp.add(affNode); + } + } + } + + cp.add(partNodesCp); + } + + return cp; + } + + /** + * + */ + private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public int compare(PartitionSet o1, PartitionSet o2) { + return Integer.compare(o1.parts.size(), o2.parts.size()); + } + } + + /** + * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order + * by number of partitions assigned to a node. + */ + private static class PrioritizedPartitionMap { + /** Comparator. */ + private Comparator<PartitionSet> cmp; + + /** Assignment map. */ + private Map<UUID, PartitionSet> assignmentMap = new HashMap<>(); + + /** Assignment list, ordered according to comparator. */ + private List<PartitionSet> assignmentList = new ArrayList<>(); + + /** + * @param cmp Comparator. + */ + private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) { + this.cmp = cmp; + } + + /** + * @param set Partition set to add. + */ + public void add(PartitionSet set) { + PartitionSet old = assignmentMap.put(set.nodeId(), set); + + if (old == null) { + assignmentList.add(set); + + update(); + } + } + + /** + * Sorts assignment list. + */ + public void update() { + Collections.sort(assignmentList, cmp); + } + + /** + * @return Sorted assignment list. + */ + public List<PartitionSet> assignments() { + return assignmentList; + } + + /** + * @param uuid Uuid. + */ + public void remove(UUID uuid) { + PartitionSet rmv = assignmentMap.remove(uuid); + + assignmentList.remove(rmv); + } + + /** + * + */ + public boolean isEmpty() { + return assignmentList.isEmpty(); + } + } + + /** + * Constructs assignment map for specified tier. 
+ * + * @param tier Tier number, -1 for all tiers altogether. + * @param assignment Assignment to construct map from. + * @param topSnapshot Topology snapshot. + * @return Assignment map. + */ + private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment, + Collection<ClusterNode> topSnapshot) { + Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); + + for (int part = 0; part < assignment.size(); part++) { + List<ClusterNode> nodes = assignment.get(part); + + assert nodes instanceof RandomAccess; + + if (nodes.size() <= tier) + continue; + + int start = tier < 0 ? 0 : tier; + int end = tier < 0 ? nodes.size() : tier + 1; + + for (int i = start; i < end; i++) { + ClusterNode n = nodes.get(i); + + PartitionSet set = tmp.get(n.id()); + + if (set == null) { + set = new PartitionSet(n); + + tmp.put(n.id(), set); + } + + set.add(part); + } + } + + if (tmp.size() < topSnapshot.size()) { + for (ClusterNode node : topSnapshot) { + if (!tmp.containsKey(node.id())) + tmp.put(node.id(), new PartitionSet(node)); + } + } + + return tmp; + } + + /** + * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary + * maps consistent. + */ + @SuppressWarnings("unchecked") + private static class FullAssignmentMap { + /** Per-tier assignment maps. */ + private Map<UUID, PartitionSet>[] tierMaps; + + /** Full assignment map. */ + private Map<UUID, PartitionSet> fullMap; + + /** Resulting assignment. */ + private List<List<ClusterNode>> assignments; + + /** + * @param tiers Number of tiers. + * @param assignments Assignments to modify. + * @param topSnapshot Topology snapshot. 
+ */ + private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) { + this.assignments = assignments; + + tierMaps = new Map[tiers]; + + for (int tier = 0; tier < tiers; tier++) + tierMaps[tier] = assignments(tier, assignments, topSnapshot); + + fullMap = assignments(-1, assignments, topSnapshot); + } + + /** + * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed + * only if this partition is not already assigned to a node. If force is true, then assignment will succeed + * only if partition is not assigned to a tier with number less than passed in. Assigned partition from + * greater tier will be moved to pending queue. + * + * @param part Partition to assign. + * @param tier Tier number to assign. + * @param node Node to move partition to. + * @param force Force flag. + * @param pendingParts per tier pending partitions map. + * @return {@code True} if assignment succeeded. + */ + boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) { + UUID nodeId = node.id(); + + if (!fullMap.get(nodeId).contains(part)) { + tierMaps[tier].get(nodeId).add(part); + + fullMap.get(nodeId).add(part); + + List<ClusterNode> assignment = assignments.get(part); + + if (assignment.size() <= tier) + assignment.add(node); + else { + ClusterNode oldNode = assignment.set(tier, node); + + if (oldNode != null) { + UUID oldNodeId = oldNode.id(); + + tierMaps[tier].get(oldNodeId).remove(part); + fullMap.get(oldNodeId).remove(part); + } + } + + return true; + } + else if (force) { + assert !tierMaps[tier].get(nodeId).contains(part); + + // Check previous tiers first. + for (int t = 0; t < tier; t++) { + if (tierMaps[t].get(nodeId).contains(part)) + return false; + } + + // Partition is on some lower tier, switch it. 
+ for (int t = tier + 1; t < tierMaps.length; t++) { + if (tierMaps[t].get(nodeId).contains(part)) { + ClusterNode oldNode = assignments.get(part).get(tier); + + // Move partition from level t to tier. + assignments.get(part).set(tier, node); + assignments.get(part).set(t, null); + + if (oldNode != null) { + tierMaps[tier].get(oldNode.id()).remove(part); + fullMap.get(oldNode.id()).remove(part); + } + + tierMaps[tier].get(nodeId).add(part); + tierMaps[t].get(nodeId).remove(part); + + Queue<Integer> pending = pendingParts.get(t); + + if (pending == null) { + pending = new LinkedList<>(); + + pendingParts.put(t, pending); + } + + pending.add(part); + + return true; + } + } + + throw new IllegalStateException("Unable to assign partition to node while force is true."); + } + + // !force. + return false; + } + + /** + * Gets tier mapping. + * + * @param tier Tier to get mapping. + * @return Per node map. + */ + public Map<UUID, PartitionSet> tierMapping(int tier) { + return tierMaps[tier]; + } + } + + /** + * Applies a supplemental hash function to a given hashCode, which + * defends against poor quality hash functions. + * + * @param h Hash code. + * @return Enhanced hash code. + */ + private static int hash(int h) { + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += (h << 15) ^ 0xffffcd7d; + h ^= (h >>> 10); + h += (h << 3); + h ^= (h >>> 6); + h += (h << 2) + (h << 14); + return h ^ (h >>> 16); + } + + /** + * + */ + @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") + private static class PartitionSet { + /** */ + private ClusterNode node; + + /** Partitions. */ + private Collection<Integer> parts = new LinkedList<>(); + + /** + * @param node Node. + */ + private PartitionSet(ClusterNode node) { + this.node = node; + } + + /** + * @return Node. + */ + private ClusterNode node() { + return node; + } + + /** + * @return Node ID. 
+ */ + private UUID nodeId() { + return node.id(); + } + + /** + * @return Partition set size. + */ + private int size() { + return parts.size(); + } + + /** + * Adds partition to partition set. + * + * @param part Partition to add. + * @return {@code True} if partition was added, {@code false} if partition already exists. + */ + private boolean add(int part) { + if (!parts.contains(part)) { + parts.add(part); + + return true; + } + + return false; + } + + /** + * @param part Partition to remove. + */ + private void remove(Integer part) { + parts.remove(part); // Remove object, not index. + } + + /** + * @return Partitions. + */ + @SuppressWarnings("TypeMayBeWeakened") + private Collection<Integer> partitions() { + return parts; + } + + /** + * Checks if partition set contains given partition. + * + * @param part Partition to check. + * @return {@code True} if partition set contains given partition. + */ + private boolean contains(int part) { + return parts.contains(part); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']'; + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java deleted file mode 100644 index ac0ac5e..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/CacheRendezvousAffinityFunction.java +++ /dev/null @@ -1,501 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.affinity.rendezvous; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.security.*; -import java.util.*; - -/** - * Affinity function for partitioned cache based on Highest Random Weight algorithm. - * This function supports the following configuration: - * <ul> - * <li> - * {@code partitions} - Number of partitions to spread across nodes. - * </li> - * <li> - * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors - * from being backups of each other. Note that {@code backupFilter} is ignored if - * {@code excludeNeighbors} is set to {@code true}. - * </li> - * <li> - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - * </li> - * </ul> - * <p> - * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. - */ -public class CacheRendezvousAffinityFunction implements CacheAffinityFunction, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Default number of partitions. */ - public static final int DFLT_PARTITION_COUNT = 1024; - - /** Comparator. */ - private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = - new HashComparator(); - - /** Thread local message digest. 
*/ - private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { - @Override protected MessageDigest initialValue() { - try { - return MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - assert false : "Should have failed in constructor"; - - throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", - e); - } - } - }; - - /** Number of partitions. */ - private int parts; - - /** Exclude neighbors flag. */ - private boolean exclNeighbors; - - /** Optional backup filter. First node is primary, second node is a node being tested. */ - private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; - - /** Hash ID resolver. */ - private CacheAffinityNodeHashResolver hashIdRslvr = new CacheAffinityNodeAddressHashResolver(); - - /** Ignite instance. */ - @IgniteInstanceResource - private Ignite ignite; - - /** - * Empty constructor with all defaults. - */ - public CacheRendezvousAffinityFunction() { - this(false); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - */ - public CacheRendezvousAffinityFunction(boolean exclNeighbors) { - this(exclNeighbors, DFLT_PARTITION_COUNT); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. 
- */ - public CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts) { - this(exclNeighbors, parts, null); - } - - /** - * Initializes optional counts for replicas and backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param parts Total number of partitions. - * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - */ - public CacheRendezvousAffinityFunction(int parts, - @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this(false, parts, backupFilter); - } - - /** - * Private constructor. - * - * @param exclNeighbors Exclude neighbors flag. - * @param parts Partitions count. - * @param backupFilter Backup filter. - */ - private CacheRendezvousAffinityFunction(boolean exclNeighbors, int parts, - IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts != 0, "parts != 0"); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - this.backupFilter = backupFilter; - - try { - MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - throw new IgniteException("Failed to obtain MD5 message digest instance.", e); - } - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - * <p> - * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. 
- */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. - */ - public void setPartitions(int parts) { - this.parts = parts; - } - - /** - * Gets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @return Hash ID resolver. - */ - public CacheAffinityNodeHashResolver getHashIdResolver() { - return hashIdRslvr; - } - - /** - * Sets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @param hashIdRslvr Hash ID resolver. - */ - public void setHashIdResolver(CacheAffinityNodeHashResolver hashIdRslvr) { - this.hashIdRslvr = hashIdRslvr; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. 
If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param backupFilter Optional backup filter. - */ - public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this.backupFilter = backupFilter; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public void setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Returns collection of nodes (primary first) for specified partition. 
- */ - public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, - @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { - if (nodes.size() <= 1) - return nodes; - - List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); - - MessageDigest d = digest.get(); - - for (ClusterNode node : nodes) { - Object nodeHash = hashIdRslvr.resolve(node); - - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - byte[] nodeHashBytes = ignite.configuration().getMarshaller().marshal(nodeHash); - - out.write(U.intToBytes(part), 0, 4); // Avoid IOException. - out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException. - - d.reset(); - - byte[] bytes = d.digest(out.toByteArray()); - - long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); - - lst.add(F.t(hash, node)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - Collections.sort(lst, COMPARATOR); - - int primaryAndBackups; - - List<ClusterNode> res; - - if (backups == Integer.MAX_VALUE) { - primaryAndBackups = Integer.MAX_VALUE; - - res = new ArrayList<>(); - } - else { - primaryAndBackups = backups + 1; - - res = new ArrayList<>(primaryAndBackups); - } - - ClusterNode primary = lst.get(0).get2(); - - res.add(primary); - - // Select backups. 
- if (backups > 0) { - for (int i = 1; i < lst.size(); i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); - - ClusterNode node = next.get2(); - - if (exclNeighbors) { - Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res); - - if (!allNeighbors.contains(node)) - res.add(node); - } - else { - if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node))) - res.add(next.get2()); - } - - if (res.size() == primaryAndBackups) - break; - } - } - - if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { - // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria. - for (int i = 1; i < lst.size(); i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); - - ClusterNode node = next.get2(); - - if (!res.contains(node)) - res.add(next.get2()); - - if (res.size() == primaryAndBackups) - break; - } - } - - assert res.size() <= primaryAndBackups; - - return res; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - return U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx) { - List<List<ClusterNode>> assignments = new ArrayList<>(parts); - - Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? - neighbors(affCtx.currentTopologySnapshot()) : null; - - for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), - neighborhoodCache); - - assignments.add(partAssignment); - } - - return assignments; - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(parts); - out.writeBoolean(exclNeighbors); - out.writeObject(hashIdRslvr); - out.writeObject(backupFilter); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - parts = in.readInt(); - exclNeighbors = in.readBoolean(); - hashIdRslvr = (CacheAffinityNodeHashResolver)in.readObject(); - backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); - } - - /** - * Builds neighborhood map for all nodes in snapshot. - * - * @param topSnapshot Topology snapshot. - * @return Neighbors map. - */ - private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { - Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); - - // Group by mac addresses. - for (ClusterNode node : topSnapshot) { - String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS); - - Collection<ClusterNode> nodes = macMap.get(macs); - - if (nodes == null) { - nodes = new HashSet<>(); - - macMap.put(macs, nodes); - } - - nodes.add(node); - } - - Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); - - for (Collection<ClusterNode> group : macMap.values()) { - for (ClusterNode node : group) - neighbors.put(node.id(), group); - } - - return neighbors; - } - - /** - * @param neighborhoodCache Neighborhood cache. - * @param nodes Nodes. - * @return All neighbors for given nodes. 
- */ - private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache, - Iterable<ClusterNode> nodes) { - Collection<ClusterNode> res = new HashSet<>(); - - for (ClusterNode node : nodes) { - if (!res.contains(node)) - res.addAll(neighborhoodCache.get(node.id())); - } - - return res; - } - - /** - * - */ - private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { - return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 1 : - o1.get2().id().compareTo(o2.get2().id()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java new file mode 100644 index 0000000..2b26630 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity.rendezvous; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.security.*; +import java.util.*; + +/** + * Affinity function for partitioned cache based on Highest Random Weight algorithm. + * This function supports the following configuration: + * <ul> + * <li> + * {@code partitions} - Number of partitions to spread across nodes. + * </li> + * <li> + * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors + * from being backups of each other. Note that {@code backupFilter} is ignored if + * {@code excludeNeighbors} is set to {@code true}. + * </li> + * <li> + * {@code backupFilter} - Optional filter for back up nodes. If provided, then only + * nodes that pass this filter will be selected as backup nodes. If not provided, then + * primary and backup nodes will be selected out of all nodes available for this cache. + * </li> + * </ul> + * <p> + * Cache affinity can be configured for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. 
+ */ +public class RendezvousAffinityFunction implements AffinityFunction, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Default number of partitions. */ + public static final int DFLT_PARTITION_COUNT = 1024; + + /** Comparator. */ + private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = + new HashComparator(); + + /** Thread local message digest. */ + private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { + @Override protected MessageDigest initialValue() { + try { + return MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + assert false : "Should have failed in constructor"; + + throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", + e); + } + } + }; + + /** Number of partitions. */ + private int parts; + + /** Exclude neighbors flag. */ + private boolean exclNeighbors; + + /** Optional backup filter. First node is primary, second node is a node being tested. */ + private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; + + /** Hash ID resolver. */ + private AffinityNodeHashResolver hashIdRslvr = new AffinityNodeAddressHashResolver(); + + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** + * Empty constructor with all defaults. + */ + public RendezvousAffinityFunction() { + this(false); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other + * and default number of partitions ({@link #DFLT_PARTITION_COUNT}). + * <p> + * Note that the backup filter ({@link #getBackupFilter()}) is ignored if {@code excludeNeighbors} is {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other.
+ */ + public RendezvousAffinityFunction(boolean exclNeighbors) { + this(exclNeighbors, DFLT_PARTITION_COUNT); + } + + /** + * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, + * and specified number of partitions. + * <p> + * Note that the backup filter ({@link #getBackupFilter()}) is ignored if {@code excludeNeighbors} is {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups + * of each other. + * @param parts Total number of partitions. + */ + public RendezvousAffinityFunction(boolean exclNeighbors, int parts) { + this(exclNeighbors, parts, null); + } + + /** + * Initializes affinity with specified number of partitions and optional backup filter. + * <p> + * This constructor leaves {@code excludeNeighbors} set to {@code false}. + * + * @param parts Total number of partitions. + * @param backupFilter Optional backup filter for nodes. If provided, backups will be selected + * from all nodes that pass this filter. First argument for this filter is primary node, and second + * argument is node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + */ + public RendezvousAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this(false, parts, backupFilter); + } + + /** + * Private constructor. + * + * @param exclNeighbors Exclude neighbors flag. + * @param parts Partitions count. + * @param backupFilter Backup filter.
+ */ + private RendezvousAffinityFunction(boolean exclNeighbors, int parts, + IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + A.ensure(parts != 0, "parts != 0"); + + this.exclNeighbors = exclNeighbors; + this.parts = parts; + this.backupFilter = backupFilter; + + try { + MessageDigest.getInstance("MD5"); + } + catch (NoSuchAlgorithmException e) { + throw new IgniteException("Failed to obtain MD5 message digest instance.", e); + } + } + + /** + * Gets total number of key partitions. To ensure that all partitions are + * equally distributed across all nodes, please make sure that this + * number is significantly larger than a number of nodes. Also, partition + * size should be relatively small. Try to avoid having partitions with more + * than quarter million keys. + * <p> + * Note that for fully replicated caches this method should always + * return {@code 1}. + * + * @return Total partition count. + */ + public int getPartitions() { + return parts; + } + + /** + * Sets total number of partitions. + * + * @param parts Total number of partitions. + */ + public void setPartitions(int parts) { + this.parts = parts; + } + + /** + * Gets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. + * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, and hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts node on the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @return Hash ID resolver. + */ + public AffinityNodeHashResolver getHashIdResolver() { + return hashIdRslvr; + } + + /** + * Sets hash ID resolver for nodes. This resolver is used to provide + * alternate hash ID, other than node ID. 
+ * <p> + * Node IDs constantly change when nodes get restarted, which causes them to + * be placed on different locations in the hash ring, hence causing + * repartitioning. Providing an alternate hash ID, which survives node restarts, + * puts the node at the same location on the hash ring, hence minimizing required + * repartitioning. + * + * @param hashIdRslvr Hash ID resolver. + */ + public void setHashIdResolver(AffinityNodeHashResolver hashIdRslvr) { + this.hashIdRslvr = hashIdRslvr; + } + + /** + * Gets optional backup filter. If not {@code null}, backups will be selected + * from all nodes that pass this filter. First node passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @return Optional backup filter. + */ + @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { + return backupFilter; + } + + /** + * Sets optional backup filter. If provided, then backups will be selected from all + * nodes that pass this filter. First node being passed to this filter is primary node, + * and second node is a node being tested. + * <p> + * Note that {@code backupFilter} is ignored if {@code excludeNeighbors} is set to {@code true}. + * + * @param backupFilter Optional backup filter. + */ + public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { + this.backupFilter = backupFilter; + } + + /** + * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that the backup filter ({@link #getBackupFilter()}) is ignored if {@code excludeNeighbors} is {@code true}. + * + * @return {@code True} if nodes residing on the same host may not act as backups of each other.
+ */ + public boolean isExcludeNeighbors() { + return exclNeighbors; + } + + /** + * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). + * <p> + * Note that the backup filter ({@link #getBackupFilter()}) is ignored if {@code excludeNeighbors} is {@code true}. + * + * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. + */ + public void setExcludeNeighbors(boolean exclNeighbors) { + this.exclNeighbors = exclNeighbors; + } + + /** + * Returns collection of nodes (primary first) for specified partition. + */ + public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, + @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { + if (nodes.size() <= 1) + return nodes; + + List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); + + MessageDigest d = digest.get(); + + for (ClusterNode node : nodes) { + Object nodeHash = hashIdRslvr.resolve(node); + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + + byte[] nodeHashBytes = ignite.configuration().getMarshaller().marshal(nodeHash); + + out.write(U.intToBytes(part), 0, 4); // write(byte[], int, int) declares no IOException. + out.write(nodeHashBytes, 0, nodeHashBytes.length); // write(byte[], int, int) declares no IOException.
+ + d.reset(); + + byte[] bytes = d.digest(out.toByteArray()); + + long hash = + (bytes[0] & 0xFFL) + | ((bytes[1] & 0xFFL) << 8) + | ((bytes[2] & 0xFFL) << 16) + | ((bytes[3] & 0xFFL) << 24) + | ((bytes[4] & 0xFFL) << 32) + | ((bytes[5] & 0xFFL) << 40) + | ((bytes[6] & 0xFFL) << 48) + | ((bytes[7] & 0xFFL) << 56); + + lst.add(F.t(hash, node)); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + Collections.sort(lst, COMPARATOR); + + int primaryAndBackups; + + List<ClusterNode> res; + + if (backups == Integer.MAX_VALUE) { + primaryAndBackups = Integer.MAX_VALUE; + + res = new ArrayList<>(); + } + else { + primaryAndBackups = backups + 1; + + res = new ArrayList<>(primaryAndBackups); + } + + ClusterNode primary = lst.get(0).get2(); + + res.add(primary); + + // Select backups. + if (backups > 0) { + for (int i = 1; i < lst.size(); i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (exclNeighbors) { + Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res); + + if (!allNeighbors.contains(node)) + res.add(node); + } + else { + if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node))) + res.add(next.get2()); + } + + if (res.size() == primaryAndBackups) + break; + } + } + + if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { + // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria. + for (int i = 1; i < lst.size(); i++) { + IgniteBiTuple<Long, ClusterNode> next = lst.get(i); + + ClusterNode node = next.get2(); + + if (!res.contains(node)) + res.add(next.get2()); + + if (res.size() == primaryAndBackups) + break; + } + } + + assert res.size() <= primaryAndBackups; + + return res; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public int partitions() { + return parts; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + return U.safeAbs(key.hashCode() % parts); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + List<List<ClusterNode>> assignments = new ArrayList<>(parts); + + Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? + neighbors(affCtx.currentTopologySnapshot()) : null; + + for (int i = 0; i < parts; i++) { + List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), + neighborhoodCache); + + assignments.add(partAssignment); + } + + return assignments; + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(parts); + out.writeBoolean(exclNeighbors); + out.writeObject(hashIdRslvr); + out.writeObject(backupFilter); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + parts = in.readInt(); + exclNeighbors = in.readBoolean(); + hashIdRslvr = (AffinityNodeHashResolver)in.readObject(); + backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); + } + + /** + * Builds neighborhood map for all nodes in snapshot. + * + * @param topSnapshot Topology snapshot. + * @return Neighbors map. + */ + private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { + Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); + + // Group by mac addresses. 
+ for (ClusterNode node : topSnapshot) { + String macs = node.attribute(IgniteNodeAttributes.ATTR_MACS); + + Collection<ClusterNode> nodes = macMap.get(macs); + + if (nodes == null) { + nodes = new HashSet<>(); + + macMap.put(macs, nodes); + } + + nodes.add(node); + } + + Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); + + for (Collection<ClusterNode> group : macMap.values()) { + for (ClusterNode node : group) + neighbors.put(node.id(), group); + } + + return neighbors; + } + + /** + * @param neighborhoodCache Neighborhood cache. + * @param nodes Nodes. + * @return All neighbors for given nodes. + */ + private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache, + Iterable<ClusterNode> nodes) { + Collection<ClusterNode> res = new HashSet<>(); + + for (ClusterNode node : nodes) { + if (!res.contains(node)) + res.addAll(neighborhoodCache.get(node.id())); + } + + return res; + } + + /** + * + */ + private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { + return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 
1 : + o1.get2().id().compareTo(o2.get2().id()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java deleted file mode 100644 index 568db12..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictableEntry.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction; - -import org.jetbrains.annotations.*; - -import javax.cache.*; - -/** - * Evictable cache entry passed into {@link CacheEvictionPolicy}. - * - * @author @java.author - * @version @java.version - */ -public interface CacheEvictableEntry<K, V> extends Cache.Entry<K, V> { - /** - * Evicts entry associated with given key from cache. Note, that entry will be evicted - * only if it's not used (not participating in any locks or transactions). 
- * - * @return {@code True} if entry could be evicted, {@code false} otherwise. - */ - public boolean evict(); - - /** - * Checks whether entry is currently present in cache or not. If entry is not in - * cache (e.g. has been removed) {@code false} is returned. In this case all - * operations on this entry will cause creation of a new entry in cache. - * - * @return {@code True} if entry is in cache, {@code false} otherwise. - */ - public boolean isCached(); - - /** - * Gets metadata added by eviction policy. - * - * @return Metadata value or {@code null}. - */ - @Nullable public <T> T meta(); - - /** - * Adds a new metadata. - * - * @param val Metadata value. - * @return Metadata previously added, or - * {@code null} if there was none. - */ - @Nullable public <T> T addMeta(T val); - - /** - * Adds given metadata value only if it was absent. - * - * @param val Value to add if it's not attached already. - * @return {@code null} if new value was put, or current value if put didn't happen. - */ - @Nullable public <T> T putMetaIfAbsent(T val); - - /** - * Replaces given metadata with new {@code newVal} value only if its current value - * is equal to {@code curVal}. Otherwise, it is no-op. - * - * @param curVal Current value to check. - * @param newVal New value. - * @return {@code true} if replacement occurred, {@code false} otherwise. - */ - public <T> boolean replaceMeta(T curVal, T newVal); - - /** - * Removes metadata by name. - * - * @return Value of removed metadata or {@code null}. - */ - @Nullable public <T> T removeMeta(); - - /** - * Removes metadata only if its current value is equal to {@code val} passed in. - * - * @param val Value to compare. - * @return {@code True} if value was removed, {@code false} otherwise. 
- */ - public <T> boolean removeMeta(T val); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java deleted file mode 100644 index 10f63ee..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionFilter.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction; - -import org.apache.ignite.configuration.CacheConfiguration; - -import javax.cache.*; -import java.io.*; - -/** - * Eviction filter to specify which entries should not be evicted. Not applicable when - * calling explicit evict via {@link CacheEvictableEntry#evict()}. - * If {@link #evictAllowed(Cache.Entry)} method returns {@code false} then eviction - * policy will not be notified and entry will never be evicted. 
- * <p> - * Eviction filter can be configured via {@link CacheConfiguration#getEvictionFilter()} - * configuration property. Default value is {@code null} which means that all - * cache entries will be tracked by eviction policy. - */ -public interface CacheEvictionFilter<K, V> extends Serializable { - /** - * Checks if entry may be evicted from cache. - * - * @param entry Cache entry. - * @return {@code True} if it is allowed to evict this entry. - */ - public boolean evictAllowed(Cache.Entry<K, V> entry); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java deleted file mode 100644 index b5f341a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/CacheEvictionPolicy.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction; - -/** - * Pluggable cache eviction policy. 
Usually, implementations will internally order - * cache entries based on {@link #onEntryAccessed(boolean, CacheEvictableEntry)} notifications and - * whenever an element needs to be evicted, {@link CacheEvictableEntry#evict()} - * method should be called. - * <p> - * Ignite comes with following eviction policies out-of-the-box: - * <ul> - * <li>{@link org.apache.ignite.cache.eviction.lru.CacheLruEvictionPolicy}</li> - * <li>{@link org.apache.ignite.cache.eviction.random.CacheRandomEvictionPolicy}</li> - * <li>{@link org.apache.ignite.cache.eviction.fifo.CacheFifoEvictionPolicy}</li> - * </ul> - * <p> - * The eviction policy thread-safety is ensured by Ignition. Implementations of this interface should - * not worry about concurrency and should be implemented as they were only accessed from one thread. - * <p> - * Note that implementations of all eviction policies provided by Ignite are very - * light weight in a way that they are all lock-free (or very close to it), and do not - * create any internal tables, arrays, or other expensive structures. - * The eviction order is preserved by attaching light-weight meta-data to existing - * cache entries. - */ -public interface CacheEvictionPolicy<K, V> { - /** - * Callback for whenever entry is accessed. - * - * @param rmv {@code True} if entry has been removed, {@code false} otherwise. - * @param entry Accessed entry. 
- */ - public void onEntryAccessed(boolean rmv, CacheEvictableEntry<K, V> entry); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java new file mode 100644 index 0000000..d87109f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.eviction; + +import org.jetbrains.annotations.*; + +import javax.cache.*; + +/** + * Evictable cache entry passed into {@link EvictionPolicy}. + * + * @author @java.author + * @version @java.version + */ +public interface EvictableEntry<K, V> extends Cache.Entry<K, V> { + /** + * Evicts entry associated with given key from cache. Note, that entry will be evicted + * only if it's not used (not participating in any locks or transactions). 
+ * + * @return {@code True} if entry could be evicted, {@code false} otherwise. + */ + public boolean evict(); + + /** + * Checks whether entry is currently present in cache or not. If entry is not in + * cache (e.g. has been removed) {@code false} is returned. In this case all + * operations on this entry will cause creation of a new entry in cache. + * + * @return {@code True} if entry is in cache, {@code false} otherwise. + */ + public boolean isCached(); + + /** + * Gets metadata added by eviction policy. + * + * @return Metadata value or {@code null}. + */ + @Nullable public <T> T meta(); + + /** + * Adds new metadata. + * + * @param val Metadata value. + * @return Metadata previously added, or + * {@code null} if there was none. + */ + @Nullable public <T> T addMeta(T val); + + /** + * Adds given metadata value only if it was absent. + * + * @param val Value to add if it's not attached already. + * @return {@code null} if new value was put, or current value if put didn't happen. + */ + @Nullable public <T> T putMetaIfAbsent(T val); + + /** + * Replaces metadata with new {@code newVal} value only if its current value + * is equal to {@code curVal}. Otherwise, it is a no-op. + * + * @param curVal Current value to check. + * @param newVal New value. + * @return {@code true} if replacement occurred, {@code false} otherwise. + */ + public <T> boolean replaceMeta(T curVal, T newVal); + + /** + * Removes metadata. + * + * @return Value of removed metadata or {@code null}. + */ + @Nullable public <T> T removeMeta(); + + /** + * Removes metadata only if its current value is equal to {@code val} passed in. + * + * @param val Value to compare. + * @return {@code True} if value was removed, {@code false} otherwise.
+ */ + public <T> boolean removeMeta(T val); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java new file mode 100644 index 0000000..2b6ead1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionFilter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.eviction; + +import org.apache.ignite.configuration.CacheConfiguration; + +import javax.cache.*; +import java.io.*; + +/** + * Eviction filter to specify which entries should not be evicted. Not applicable when + * calling explicit evict via {@link EvictableEntry#evict()}. + * If {@link #evictAllowed(Cache.Entry)} method returns {@code false} then eviction + * policy will not be notified and entry will never be evicted. 
+ * <p> + * Eviction filter can be configured via {@link CacheConfiguration#getEvictionFilter()} + * configuration property. Default value is {@code null} which means that all + * cache entries will be tracked by eviction policy. + */ +public interface EvictionFilter<K, V> extends Serializable { + /** + * Checks if entry may be evicted from cache. + * + * @param entry Cache entry. + * @return {@code True} if it is allowed to evict this entry. + */ + public boolean evictAllowed(Cache.Entry<K, V> entry); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java new file mode 100644 index 0000000..f409e9b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.cache.eviction; + +import org.apache.ignite.cache.eviction.fifo.*; +import org.apache.ignite.cache.eviction.lru.*; +import org.apache.ignite.cache.eviction.random.*; + +/** + * Pluggable cache eviction policy. Usually, implementations will internally order + * cache entries based on {@link #onEntryAccessed(boolean, EvictableEntry)} notifications and + * whenever an element needs to be evicted, {@link EvictableEntry#evict()} + * method should be called. + * <p> + * Ignite comes with the following eviction policies out-of-the-box: + * <ul> + * <li>{@link LruEvictionPolicy}</li> + * <li>{@link RandomEvictionPolicy}</li> + * <li>{@link FifoEvictionPolicy}</li> + * </ul> + * <p> + * The eviction policy thread-safety is ensured by Ignition. Implementations of this interface should + * not worry about concurrency and should be implemented as if they were only accessed from one thread. + * <p> + * Note that implementations of all eviction policies provided by Ignite are very + * light weight in a way that they are all lock-free (or very close to it), and do not + * create any internal tables, arrays, or other expensive structures. + * The eviction order is preserved by attaching light-weight meta-data to existing + * cache entries. + */ +public interface EvictionPolicy<K, V> { + /** + * Callback for whenever entry is accessed. + * + * @param rmv {@code True} if entry has been removed, {@code false} otherwise. + * @param entry Accessed entry.
+ */ + public void onEntryAccessed(boolean rmv, EvictableEntry<K, V> entry); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java deleted file mode 100644 index 5fa5d82..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/CacheFifoEvictionPolicy.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.eviction.fifo; - -import org.apache.ignite.cache.eviction.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jsr166.*; -import org.jsr166.ConcurrentLinkedDeque8.*; - -import java.io.*; -import java.util.*; - -/** - * Eviction policy based on {@code First In First Out (FIFO)} algorithm.
This - * implementation is very efficient since it does not create any additional - * table-like data structures. The {@code FIFO} ordering information is - * maintained by attaching ordering metadata to cache entries. - */ -public class CacheFifoEvictionPolicy<K, V> implements CacheEvictionPolicy<K, V>, - CacheFifoEvictionPolicyMBean, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Maximum size. */ - private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE; - - /** FIFO queue. */ - private final ConcurrentLinkedDeque8<CacheEvictableEntry<K, V>> queue = - new ConcurrentLinkedDeque8<>(); - - /** - * Constructs FIFO eviction policy with all defaults. - */ - public CacheFifoEvictionPolicy() { - // No-op. - } - - /** - * Constructs FIFO eviction policy with maximum size. Empty entries are allowed. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. - */ - public CacheFifoEvictionPolicy(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** - * Gets maximum allowed size of cache before entry will start getting evicted. - * - * @return Maximum allowed size of cache before entry will start getting evicted. - */ - @Override public int getMaxSize() { - return max; - } - - /** - * Sets maximum allowed size of cache before entry will start getting evicted. - * - * @param max Maximum allowed size of cache before entry will start getting evicted. - */ - @Override public void setMaxSize(int max) { - A.ensure(max > 0, "max > 0"); - - this.max = max; - } - - /** {@inheritDoc} */ - @Override public int getCurrentSize() { - return queue.size(); - } - - /** - * Gets read-only view on internal {@code FIFO} queue in proper order. - * - * @return Read-only view ono internal {@code 'FIFO'} queue. 
- */ - public Collection<CacheEvictableEntry<K, V>> queue() { - return Collections.unmodifiableCollection(queue); - } - - /** {@inheritDoc} */ - @Override public void onEntryAccessed(boolean rmv, CacheEvictableEntry<K, V> entry) { - if (!rmv) { - if (!entry.isCached()) - return; - - // Shrink only if queue was changed. - if (touch(entry)) - shrink(); - } - else { - Node<CacheEvictableEntry<K, V>> node = entry.removeMeta(); - - if (node != null) - queue.unlinkx(node); - } - } - - /** - * @param entry Entry to touch. - * @return {@code True} if queue has been changed by this call. - */ - private boolean touch(CacheEvictableEntry<K, V> entry) { - Node<CacheEvictableEntry<K, V>> node = entry.meta(); - - // Entry has not been enqueued yet. - if (node == null) { - while (true) { - node = queue.offerLastx(entry); - - if (entry.putMetaIfAbsent(node) != null) { - // Was concurrently added, need to clear it from queue. - queue.unlinkx(node); - - // Queue has not been changed. - return false; - } - else if (node.item() != null) { - if (!entry.isCached()) { - // Was concurrently evicted, need to clear it from queue. - queue.unlinkx(node); - - return false; - } - - return true; - } - // If node was unlinked by concurrent shrink() call, we must repeat the whole cycle. - else if (!entry.removeMeta(node)) - return false; - } - } - - // Entry is already in queue. - return false; - } - - /** - * Shrinks FIFO queue to maximum allowed size. 
- */ - private void shrink() { - int max = this.max; - - int startSize = queue.sizex(); - - for (int i = 0; i < startSize && queue.sizex() > max; i++) { - CacheEvictableEntry<K, V> entry = queue.poll(); - - if (entry == null) - break; - - if (!entry.evict()) { - entry.removeMeta(); - - touch(entry); - } - } - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(max); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - max = in.readInt(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheFifoEvictionPolicy.class, this); - } -}