http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/GridCacheConsistentHashAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/GridCacheConsistentHashAffinityFunction.java b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/GridCacheConsistentHashAffinityFunction.java deleted file mode 100644 index b62160a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/GridCacheConsistentHashAffinityFunction.java +++ /dev/null @@ -1,703 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.cache.affinity.consistenthash; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.gridgain.grid.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.util.*; -import org.gridgain.grid.util.tostring.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * Affinity function for partitioned cache. This function supports the following - * configuration: - * <ul> - * <li> - * {@code backups} - Use this flag to control how many back up nodes will be - * assigned to every key. The default value is {@code 0}. - * </li> - * <li> - * {@code replicas} - Generally the more replicas a node gets, the more key assignments - * it will receive. You can configure different number of replicas for a node by - * setting user attribute with name {@link #getReplicaCountAttributeName()} to some - * number. Default value is {@code 512} defined by {@link #DFLT_REPLICA_COUNT} constant. - * </li> - * <li> - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - * </li> - * </ul> - * <p> - * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method. - */ -public class GridCacheConsistentHashAffinityFunction implements GridCacheAffinityFunction { - /** */ - private static final long serialVersionUID = 0L; - - /** Flag to enable/disable consistency check (for internal use only). 
*/ - private static final boolean AFFINITY_CONSISTENCY_CHECK = Boolean.getBoolean("GRIDGAIN_AFFINITY_CONSISTENCY_CHECK"); - - /** Default number of partitions. */ - public static final int DFLT_PARTITION_COUNT = 10000; - - /** Default replica count for partitioned caches. */ - public static final int DFLT_REPLICA_COUNT = 128; - - /** - * Name of node attribute to specify number of replicas for a node. - * Default value is {@code gg:affinity:node:replicas}. - */ - public static final String DFLT_REPLICA_COUNT_ATTR_NAME = "gg:affinity:node:replicas"; - - /** Node hash. */ - private transient GridConsistentHash<NodeInfo> nodeHash; - - /** Total number of partitions. */ - private int parts = DFLT_PARTITION_COUNT; - - /** */ - private int replicas = DFLT_REPLICA_COUNT; - - /** */ - private String attrName = DFLT_REPLICA_COUNT_ATTR_NAME; - - /** */ - private boolean exclNeighbors; - - /** - * Optional backup filter. First node passed to this filter is primary node, - * and second node is a node being tested. - */ - private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; - - /** */ - private GridCacheAffinityNodeHashResolver hashIdRslvr = new GridCacheAffinityNodeAddressHashResolver(); - - /** Injected grid. */ - @IgniteInstanceResource - private Ignite ignite; - - /** Injected cache name. */ - @IgniteCacheNameResource - private String cacheName; - - /** Injected logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Initialization flag. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient AtomicBoolean init = new AtomicBoolean(); - - /** Latch for initializing. */ - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient CountDownLatch initLatch = new CountDownLatch(1); - - /** Nodes IDs. */ - @GridToStringInclude - @SuppressWarnings({"TransientFieldNotInitialized"}) - private transient ConcurrentMap<UUID, NodeInfo> addedNodes = new ConcurrentHashMap<>(); - - /** Optional backup filter. 
*/ - @GridToStringExclude - private final IgniteBiPredicate<NodeInfo, NodeInfo> backupIdFilter = new IgniteBiPredicate<NodeInfo, NodeInfo>() { - @Override public boolean apply(NodeInfo primaryNodeInfo, NodeInfo nodeInfo) { - return backupFilter == null || backupFilter.apply(primaryNodeInfo.node(), nodeInfo.node()); - } - }; - - /** Map of neighbors. */ - @SuppressWarnings("TransientFieldNotInitialized") - private transient ConcurrentMap<UUID, Collection<UUID>> neighbors = - new ConcurrentHashMap8<>(); - - /** - * Empty constructor with all defaults. - */ - public GridCacheConsistentHashAffinityFunction() { - // No-op. - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - */ - public GridCacheConsistentHashAffinityFunction(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. - */ - public GridCacheConsistentHashAffinityFunction(boolean exclNeighbors, int parts) { - A.ensure(parts != 0, "parts != 0"); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - } - - /** - * Initializes optional counts for replicas and backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param parts Total number of partitions. 
- * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - */ - public GridCacheConsistentHashAffinityFunction(int parts, - @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts != 0, "parts != 0"); - - this.parts = parts; - this.backupFilter = backupFilter; - } - - /** - * Gets default count of virtual replicas in consistent hash ring. - * <p> - * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName()} - * name will be checked first. If it is absent, then this value will be used. - * - * @return Count of virtual replicas in consistent hash ring. - */ - public int getDefaultReplicas() { - return replicas; - } - - /** - * Sets default count of virtual replicas in consistent hash ring. - * <p> - * To determine node replicas, node attribute with {@link #getReplicaCountAttributeName} name - * will be checked first. If it is absent, then this value will be used. - * - * @param replicas Count of virtual replicas in consistent hash ring.s - */ - public void setDefaultReplicas(int replicas) { - this.replicas = replicas; - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - * <p> - * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. - */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. 
- */ - public void setPartitions(int parts) { - this.parts = parts; - } - - /** - * Gets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @return Hash ID resolver. - */ - public GridCacheAffinityNodeHashResolver getHashIdResolver() { - return hashIdRslvr; - } - - /** - * Sets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @param hashIdRslvr Hash ID resolver. - */ - public void setHashIdResolver(GridCacheAffinityNodeHashResolver hashIdRslvr) { - this.hashIdRslvr = hashIdRslvr; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. 
- * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param backupFilter Optional backup filter. - */ - public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this.backupFilter = backupFilter; - } - - /** - * Gets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @return User attribute name for replica count for a node. - */ - public String getReplicaCountAttributeName() { - return attrName; - } - - /** - * Sets optional attribute name for replica count. If not provided, the - * default is {@link #DFLT_REPLICA_COUNT_ATTR_NAME}. - * - * @param attrName User attribute name for replica count for a node. - */ - public void setReplicaCountAttributeName(String attrName) { - this.attrName = attrName; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public void setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Gets neighbors for a node. - * - * @param node Node. - * @return Neighbors. 
- */ - private Collection<UUID> neighbors(final ClusterNode node) { - Collection<UUID> ns = neighbors.get(node.id()); - - if (ns == null) { - Collection<ClusterNode> nodes = ignite.cluster().forHost(node).nodes(); - - ns = F.addIfAbsent(neighbors, node.id(), new ArrayList<>(F.nodeIds(nodes))); - } - - return ns; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext ctx) { - List<List<ClusterNode>> res = new ArrayList<>(parts); - - Collection<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - for (int part = 0; part < parts; part++) { - res.add(F.isEmpty(topSnapshot) ? - Collections.<ClusterNode>emptyList() : - // Wrap affinity nodes with unmodifiable list since unmodifiable generic collection - // doesn't provide equals and hashCode implementations. - U.sealList(nodes(part, topSnapshot, ctx.backups()))); - } - - return res; - } - - /** - * Assigns nodes to one partition. - * - * @param part Partition to assign nodes for. - * @param nodes Cache topology nodes. - * @return Assigned nodes, first node is primary, others are backups. - */ - public Collection<ClusterNode> nodes(int part, Collection<ClusterNode> nodes, int backups) { - if (nodes == null) - return Collections.emptyList(); - - int nodesSize = nodes.size(); - - if (nodesSize == 0) - return Collections.emptyList(); - - if (nodesSize == 1) // Minor optimization. - return nodes; - - initialize(); - - final Map<NodeInfo, ClusterNode> lookup = new GridLeanMap<>(nodesSize); - - // Store nodes in map for fast lookup. - for (ClusterNode n : nodes) - // Add nodes into hash circle, if absent. 
- lookup.put(resolveNodeInfo(n), n); - - Collection<NodeInfo> selected; - - if (backupFilter != null) { - final IgnitePredicate<NodeInfo> p = new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }; - - final NodeInfo primaryId = nodeHash.node(part, p); - - IgnitePredicate<NodeInfo> backupPrimaryIdFilter = new IgnitePredicate<NodeInfo>() { - @Override public boolean apply(NodeInfo node) { - return backupIdFilter.apply(primaryId, node); - } - }; - - Collection<NodeInfo> backupIds = nodeHash.nodes(part, backups, p, backupPrimaryIdFilter); - - if (F.isEmpty(backupIds) && primaryId != null) { - ClusterNode n = lookup.get(primaryId); - - assert n != null; - - return Collections.singletonList(n); - } - - selected = primaryId != null ? F.concat(false, primaryId, backupIds) : backupIds; - } - else { - if (!exclNeighbors) { - selected = nodeHash.nodes(part, backups == Integer.MAX_VALUE ? backups : backups + 1, new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - return lookup.containsKey(id); - } - }); - - if (selected.size() == 1) { - NodeInfo id = F.first(selected); - - assert id != null : "Node ID cannot be null in affinity node ID collection: " + selected; - - ClusterNode n = lookup.get(id); - - assert n != null; - - return Collections.singletonList(n); - } - } - else { - int primaryAndBackups = backups + 1; - - selected = new ArrayList<>(primaryAndBackups); - - final Collection<NodeInfo> selected0 = selected; - - List<NodeInfo> ids = nodeHash.nodes(part, primaryAndBackups, new P1<NodeInfo>() { - @Override public boolean apply(NodeInfo id) { - ClusterNode n = lookup.get(id); - - if (n == null) - return false; - - Collection<UUID> neighbors = neighbors(n); - - for (NodeInfo id0 : selected0) { - ClusterNode n0 = lookup.get(id0); - - if (n0 == null) - return false; - - Collection<UUID> neighbors0 = neighbors(n0); - - if (F.containsAny(neighbors0, neighbors)) - return false; - } - - selected0.add(id); - 
- return true; - } - }); - - if (AFFINITY_CONSISTENCY_CHECK) - assert F.eqOrdered(ids, selected); - } - } - - Collection<ClusterNode> ret = new ArrayList<>(selected.size()); - - for (NodeInfo id : selected) { - ClusterNode n = lookup.get(id); - - assert n != null; - - ret.add(n); - } - - return ret; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - initialize(); - - return U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public int partitions() { - initialize(); - - return parts; - } - - /** {@inheritDoc} */ - @Override public void reset() { - addedNodes = new ConcurrentHashMap<>(); - neighbors = new ConcurrentHashMap8<>(); - - initLatch = new CountDownLatch(1); - - init = new AtomicBoolean(); - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - NodeInfo info = addedNodes.remove(nodeId); - - if (info == null) - return; - - nodeHash.removeNode(info); - - neighbors.clear(); - } - - /** - * Resolve node info for specified node. - * Add node to hash circle if this is the first node invocation. - * - * @param n Node to get info for. - * @return Node info. 
- */ - private NodeInfo resolveNodeInfo(ClusterNode n) { - UUID nodeId = n.id(); - NodeInfo nodeInfo = addedNodes.get(nodeId); - - if (nodeInfo != null) - return nodeInfo; - - assert hashIdRslvr != null; - - nodeInfo = new NodeInfo(nodeId, hashIdRslvr.resolve(n), n); - - neighbors.clear(); - - nodeHash.addNode(nodeInfo, replicas(n)); - - addedNodes.put(nodeId, nodeInfo); - - return nodeInfo; - } - - /** {@inheritDoc} */ - private void initialize() { - if (!init.get() && init.compareAndSet(false, true)) { - if (log.isInfoEnabled()) - log.info("Consistent hash configuration [cacheName=" + cacheName + ", partitions=" + parts + - ", excludeNeighbors=" + exclNeighbors + ", replicas=" + replicas + - ", backupFilter=" + backupFilter + ", hashIdRslvr=" + hashIdRslvr + ']'); - - nodeHash = new GridConsistentHash<>(); - - initLatch.countDown(); - } - else { - if (initLatch.getCount() > 0) { - try { - U.await(initLatch); - } - catch (GridInterruptedException ignored) { - // Recover interrupted state flag. - Thread.currentThread().interrupt(); - } - } - } - } - - /** - * @param n Node. - * @return Replicas. - */ - private int replicas(ClusterNode n) { - Integer nodeReplicas = n.attribute(attrName); - - if (nodeReplicas == null) - nodeReplicas = replicas; - - return nodeReplicas; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridCacheConsistentHashAffinityFunction.class, this); - } - - /** - * Node hash ID. - */ - private static final class NodeInfo implements Comparable<NodeInfo> { - /** Node ID. */ - private UUID nodeId; - - /** Hash ID. */ - private Object hashId; - - /** Grid node. */ - private ClusterNode node; - - /** - * @param nodeId Node ID. - * @param hashId Hash ID. - * @param node Rich node. - */ - private NodeInfo(UUID nodeId, Object hashId, ClusterNode node) { - assert nodeId != null; - assert hashId != null; - - this.hashId = hashId; - this.nodeId = nodeId; - this.node = node; - } - - /** - * @return Node ID. 
- */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Hash ID. - */ - public Object hashId() { - return hashId; - } - - /** - * @return Node. - */ - public ClusterNode node() { - return node; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return hashId.hashCode(); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object obj) { - if (!(obj instanceof NodeInfo)) - return false; - - NodeInfo that = (NodeInfo)obj; - - // If objects are equal, hash codes should be the same. - // Cannot use that.hashId.equals(hashId) due to Comparable<N> interface restrictions. - return that.nodeId.equals(nodeId) && that.hashCode() == hashCode(); - } - - /** {@inheritDoc} */ - @Override public int compareTo(NodeInfo o) { - int diff = nodeId.compareTo(o.nodeId); - - if (diff == 0) { - int h1 = hashCode(); - int h2 = o.hashCode(); - - diff = h1 == h2 ? 0 : (h1 < h2 ? -1 : 1); - } - - return diff; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeInfo.class, this); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/package.html deleted file mode 100644 index f5d5e93..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/consistenthash/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains consistent hash based cache affinity for partitioned cache. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinity.java b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinity.java deleted file mode 100644 index 46c494b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/GridCachePartitionFairAffinity.java +++ /dev/null @@ -1,805 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.cache.affinity.fair; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Fair affinity function which tries to ensure that all nodes get equal number of partitions with - * minimum amount of reassignments between existing nodes. - * <p> - * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method. - */ -@GridCacheCentralizedAffinityFunction -public class GridCachePartitionFairAffinity implements GridCacheAffinityFunction { - /** Default partition count. */ - public static final int DFLT_PART_CNT = 256; - - /** */ - private static final long serialVersionUID = 0L; - - /** Ascending comparator. */ - private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(false); - - /** Descending comparator. */ - private static final Comparator<PartitionSet> DESC_CMP = new PartitionSetComparator(true); - - /** */ - private int parts; - - /** - * Creates fair affinity with default partition count. - */ - public GridCachePartitionFairAffinity() { - this(DFLT_PART_CNT); - } - - /** - * @param parts Number of partitions. 
- */ - public GridCachePartitionFairAffinity(int parts) { - this.parts = parts; - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext ctx) { - List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - if (topSnapshot.size() == 1) { - ClusterNode primary = topSnapshot.get(0); - - List<List<ClusterNode>> assignments = new ArrayList<>(parts); - - for (int i = 0; i < parts; i++) - assignments.add(Collections.singletonList(primary)); - - return assignments; - } - - IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> cp = createCopy(ctx, topSnapshot); - - List<List<ClusterNode>> assignment = cp.get1(); - - int tiers = Math.min(ctx.backups() + 1, topSnapshot.size()); - - // Per tier pending partitions. - Map<Integer, Queue<Integer>> pendingParts = new HashMap<>(); - - FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot); - - for (int tier = 0; tier < tiers; tier++) { - // Check if this is a new tier and add pending partitions. - Queue<Integer> pending = pendingParts.get(tier); - - for (int part = 0; part < parts; part++) { - if (fullMap.assignments.get(part).size() < tier + 1) { - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(tier, pending); - } - - if (!pending.contains(part)) - pending.add(part); - - } - } - - // Assign pending partitions, if any. - assignPending(tier, pendingParts, fullMap, topSnapshot); - - // Balance assignments. - balance(tier, pendingParts, fullMap, topSnapshot); - } - - return fullMap.assignments; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - return U.safeAbs(hash(key.hashCode())) % parts; - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. 
- } - - /** - * Assigns pending (unassigned) partitions to nodes. - * - * @param tier Tier to assign (0 is primary, 1 - 1st backup,...). - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. - */ - private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap, - List<ClusterNode> topSnapshot) { - Queue<Integer> pending = pendingMap.get(tier); - - if (F.isEmpty(pending)) - return; - - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false); - - // First iterate over underloaded nodes. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false); - - if (!pending.isEmpty() && !underloadedNodes.isEmpty()) { - // Same, forcing updates. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true); - } - - if (!pending.isEmpty()) - assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot); - - assert pending.isEmpty(); - - pendingMap.remove(tier); - } - - /** - * Assigns pending partitions to underloaded nodes. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param underloadedNodes Underloaded nodes. - * @param topSnapshot Topology snapshot. - * @param force {@code True} if partitions should be moved. 
- */ - private void assignPendingToUnderloaded( - int tier, - Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, - PrioritizedPartitionMap underloadedNodes, - Collection<ClusterNode> topSnapshot, - boolean force) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int ideal = parts / topSnapshot.size(); - - while (it.hasNext()) { - int part = it.next(); - - for (PartitionSet set : underloadedNodes.assignments()) { - ClusterNode node = set.node(); - - assert node != null; - - if (fullMap.assign(part, tier, node, force, pendingMap)) { - // We could add partition to partition map without forcing, remove partition from pending. - it.remove(); - - if (set.size() <= ideal) - underloadedNodes.remove(set.nodeId()); - else - underloadedNodes.update(); - - break; // for, continue to the next partition. - } - } - - if (underloadedNodes.isEmpty()) - return; - } - } - - /** - * Spreads pending partitions equally to all nodes in topology snapshot. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. 
- */ - private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int idx = 0; - - while (it.hasNext()) { - int part = it.next(); - - int i = idx; - - boolean assigned = false; - - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, false, pendingMap)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - - if (!assigned) { - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, true, pendingMap)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - } - - if (!assigned) - throw new IllegalStateException("Failed to find assignable node for partition."); - } - } - - /** - * Tries to balance assignments between existing nodes in topology. - * - * @param tier Tier to assign. - * @param pendingParts Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. 
- */ - private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, - Collection<ClusterNode> topSnapshot) { - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false); - PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true); - - do { - boolean retry = false; - - for (PartitionSet overloaded : overloadedNodes.assignments()) { - for (Integer part : overloaded.partitions()) { - boolean assigned = false; - - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - assigned = true; - - retry = true; - - break; - } - } - - if (!assigned) { - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - retry = true; - - break; - } - } - } - - if (retry) - break; // for part. - } - - if (retry) - break; // for overloaded. - } - - if (!retry) - break; - } - while (true); - } - - /** - * Constructs underloaded or overloaded partition map. - * - * @param mapping Mapping to filter. - * @param idealPartCnt Ideal number of partitions per node. 
- * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded. - * @return Prioritized partition map. - */ - private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) { - assert mapping != null; - - PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP); - - for (PartitionSet set : mapping.values()) { - if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt)) - res.add(set); - } - - return res; - } - - /** - * Creates copy of previous partition assignment. - * - * @param ctx Affinity function context. - * @param topSnapshot Topology snapshot. - * @return Assignment copy and per node partition map. - */ - private IgniteBiTuple<List<List<ClusterNode>>, Map<UUID, PartitionSet>> createCopy( - GridCacheAffinityFunctionContext ctx, Iterable<ClusterNode> topSnapshot) { - IgniteDiscoveryEvent discoEvt = ctx.discoveryEvent(); - - UUID leftNodeId = discoEvt.type() == IgniteEventType.EVT_NODE_JOINED ? null : discoEvt.eventNode().id(); - - List<List<ClusterNode>> cp = new ArrayList<>(parts); - - Map<UUID, PartitionSet> parts = new HashMap<>(); - - for (int part = 0; part < this.parts; part++) { - List<ClusterNode> partNodes = ctx.previousAssignment(part); - - List<ClusterNode> partNodesCp = new ArrayList<>(partNodes.size()); - - for (ClusterNode affNode : partNodes) { - if (!affNode.id().equals(leftNodeId)) { - partNodesCp.add(affNode); - - PartitionSet partSet = parts.get(affNode.id()); - - if (partSet == null) { - partSet = new PartitionSet(affNode); - - parts.put(affNode.id(), partSet); - } - - partSet.add(part); - } - } - - cp.add(partNodesCp); - } - - if (leftNodeId == null) { - // Node joined, find it and add empty set to mapping. 
- ClusterNode joinedNode = null; - - for (ClusterNode node : topSnapshot) { - if (node.id().equals(discoEvt.eventNode().id())) { - joinedNode = node; - - break; - } - } - - assert joinedNode != null; - - parts.put(joinedNode.id(), new PartitionSet(joinedNode)); - } - - return F.t(cp, parts); - } - - /** - * - */ - private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private boolean descending; - - /** - * @param descending {@code True} if comparator should be descending. - */ - private PartitionSetComparator(boolean descending) { - this.descending = descending; - } - - /** {@inheritDoc} */ - @Override public int compare(PartitionSet o1, PartitionSet o2) { - int res = o1.parts.size() < o2.parts.size() ? -1 : o1.parts.size() > o2.parts.size() ? 1 : 0; - - return descending ? -res : res; - } - } - - /** - * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order - * by number of partitions assigned to a node. - */ - private static class PrioritizedPartitionMap { - /** Comparator. */ - private Comparator<PartitionSet> cmp; - - /** Assignment map. */ - private Map<UUID, PartitionSet> assignmentMap = new HashMap<>(); - - /** Assignment list, ordered according to comparator. */ - private List<PartitionSet> assignmentList = new ArrayList<>(); - - /** - * @param cmp Comparator. - */ - private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) { - this.cmp = cmp; - } - - /** - * @param set Partition set to add. - */ - public void add(PartitionSet set) { - PartitionSet old = assignmentMap.put(set.nodeId(), set); - - if (old == null) { - assignmentList.add(set); - - update(); - } - } - - /** - * Sorts assignment list. - */ - public void update() { - Collections.sort(assignmentList, cmp); - } - - /** - * @return Sorted assignment list. 
- */ - public List<PartitionSet> assignments() { - return assignmentList; - } - - public void remove(UUID uuid) { - PartitionSet rmv = assignmentMap.remove(uuid); - - assignmentList.remove(rmv); - } - - public boolean isEmpty() { - return assignmentList.isEmpty(); - } - } - - /** - * Constructs assignment map for specified tier. - * - * @param tier Tier number, -1 for all tiers altogether. - * @param assignment Assignment to construct map from. - * @param topSnapshot Topology snapshot. - * @return Assignment map. - */ - private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment, - Collection<ClusterNode> topSnapshot) { - Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); - - for (int part = 0; part < assignment.size(); part++) { - List<ClusterNode> nodes = assignment.get(part); - - assert nodes instanceof RandomAccess; - - if (nodes.size() <= tier) - continue; - - int start = tier < 0 ? 0 : tier; - int end = tier < 0 ? nodes.size() : tier + 1; - - for (int i = start; i < end; i++) { - ClusterNode n = nodes.get(i); - - PartitionSet set = tmp.get(n.id()); - - if (set == null) { - set = new PartitionSet(n); - - tmp.put(n.id(), set); - } - - set.add(part); - } - } - - if (tmp.size() < topSnapshot.size()) { - for (ClusterNode node : topSnapshot) { - if (!tmp.containsKey(node.id())) - tmp.put(node.id(), new PartitionSet(node)); - } - } - - return tmp; - } - - /** - * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary - * maps consistent. - */ - @SuppressWarnings("unchecked") - private static class FullAssignmentMap { - /** Per-tier assignment maps. */ - private Map<UUID, PartitionSet>[] tierMaps; - - /** Full assignment map. */ - private Map<UUID, PartitionSet> fullMap; - - /** Resulting assignment. */ - private List<List<ClusterNode>> assignments; - - /** - * @param tiers Number of tiers. - * @param assignments Assignments to modify. - * @param topSnapshot Topology snapshot. 
- */ - private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) { - this.assignments = assignments; - - tierMaps = new Map[tiers]; - - for (int tier = 0; tier < tiers; tier++) - tierMaps[tier] = assignments(tier, assignments, topSnapshot); - - fullMap = assignments(-1, assignments, topSnapshot); - } - - /** - * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed - * only if this partition is not already assigned to a node. If force is true, then assignment will succeed - * only if partition is not assigned to a tier with number less than passed in. Assigned partition from - * greater tier will be moved to pending queue. - * - * @param part Partition to assign. - * @param tier Tier number to assign. - * @param node Node to move partition to. - * @param force Force flag. - * @param pendingParts per tier pending partitions map. - * @return {@code True} if assignment succeeded. - */ - boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) { - UUID nodeId = node.id(); - - if (!fullMap.get(nodeId).contains(part)) { - tierMaps[tier].get(nodeId).add(part); - - fullMap.get(nodeId).add(part); - - List<ClusterNode> assignment = assignments.get(part); - - if (assignment.size() <= tier) - assignment.add(node); - else { - ClusterNode oldNode = assignment.set(tier, node); - - if (oldNode != null) { - UUID oldNodeId = oldNode.id(); - - tierMaps[tier].get(oldNodeId).remove(part); - fullMap.get(oldNodeId).remove(part); - } - } - - return true; - } - else if (force) { - assert !tierMaps[tier].get(nodeId).contains(part); - - // Check previous tiers first. - for (int t = 0; t < tier; t++) { - if (tierMaps[t].get(nodeId).contains(part)) - return false; - } - - // Partition is on some lower tier, switch it. 
- for (int t = tier + 1; t < tierMaps.length; t++) { - if (tierMaps[t].get(nodeId).contains(part)) { - ClusterNode oldNode = assignments.get(part).get(tier); - - // Move partition from level t to tier. - assignments.get(part).set(tier, node); - assignments.get(part).set(t, null); - - if (oldNode != null) { - tierMaps[tier].get(oldNode.id()).remove(part); - fullMap.get(oldNode.id()).remove(part); - } - - tierMaps[tier].get(nodeId).add(part); - tierMaps[t].get(nodeId).remove(part); - - Queue<Integer> pending = pendingParts.get(t); - - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(t, pending); - } - - pending.add(part); - - return true; - } - } - - throw new IllegalStateException("Unable to assign partition to node while force is true."); - } - - // !force. - return false; - } - - /** - * Gets tier mapping. - * - * @param tier Tier to get mapping. - * @return Per node map. - */ - public Map<UUID, PartitionSet> tierMapping(int tier) { - return tierMaps[tier]; - } - } - - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. - * - * @param h Hash code. - * @return Enhanced hash code. - */ - private static int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") - private static class PartitionSet { - /** */ - private ClusterNode node; - - /** Partitions. */ - private Collection<Integer> parts = new LinkedList<>(); - - /** - * @param node Node. - */ - private PartitionSet(ClusterNode node) { - this.node = node; - } - - /** - * @return Node. - */ - private ClusterNode node() { - return node; - } - - /** - * @return Node ID. 
- */ - private UUID nodeId() { - return node.id(); - } - - /** - * @return Partition set size. - */ - private int size() { - return parts.size(); - } - - /** - * Adds partition to partition set. - * - * @param part Partition to add. - * @return {@code True} if partition was added, {@code false} if partition already exists. - */ - private boolean add(int part) { - if (!parts.contains(part)) { - parts.add(part); - - return true; - } - - return false; - } - - /** - * @param part Partition to remove. - */ - private void remove(Integer part) { - parts.remove(part); // Remove object, not index. - } - - /** - * @return Partitions. - */ - @SuppressWarnings("TypeMayBeWeakened") - private Collection<Integer> partitions() { - return parts; - } - - /** - * Checks if partition set contains given partition. - * - * @param part Partition to check. - * @return {@code True} if partition set contains given partition. - */ - private boolean contains(int part) { - return parts.contains(part); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']'; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/package.html deleted file mode 100644 index f71f3c2..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/fair/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. 
- The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains fair cache affinity for partitioned cache. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/package.html deleted file mode 100644 index defee90..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/package.html +++ /dev/null @@ -1,24 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Contains cache node affinity implementations. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/GridCacheRendezvousAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/GridCacheRendezvousAffinityFunction.java b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/GridCacheRendezvousAffinityFunction.java deleted file mode 100644 index cfb1ada..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/GridCacheRendezvousAffinityFunction.java +++ /dev/null @@ -1,503 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.cache.affinity.rendezvous; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; -import org.gridgain.grid.*; -import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; -import org.gridgain.grid.kernal.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.security.*; -import java.util.*; - -/** - * Affinity function for partitioned cache based on Highest Random Weight algorithm. - * This function supports the following configuration: - * <ul> - * <li> - * {@code partitions} - Number of partitions to spread across nodes. - * </li> - * <li> - * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors - * from being backups of each other. Note that {@code backupFilter} is ignored if - * {@code excludeNeighbors} is set to {@code true}. - * </li> - * <li> - * {@code backupFilter} - Optional filter for back up nodes. If provided, then only - * nodes that pass this filter will be selected as backup nodes. If not provided, then - * primary and backup nodes will be selected out of all nodes available for this cache. - * </li> - * </ul> - * <p> - * Cache affinity can be configured for individual caches via {@link GridCacheConfiguration#getAffinity()} method. 
- */ -public class GridCacheRendezvousAffinityFunction implements GridCacheAffinityFunction, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Default number of partitions. */ - public static final int DFLT_PARTITION_COUNT = 10000; - - /** Comparator. */ - private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = - new HashComparator(); - - /** Thread local message digest. */ - private ThreadLocal<MessageDigest> digest = new ThreadLocal<MessageDigest>() { - @Override protected MessageDigest initialValue() { - try { - return MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - assert false : "Should have failed in constructor"; - - throw new IgniteException("Failed to obtain message digest (digest was available in constructor)", - e); - } - } - }; - - /** Number of partitions. */ - private int parts; - - /** Exclude neighbors flag. */ - private boolean exclNeighbors; - - /** Optional backup filter. First node is primary, second node is a node being tested. */ - private IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter; - - /** Hash ID resolver. */ - private GridCacheAffinityNodeHashResolver hashIdRslvr = new GridCacheAffinityNodeAddressHashResolver(); - - /** Marshaller. */ - private IgniteMarshaller marshaller = new IgniteOptimizedMarshaller(false); - - /** - * Empty constructor with all defaults. - */ - public GridCacheRendezvousAffinityFunction() { - this(false); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other - * and specified number of backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. 
- */ - public GridCacheRendezvousAffinityFunction(boolean exclNeighbors) { - this(exclNeighbors, DFLT_PARTITION_COUNT); - } - - /** - * Initializes affinity with flag to exclude same-host-neighbors from being backups of each other, - * and specified number of backups and partitions. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups - * of each other. - * @param parts Total number of partitions. - */ - public GridCacheRendezvousAffinityFunction(boolean exclNeighbors, int parts) { - this(exclNeighbors, parts, null); - } - - /** - * Initializes optional counts for replicas and backups. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param parts Total number of partitions. - * @param backupFilter Optional back up filter for nodes. If provided, backups will be selected - * from all nodes that pass this filter. First argument for this filter is primary node, and second - * argument is node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - */ - public GridCacheRendezvousAffinityFunction(int parts, - @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this(false, parts, backupFilter); - } - - /** - * Private constructor. - * - * @param exclNeighbors Exclude neighbors flag. - * @param parts Partitions count. - * @param backupFilter Backup filter. 
- */ - private GridCacheRendezvousAffinityFunction(boolean exclNeighbors, int parts, - IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - A.ensure(parts != 0, "parts != 0"); - - this.exclNeighbors = exclNeighbors; - this.parts = parts; - this.backupFilter = backupFilter; - - try { - MessageDigest.getInstance("MD5"); - } - catch (NoSuchAlgorithmException e) { - throw new IgniteException("Failed to obtain MD5 message digest instance.", e); - } - } - - /** - * Gets total number of key partitions. To ensure that all partitions are - * equally distributed across all nodes, please make sure that this - * number is significantly larger than a number of nodes. Also, partition - * size should be relatively small. Try to avoid having partitions with more - * than quarter million keys. - * <p> - * Note that for fully replicated caches this method should always - * return {@code 1}. - * - * @return Total partition count. - */ - public int getPartitions() { - return parts; - } - - /** - * Sets total number of partitions. - * - * @param parts Total number of partitions. - */ - public void setPartitions(int parts) { - this.parts = parts; - } - - /** - * Gets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. - * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @return Hash ID resolver. - */ - public GridCacheAffinityNodeHashResolver getHashIdResolver() { - return hashIdRslvr; - } - - /** - * Sets hash ID resolver for nodes. This resolver is used to provide - * alternate hash ID, other than node ID. 
- * <p> - * Node IDs constantly change when nodes get restarted, which causes them to - * be placed on different locations in the hash ring, and hence causing - * repartitioning. Providing an alternate hash ID, which survives node restarts, - * puts node on the same location on the hash ring, hence minimizing required - * repartitioning. - * - * @param hashIdRslvr Hash ID resolver. - */ - public void setHashIdResolver(GridCacheAffinityNodeHashResolver hashIdRslvr) { - this.hashIdRslvr = hashIdRslvr; - } - - /** - * Gets optional backup filter. If not {@code null}, backups will be selected - * from all nodes that pass this filter. First node passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @return Optional backup filter. - */ - @Nullable public IgniteBiPredicate<ClusterNode, ClusterNode> getBackupFilter() { - return backupFilter; - } - - /** - * Sets optional backup filter. If provided, then backups will be selected from all - * nodes that pass this filter. First node being passed to this filter is primary node, - * and second node is a node being tested. - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code backupFilter} is set. - * - * @param backupFilter Optional backup filter. - */ - public void setBackupFilter(@Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { - this.backupFilter = backupFilter; - } - - /** - * Checks flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @return {@code True} if nodes residing on the same host may not act as backups of each other. 
- */ - public boolean isExcludeNeighbors() { - return exclNeighbors; - } - - /** - * Sets flag to exclude same-host-neighbors from being backups of each other (default is {@code false}). - * <p> - * Note that {@code excludeNeighbors} parameter is ignored if {@code #getBackupFilter()} is set. - * - * @param exclNeighbors {@code True} if nodes residing on the same host may not act as backups of each other. - */ - public void setExcludeNeighbors(boolean exclNeighbors) { - this.exclNeighbors = exclNeighbors; - } - - /** - * Returns collection of nodes (primary first) for specified partition. - */ - public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups, - @Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) { - if (nodes.size() <= 1) - return nodes; - - List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(); - - MessageDigest d = digest.get(); - - for (ClusterNode node : nodes) { - Object nodeHash = hashIdRslvr.resolve(node); - - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - byte[] nodeHashBytes = marshaller.marshal(nodeHash); - - out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException. - out.write(U.intToBytes(part), 0, 4); // Avoid IOException. 
- - d.reset(); - - byte[] bytes = d.digest(out.toByteArray()); - - long hash = - (bytes[0] & 0xFFL) - | ((bytes[1] & 0xFFL) << 8) - | ((bytes[2] & 0xFFL) << 16) - | ((bytes[3] & 0xFFL) << 24) - | ((bytes[4] & 0xFFL) << 32) - | ((bytes[5] & 0xFFL) << 40) - | ((bytes[6] & 0xFFL) << 48) - | ((bytes[7] & 0xFFL) << 56); - - lst.add(F.t(hash, node)); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - - Collections.sort(lst, COMPARATOR); - - int primaryAndBackups; - - List<ClusterNode> res; - - if (backups == Integer.MAX_VALUE) { - primaryAndBackups = Integer.MAX_VALUE; - - res = new ArrayList<>(); - } - else { - primaryAndBackups = backups + 1; - - res = new ArrayList<>(primaryAndBackups); - } - - ClusterNode primary = lst.get(0).get2(); - - res.add(primary); - - // Select backups. - if (backups > 0) { - for (int i = 1; i < lst.size(); i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); - - ClusterNode node = next.get2(); - - if (exclNeighbors) { - Collection<ClusterNode> allNeighbors = allNeighbors(neighborhoodCache, res); - - if (!allNeighbors.contains(node)) - res.add(node); - } - else { - if (!res.contains(node) && (backupFilter == null || backupFilter.apply(primary, node))) - res.add(next.get2()); - } - - if (res.size() == primaryAndBackups) - break; - } - } - - if (res.size() < primaryAndBackups && nodes.size() >= primaryAndBackups && exclNeighbors) { - // Need to iterate one more time in case if there are no nodes which pass exclude backups criteria. - for (int i = 1; i < lst.size(); i++) { - IgniteBiTuple<Long, ClusterNode> next = lst.get(i); - - ClusterNode node = next.get2(); - - if (!res.contains(node)) - res.add(next.get2()); - - if (res.size() == primaryAndBackups) - break; - } - } - - assert res.size() <= primaryAndBackups; - - return res; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - return U.safeAbs(key.hashCode() % parts); - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(GridCacheAffinityFunctionContext affCtx) { - List<List<ClusterNode>> assignments = new ArrayList<>(parts); - - Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ? - neighbors(affCtx.currentTopologySnapshot()) : null; - - for (int i = 0; i < parts; i++) { - List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(), - neighborhoodCache); - - assignments.add(partAssignment); - } - - return assignments; - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(parts); - out.writeBoolean(exclNeighbors); - out.writeObject(hashIdRslvr); - out.writeObject(backupFilter); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - parts = in.readInt(); - exclNeighbors = in.readBoolean(); - hashIdRslvr = (GridCacheAffinityNodeHashResolver)in.readObject(); - backupFilter = (IgniteBiPredicate<ClusterNode, ClusterNode>)in.readObject(); - } - - /** - * Builds neighborhood map for all nodes in snapshot. - * - * @param topSnapshot Topology snapshot. - * @return Neighbors map. - */ - private Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) { - Map<String, Collection<ClusterNode>> macMap = new HashMap<>(topSnapshot.size(), 1.0f); - - // Group by mac addresses. 
- for (ClusterNode node : topSnapshot) { - String macs = node.attribute(GridNodeAttributes.ATTR_MACS); - - Collection<ClusterNode> nodes = macMap.get(macs); - - if (nodes == null) { - nodes = new HashSet<>(); - - macMap.put(macs, nodes); - } - - nodes.add(node); - } - - Map<UUID, Collection<ClusterNode>> neighbors = new HashMap<>(topSnapshot.size(), 1.0f); - - for (Collection<ClusterNode> group : macMap.values()) { - for (ClusterNode node : group) - neighbors.put(node.id(), group); - } - - return neighbors; - } - - /** - * @param neighborhoodCache Neighborhood cache. - * @param nodes Nodes. - * @return All neighbors for given nodes. - */ - private Collection<ClusterNode> allNeighbors(Map<UUID, Collection<ClusterNode>> neighborhoodCache, - Iterable<ClusterNode> nodes) { - Collection<ClusterNode> res = new HashSet<>(); - - for (ClusterNode node : nodes) { - if (!res.contains(node)) - res.addAll(neighborhoodCache.get(node.id())); - } - - return res; - } - - /** - * - */ - private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) { - return o1.get1() < o2.get1() ? -1 : o1.get1() > o2.get1() ? 
1 : - o1.get2().id().compareTo(o2.get2().id()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/package.html b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/package.html deleted file mode 100644 index 780cabc..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/rendezvous/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> -<!-- Package description. --> -Contains HRW-based cache affinity for partitioned cache. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQuery.java b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQuery.java index c6c3622..f976108 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQuery.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/query/GridCacheQuery.java @@ -21,7 +21,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; import org.jetbrains.annotations.*; /** @@ -84,7 +83,7 @@ import org.jetbrains.annotations.*; * Joins will work correctly only if joined objects are stored in * collocated mode or at least one side of the join is stored in * {@link GridCacheMode#REPLICATED} cache. Refer to - * {@link GridCacheAffinityKey} javadoc for more information about colocation. + * {@link org.apache.ignite.cache.affinity.CacheAffinityKey} javadoc for more information about colocation. 
* </li> * </ul> * <h1 class="header">Query usage</h1> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index 3085f3b..0756752 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -36,7 +36,7 @@ import org.apache.ignite.streamer.*; import org.apache.ignite.thread.*; import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.rendezvous.*; +import org.apache.ignite.cache.affinity.rendezvous.*; import org.gridgain.grid.kernal.processors.interop.*; import org.gridgain.grid.kernal.processors.resource.*; import org.gridgain.grid.kernal.processors.spring.*; @@ -2041,7 +2041,7 @@ public class GridGainEx { cache.setQueryIndexEnabled(false); cache.setPreloadMode(SYNC); cache.setWriteSynchronizationMode(FULL_SYNC); - cache.setAffinity(new GridCacheRendezvousAffinityFunction(false, 100)); + cache.setAffinity(new CacheRendezvousAffinityFunction(false, 100)); if (client) cache.setDistributionMode(CLIENT_ONLY); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/GridJobContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridJobContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridJobContextImpl.java index 8a67989..a293afa 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridJobContextImpl.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridJobContextImpl.java @@ -20,9 +20,8 @@ package org.gridgain.grid.kernal; 
import org.apache.ignite.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; -import org.gridgain.grid.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.processors.job.*; import org.gridgain.grid.kernal.processors.timeout.*; import org.gridgain.grid.util.typedef.*; @@ -244,7 +243,7 @@ public class GridJobContextImpl extends GridMetadataAwareAdapter implements Comp /** {@inheritDoc} */ @Override public <T> T affinityKey() { try { - return (T)job.getDeployment().annotatedValue(job.getJob(), GridCacheAffinityKeyMapped.class); + return (T)job.getDeployment().annotatedValue(job.getJob(), CacheAffinityKeyMapped.class); } catch (IgniteCheckedException e) { throw F.wrap(e); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java index f54d1e5..042da71 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernal.java @@ -18,6 +18,7 @@ package org.gridgain.grid.kernal; import org.apache.ignite.*; +import org.apache.ignite.IgniteCacheAffinity; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.lang.*; @@ -3233,6 +3234,12 @@ public class GridKernal extends ClusterGroupAdapter implements GridEx, IgniteMBe Ignition.stop(gridName, true); } + /** {@inheritDoc} */ + @Override public <K> IgniteCacheAffinity<K> affinity(String cacheName) { + GridCacheAdapter<K, ?> cache = ctx.cache().internalCache(cacheName); + return cache.affinityProxy(); + } + /** * Creates optional component. 
* http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/GridManagerAdapter.java index 99df76f..445aa1c 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/GridManagerAdapter.java @@ -455,7 +455,7 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan } @Override public int partition(String cacheName, Object key) { - return ctx.cache().cache(cacheName).affinity().partition(key); + return ctx.grid().affinity(cacheName).partition(key); } @Override public void removeFromSwap(String spaceName, Object key, http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java index 576ed0b..1c70e77 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java @@ -22,7 +22,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.spi.loadbalancing.*; import org.gridgain.grid.cache.*; -import org.gridgain.grid.cache.affinity.*; +import org.apache.ignite.cache.affinity.*; import org.gridgain.grid.kernal.*; import 
org.gridgain.grid.kernal.managers.*; import org.gridgain.grid.kernal.managers.deployment.*; @@ -130,7 +130,7 @@ public class GridLoadBalancerManager extends GridManagerAdapter<LoadBalancingSpi if (log.isDebugEnabled()) log.debug("Looking for cache affinity node [job=" + job + "]"); - Object key = dep.annotatedValue(job, GridCacheAffinityKeyMapped.class); + Object key = dep.annotatedValue(job, CacheAffinityKeyMapped.class); if (key == null) return null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/25609651/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/CacheAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/CacheAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/CacheAffinityFunctionContextImpl.java new file mode 100644 index 0000000..a2edff9 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/affinity/CacheAffinityFunctionContextImpl.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.gridgain.grid.kernal.processors.affinity; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.cache.affinity.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Cache affinity function context implementation. Simple bean that holds all required fields. + */ +public class CacheAffinityFunctionContextImpl implements CacheAffinityFunctionContext { + /** Topology snapshot. */ + private List<ClusterNode> topSnapshot; + + /** Previous affinity assignment. */ + private List<List<ClusterNode>> prevAssignment; + + /** Discovery event that caused this topology change. */ + private IgniteDiscoveryEvent discoEvt; + + /** Topology version. */ + private long topVer; + + /** Number of backups to assign. */ + private int backups; + + /** + * Stores the given values as-is; no argument is validated or copied. + * + * @param topSnapshot Topology snapshot. + * @param prevAssignment Previous affinity assignment, indexed by partition. + * @param discoEvt Discovery event that caused this topology change. + * @param topVer Topology version. + * @param backups Number of backups to assign. + */ + public CacheAffinityFunctionContextImpl(List<ClusterNode> topSnapshot, List<List<ClusterNode>> prevAssignment, + IgniteDiscoveryEvent discoEvt, long topVer, int backups) { + this.topSnapshot = topSnapshot; + this.prevAssignment = prevAssignment; + this.discoEvt = discoEvt; + this.topVer = topVer; + this.backups = backups; + } + + /** + * {@inheritDoc} + * <p> + * NOTE(review): dereferences {@code prevAssignment} without a null check, so a context built with a + * {@code null} previous assignment throws {@link NullPointerException} here instead of returning + * {@code null} - confirm callers always supply one. + */ + @Nullable @Override public List<ClusterNode> previousAssignment(int part) { + return prevAssignment.get(part); + } + + /** {@inheritDoc} */ + @Override public List<ClusterNode> currentTopologySnapshot() { + return topSnapshot; + } + + /** {@inheritDoc} */ + @Override public long currentTopologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ + @Nullable @Override public IgniteDiscoveryEvent discoveryEvent() { + return discoEvt; + } + + /** {@inheritDoc} */ + @Override public int backups() { + return backups; + } +}