http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java deleted file mode 100644 index 873daf1..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Provides affinity information to detect which node is primary and which nodes are - * backups for a partitioned cache. You can get an instance of this interface by calling - * {@code Cache.affinity()} method. - * <p> - * Mapping of a key to a node is a three-step operation. First step will get an affinity key for given key - * using {@link CacheAffinityKeyMapper}. If mapper is not specified, the original key will be used. 
Second step - * will map affinity key to partition using {@link CacheAffinityFunction#partition(Object)} method. Third step - * will map obtained partition to nodes for current grid topology version. - * <p> - * Interface provides various {@code 'mapKeysToNodes(..)'} methods which provide node affinity mapping for - * given keys. All {@code 'mapKeysToNodes(..)'} methods are not transactional and will not enlist - * keys into ongoing transaction. - */ -public interface CacheAffinity<K> { - /** - * Gets number of partitions in cache according to configured affinity function. - * - * @return Number of cache partitions. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public int partitions(); - - /** - * Gets partition id for the given key. - * - * @param key Key to get partition id for. - * @return Partition id. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public int partition(K key); - - /** - * Returns {@code true} if given node is the primary node for given key. - * - * @param n Node to check. - * @param key Key to check. - * @return {@code True} if given node is the primary node for given key. - */ - public boolean isPrimary(ClusterNode n, K key); - - /** - * Returns {@code true} if given node is one of the backup nodes for given key. - * - * @param n Node to check. - * @param key Key to check. - * @return {@code True} if given node is one of the backup nodes for given key. 
- */ - public boolean isBackup(ClusterNode n, K key); - - /** - * Returns {@code true} if given node is primary or one of the backup nodes for given key. - * <p> - * This method is essentially equivalent to calling - * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} || - * {@link #isBackup(org.apache.ignite.cluster.ClusterNode, Object)}"</i>, - * however it is more efficient as it makes both checks at once. - * - * @param n Node to check. - * @param key Key to check. - * @return {@code True} if given node is primary or backup for given key. - */ - public boolean isPrimaryOrBackup(ClusterNode n, K key); - - /** - * Gets partition ids for which nodes of the given projection have primary - * ownership. - * <p> - * Note that since {@link org.apache.ignite.cluster.ClusterNode} implements {@link org.apache.ignite.cluster.ClusterGroup}, - * to find out primary partitions for a single node just pass - * a single node into this method. - * <p> - * This method may return an empty array if none of nodes in the projection - * have nearOnly disabled. - * - * @param n Grid node. - * @return Partition ids for which given projection has primary ownership. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public int[] primaryPartitions(ClusterNode n); - - /** - * Gets partition ids for which nodes of the given projection have backup - * ownership. Note that you can find a backup at a certain level, e.g. - * {@code first} backup or {@code third} backup by specifying the - * {@code 'level'} parameter. If no {@code 'level'} is specified then - * all backup partitions are returned. - * <p> - * Note that since {@link org.apache.ignite.cluster.ClusterNode} implements {@link org.apache.ignite.cluster.ClusterGroup}, - * to find out backup partitions for a single node, just pass that single - * node into this method. 
- * <p> - * This method may return an empty array if none of nodes in the projection - * have nearOnly disabled. - * - * @param n Grid node. - * @return Partition ids for which given projection has backup ownership. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public int[] backupPartitions(ClusterNode n); - - /** - * Gets partition ids for which nodes of the given projection has ownership - * (either primary or backup). - * <p> - * Note that since {@link org.apache.ignite.cluster.ClusterNode} implements {@link org.apache.ignite.cluster.ClusterGroup}, - * to find out all partitions for a single node, just pass that single - * node into this method. - * <p> - * This method may return an empty array if none of nodes in the projection - * have nearOnly disabled. - * - * @param n Grid node. - * @return Partition ids for which given projection has ownership. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public int[] allPartitions(ClusterNode n); - - /** - * Maps passed in key to a key which will be used for node affinity. The affinity - * key may be different from actual key if some field in the actual key was - * designated for affinity mapping via {@link CacheAffinityKeyMapped} annotation - * or if a custom {@link CacheAffinityKeyMapper} was configured. - * - * @param key Key to map. - * @return Key to be used for node-to-affinity mapping (may be the same - * key as passed in). - */ - public Object affinityKey(K key); - - /** - * This method provides ability to detect which keys are mapped to which nodes. - * Use it to determine which nodes are storing which keys prior to sending - * jobs that access these keys. 
- * <p> - * This method works as follows: - * <ul> - * <li>For local caches it returns only local node mapped to all keys.</li> - * <li> - * For fully replicated caches {@link CacheAffinityFunction} is - * used to determine which keys are mapped to which nodes. - * </li> - * <li>For partitioned caches, the returned map represents node-to-key affinity.</li> - * </ul> - * - * @param keys Keys to map to nodes. - * @return Map of nodes to keys or empty map if there are no alive nodes for this cache. - */ - public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable Collection<? extends K> keys); - - /** - * This method provides ability to detect to which primary node the given key - * is mapped. Use it to determine which nodes are storing which keys prior to sending - * jobs that access these keys. - * <p> - * This method works as follows: - * <ul> - * <li>For local caches it returns only local node.</li> - * <li> - * For fully replicated caches first node returned by {@link CacheAffinityFunction} - * is returned. - * </li> - * <li>For partitioned caches, primary node for the given key is returned.</li> - * </ul> - * - * @param key Key to map to a node. - * @return Primary node for the key or {@code null} if there are no alive nodes for this cache. - */ - @Nullable public ClusterNode mapKeyToNode(K key); - - /** - * Gets primary and backup nodes for the key. Note that primary node is always - * first in the returned collection. - * - * @param key Key to get affinity nodes for. - * @return Collection of primary and backup nodes for the key with primary node - * always first. - */ - public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key); - - /** - * Gets primary node for the given partition. - * - * @param part Partition id. - * @return Primary node for the given partition. 
- * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public ClusterNode mapPartitionToNode(int part); - - /** - * Gets primary nodes for the given partitions. - * - * @param parts Partition ids. - * @return Mapping of given partitions to their primary nodes. - * @see CacheAffinityFunction - * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity() - * @see org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction) - */ - public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> parts); - - /** - * Gets primary and backup nodes for partition. Note that primary node is always - * first in the returned collection. - * - * @param part Partition to get affinity nodes for. - * @return Collection of primary and backup nodes for partition with primary node - * always first. - */ - public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part); -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java deleted file mode 100644 index f7602db..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; - -import java.io.*; -import java.util.*; - -/** - * Cache key affinity which maps keys to nodes. This interface is utilized for - * both, replicated and partitioned caches. Cache affinity can be configured - * for individual caches via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method. 
- * <p> - * Whenever a key is given to cache, it is first passed to a pluggable - * {@link CacheAffinityKeyMapper} which may potentially map this key to an alternate - * key which should be used for affinity. The key returned from - * {@link CacheAffinityKeyMapper#affinityKey(Object)} method is then passed to - * {@link #partition(Object) partition(Object)} method to find out the partition for the key. - * On each topology change, partition-to-node mapping is calculated using - * {@link #assignPartitions(CacheAffinityFunctionContext)} method, which assigns a collection - * of nodes to each partition. - * This collection of nodes is used for node affinity. In {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} - * cache mode the key will be cached on all returned nodes; generally, all caching nodes - * participate in caching every key in replicated mode. In {@link org.apache.ignite.cache.CacheMode#PARTITIONED PARTITIONED} - * mode, only primary and backup nodes are returned with primary node always in the - * first position. So if there is {@code 1} backup node, then the returned collection will - * have {@code 2} nodes in it - {@code primary} node in first position, and {@code backup} - * node in second. - * <p> - * For more information about cache affinity and examples refer to {@link CacheAffinityKeyMapper} and - * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} documentation. - * @see CacheAffinityKeyMapped - * @see CacheAffinityKeyMapper - */ -public interface CacheAffinityFunction extends Serializable { - /** - * Resets cache affinity to its initial state. This method will be called by - * the system any time the affinity has been sent to remote node where - * it has to be reinitialized. If your implementation of affinity function - * has no initialization logic, leave this method empty. - */ - public void reset(); - - /** - * Gets total number of partitions available. 
All caches should always provide - * correct partition count which should be the same on all participating nodes. - * Note that partitions should always be numbered from {@code 0} inclusively to - * {@code N} exclusively without any gaps. - * - * @return Total partition count. - */ - public int partitions(); - - /** - * Gets partition number for a given key starting from {@code 0}. Partitioned caches - * should make sure that keys are about evenly distributed across all partitions - * from {@code 0} to {@link #partitions() partition count} for best performance. - * <p> - * Note that for fully replicated caches it is possible to segment key sets among different - * grid node groups. In that case each node group should return a unique partition - * number. However, unlike partitioned cache, mappings of keys to nodes in - * replicated caches are constant and a node cannot migrate from one partition - * to another. - * - * @param key Key to get partition for. - * @return Partition number for a given key. - */ - public int partition(Object key); - - /** - * Gets affinity nodes for a partition. In case of replicated cache, all returned - * nodes are updated in the same manner. In case of partitioned cache, the returned - * list should contain only the primary and back up nodes with primary node being - * always first. - * <p> - * Note that partitioned affinity must obey the following contract: given that node - * <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave - * grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>. - * - * @param affCtx Affinity function context. Will provide all required information to calculate - * new partition assignments. - * @return Unmodifiable list indexed by partition number. Each element of array is a collection in which - * first node is a primary node and other nodes are backup nodes. 
- */ - public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext affCtx); - - /** - * Removes node from affinity. This method is called when it is safe to remove left node from - * affinity mapping. - * - * @param nodeId ID of node to remove. - */ - public void removeNode(UUID nodeId); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java deleted file mode 100644 index fd1be95..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.processors.affinity.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * Affinity function context. This context is passed to {@link CacheAffinityFunction} for - * partition reassignment on every topology change event. - */ -public interface CacheAffinityFunctionContext { - /** - * Gets affinity assignment for given partition on previous topology version. First node in returned list is - * a primary node, other nodes are backups. - * - * @param part Partition to get previous assignment for. - * @return List of nodes assigned to given partition on previous topology version or {@code null} - * if this information is not available. - */ - @Nullable public List<ClusterNode> previousAssignment(int part); - - /** - * Gets number of backups for new assignment. - * - * @return Number of backups for new assignment. - */ - public int backups(); - - /** - * Gets current topology snapshot. Snapshot will contain only nodes on which particular cache is configured. - * List of passed nodes is guaranteed to be sorted in a same order on all nodes on which partition assignment - * is performed. - * - * @return Cache topology snapshot. - */ - public List<ClusterNode> currentTopologySnapshot(); - - /** - * Gets current topology version number. - * - * @return Current topology version number. - */ - public AffinityTopologyVersion currentTopologyVersion(); - - /** - * Gets discovery event caused topology change. - * - * @return Discovery event caused latest topology change or {@code null} if this information is - * not available. 
- */ - @Nullable public DiscoveryEvent discoveryEvent(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java deleted file mode 100644 index cd4a4ec..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; - -/** - * Optional wrapper for cache keys to provide support - * for custom affinity mapping. The value returned by - * {@link #affinityKey(Object)} method will be used for key-to-node - * affinity. 
- * <p> - * Note that the {@link #equals(Object)} and {@link #hashCode()} methods - * delegate directly to the wrapped cache key provided by {@link #key()} - * method. - * <p> - * This class is optional and does not have to be used. It only provides - * extra convenience whenever custom affinity mapping is required. Here is - * an example of how {@code Person} objects can be collocated with - * {@code Company} objects they belong to: - * <pre name="code" class="java"> - * Object personKey = new CacheAffinityKey(myPersonId, myCompanyId); - * - * // Both, the company and the person objects will be cached on the same node. - * cache.put(myCompanyId, new Company(..)); - * cache.put(personKey, new Person(..)); - * </pre> - * <p> - * For more information and examples of cache affinity refer to - * {@link CacheAffinityKeyMapper} and {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} - * documentation. - * @see CacheAffinityKeyMapped - * @see CacheAffinityKeyMapper - * @see CacheAffinityFunction - */ -public class CacheAffinityKey<K> implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Key. */ - @GridToStringInclude - private K key; - - /** Affinity key. */ - @GridToStringInclude - private Object affKey; - - /** - * Empty constructor. - */ - public CacheAffinityKey() { - // No-op. - } - - /** - * Initializes key wrapper for a given key. If affinity key - * is not initialized, then this key will be used for affinity. - * - * @param key Key. - */ - public CacheAffinityKey(K key) { - A.notNull(key, "key"); - - this.key = key; - } - - /** - * Initializes key together with its affinity key counter-part. - * - * @param key Key. - * @param affKey Affinity key. - */ - public CacheAffinityKey(K key, Object affKey) { - A.notNull(key, "key"); - - this.key = key; - this.affKey = affKey; - } - - /** - * Gets wrapped key. - * - * @return Wrapped key. - */ - public K key() { - return key; - } - - /** - * Sets wrapped key. 
- * - * @param key Wrapped key. - */ - public void key(K key) { - this.key = key; - } - - /** - * Gets affinity key to use for affinity mapping. If affinity key is not provided, - * then {@code key} value will be returned. - * <p> - * This method is annotated with {@link CacheAffinityKeyMapped} and will be picked up - * by {@link GridCacheDefaultAffinityKeyMapper} automatically. - * - * @return Affinity key to use for affinity mapping. - */ - @CacheAffinityKeyMapped - @SuppressWarnings({"unchecked"}) - public <T> T affinityKey() { - A.notNull(key, "key"); - - return (T)(affKey == null ? key : affKey); - } - - /** - * Sets affinity key to use for affinity mapping. If affinity key is not provided, - * then {@code key} value will be returned. - * - * @param affKey Affinity key to use for affinity mapping. - */ - public void affinityKey(Object affKey) { - this.affKey = affKey; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(key); - out.writeObject(affKey); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - key = (K)in.readObject(); - affKey = in.readObject(); - } - - /** - * Hash code implementation which delegates to the underlying {@link #key()}. Note, however, - * that different subclasses of {@code CacheAffinityKey} will produce different hash codes. - * <p> - * Users should override this method if different behavior is desired. - * - * @return Hash code. - */ - @Override public int hashCode() { - A.notNull(key, "key"); - - return 31 * key.hashCode() + getClass().getName().hashCode(); - } - - /** - * Equality check which delegates to the underlying key equality. Note, however, that - * different subclasses of {@code CacheAffinityKey} will never be equal. - * <p> - * Users should override this method if different behavior is desired. - * - * @param obj Object to check for equality. 
- * @return {@code True} if objects are equal. - */ - @Override public boolean equals(Object obj) { - A.notNull(key, "key"); - - return obj != null && getClass() == obj.getClass() && key.equals(((CacheAffinityKey)obj).key); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheAffinityKey.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java deleted file mode 100644 index a26a9b0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import java.lang.annotation.*; -import java.util.concurrent.*; - -/** - * Optional annotation to specify custom key-to-node affinity. 
Affinity key is a key - * which will be used to determine a node on which given cache key will be stored. This - * annotation allows to mark a field or a method in the cache key object that will be - * used as an affinity key (instead of the entire cache key object that is used for - * affinity by default). Note that a class can have only one field or method annotated - * with {@code @CacheAffinityKeyMapped} annotation. - * <p> - * One of the major use cases for this annotation is the routing of grid computations - * to the nodes where the data for this computation is cached, the concept - * otherwise known as {@code Collocation Of Computations And Data}. - * <p> - * <h1 class="header">Mapping Cache Keys</h1> - * The default implementation of {@link CacheAffinityKeyMapper}, which will be used - * if no explicit affinity mapper is specified in cache configuration, will first look - * for any field or method annotated with {@code @CacheAffinityKeyMapped} annotation. - * If such field or method is not found, then the cache key itself will be used for - * key-to-node affinity (this means that all objects with the same cache key will always - * be routed to the same node). If such field or method is found, then the value of this - * field or method will be used for key-to-node affinity. This allows to specify alternate - * affinity key, other than the cache key itself, whenever needed. - * <p> - * For example, if a {@code Person} object is always accessed together with a {@code Company} object - * for which this person is an employee, then for better performance and scalability it makes sense to - * collocate {@code Person} objects together with their {@code Company} object when storing them in - * cache. 
To achieve that, cache key used to cache {@code Person} objects should have a field or method - * annotated with {@code @CacheAffinityKeyMapped} annotation, which will provide the value of - * the company key for which that person works, like so: - * <pre name="code" class="java"> - * public class PersonKey { - * // Person ID used to identify a person. - * private String personId; - * - * // Company ID which will be used for affinity. - * @CacheAffinityKeyMapped - * private String companyId; - * ... - * } - * ... - * // Instantiate person keys. - * Object personKey1 = new PersonKey("myPersonId1", "myCompanyId"); - * Object personKey2 = new PersonKey("myPersonId2", "myCompanyId"); - * - * // Both, the company and the person objects will be cached on the same node. - * cache.put("myCompanyId", new Company(..)); - * cache.put(personKey1, new Person(..)); - * cache.put(personKey2, new Person(..)); - * </pre> - * <p> - * <h2 class="header">CacheAffinityKey</h2> - * For convenience, you can also optionally use {@link CacheAffinityKey} class. Here is how a - * {@code PersonKey} defined above would look using {@link CacheAffinityKey}: - * <pre name="code" class="java"> - * Object personKey1 = new CacheAffinityKey("myPersonId1", "myCompanyId"); - * Object personKey2 = new CacheAffinityKey("myPersonId2", "myCompanyId"); - * - * // Both, the company and the person objects will be cached on the same node. - * cache.put(myCompanyId, new Company(..)); - * cache.put(personKey1, new Person(..)); - * cache.put(personKey2, new Person(..)); - * </pre> - * <p> - * <h1 class="header">Collocating Computations And Data</h1> - * It is also possible to route computations to the nodes where the data is cached. This concept - * is otherwise known as {@code Collocation Of Computations And Data}. 
In this case, - * {@code @CacheAffinityKeyMapped} annotation allows to specify a routing affinity key for a - * {@link org.apache.ignite.compute.ComputeJob} or any other grid computation, such as {@link Runnable}, - * {@link Callable}, or {@link org.apache.ignite.lang.IgniteClosure}. It should be attached to a method or - * field that provides affinity key for the computation. Only one annotation per class is allowed. - * Whenever such annotation is detected, then {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} - * will be bypassed, and computation will be routed to the grid node where the specified affinity key is cached. - * <p> - * For more information about cache affinity also see {@link CacheAffinityKeyMapper} and - * {@link CacheAffinityFunction} documentation. - * Affinity for a key can be found from any node, regardless of whether it has cache started - * or not. If cache is not started, affinity function will be fetched from the remote node - * which does have the cache running. - * - * @see CacheAffinityFunction - * @see CacheAffinityKeyMapper - * @see CacheAffinityKey - */ -@Documented -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.FIELD, ElementType.METHOD}) -public @interface CacheAffinityKeyMapped { - // No-op. -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java deleted file mode 100644 index 4812940..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import java.io.*; - -/** - * Affinity mapper which maps cache key to an affinity key. Affinity key is a key which will be - * used to determine a node on which this key will be cached. Every cache key will first be passed - * through {@link #affinityKey(Object)} method, and the returned value of this method - * will be given to {@link CacheAffinityFunction} implementation to find out key-to-node affinity. - * <p> - * The default implementation, which will be used if no explicit affinity mapper is specified - * in cache configuration, will first look for any field or method annotated with - * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} annotation. If such field or method - * is not found, then the cache key itself will be returned from {@link #affinityKey(Object) affinityKey(Object)} - * method (this means that all objects with the same cache key will always be routed to the same node). - * If such field or method is found, then the value of this field or method will be returned from - * {@link #affinityKey(Object) affinityKey(Object)} method. This allows to specify alternate affinity key, other - * than the cache key itself, whenever needed. 
- * <p> - * A custom (other than default) affinity mapper can be provided - * via {@link org.apache.ignite.configuration.CacheConfiguration#getAffinityMapper()} configuration property. - * <p> - * For more information on affinity mapping and examples refer to {@link CacheAffinityFunction} and - * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} documentation. - * @see CacheAffinityFunction - * @see CacheAffinityKeyMapped - */ -public interface CacheAffinityKeyMapper extends Serializable { - /** - * Maps passed in key to an alternate key which will be used for node affinity. - * - * @param key Key to map. - * @return Key to be used for node-to-affinity mapping (may be the same - * key as passed in). - */ - public Object affinityKey(Object key); - - /** - * Resets cache affinity mapper to its initial state. This method will be called by - * the system any time the affinity mapper has been sent to remote node where - * it has to be reinitialized. If your implementation of affinity mapper - * has no initialization logic, leave this method empty. - */ - public void reset(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java deleted file mode 100644 index 70f20ba..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Node hash resolver which uses {@link org.apache.ignite.cluster.ClusterNode#consistentId()} as alternate hash value. - */ -public class CacheAffinityNodeAddressHashResolver implements CacheAffinityNodeHashResolver { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Object resolve(ClusterNode node) { - return node.consistentId(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheAffinityNodeAddressHashResolver.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java deleted file mode 100644 index 79489cf..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; - -import java.io.*; - -/** - * Resolver which is used to provide node hash value for affinity function. - * <p> - * Node IDs constantly change when nodes get restarted, which causes affinity mapping to change between restarts, - * and hence causing redundant repartitioning. Providing an alternate node hash value, which survives node restarts, - * will help to map keys to the same nodes whenever possible. - * <p> - * Note that on case clients exist they will query this object from the server and use it for affinity calculation. - * Therefore you must ensure that server and clients can marshal and unmarshal this object in portable format, - * i.e. all parties have object class(es) configured as portable. - */ -public interface CacheAffinityNodeHashResolver extends Serializable { - /** - * Resolve alternate hash value for the given Grid node. - * - * @param node Grid node. - * @return Resolved hash ID. 
- */ - public Object resolve(ClusterNode node); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java deleted file mode 100644 index 3de9220..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.affinity; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Node hash resolver which uses generated node ID as node hash value. As new node ID is generated - * on each node start, this resolver do not provide ability to map keys to the same nodes after restart. 
- */ -public class CacheAffinityNodeIdHashResolver implements CacheAffinityNodeHashResolver { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public Object resolve(ClusterNode node) { - return node.id(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(CacheAffinityNodeIdHashResolver.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java deleted file mode 100644 index 0df8ffe..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.affinity; - -import java.lang.annotation.*; - -/** - * Annotation marker which identifies affinity function that must be calculated on one centralized node - * instead of independently on each node. In many cases it happens because it requires previous affinity state - * in order to calculate new one. - */ -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) -public @interface CacheCentralizedAffinityFunction { - -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java new file mode 100644 index 0000000..54e3cf6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.cache.affinity; + +import java.lang.annotation.*; + +/** + * Annotation marker which identifies affinity function that must be calculated on one centralized node + * instead of independently on each node. In many cases it happens because it requires previous affinity state + * in order to calculate new one. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface CentralizedAffinityFunction { + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java deleted file mode 100644 index b4bd2e9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java +++ /dev/null @@ -1,777 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.affinity.fair; - -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.events.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.io.*; -import java.util.*; - -/** - * Fair affinity function which tries to ensure that all nodes get equal number of partitions with - * minimum amount of reassignments between existing nodes. - * <p> - * Cache affinity can be configured for individual caches via {@link CacheConfiguration#getAffinity()} method. - */ -@CacheCentralizedAffinityFunction -public class CachePartitionFairAffinity implements CacheAffinityFunction { - /** Default partition count. */ - public static final int DFLT_PART_CNT = 256; - - /** */ - private static final long serialVersionUID = 0L; - - /** Ascending comparator. */ - private static final Comparator<PartitionSet> ASC_CMP = new PartitionSetComparator(); - - /** Descending comparator. */ - private static final Comparator<PartitionSet> DESC_CMP = Collections.reverseOrder(ASC_CMP); - - /** */ - private final int parts; - - /** - * Creates fair affinity with default partition count. - */ - public CachePartitionFairAffinity() { - this(DFLT_PART_CNT); - } - - /** - * @param parts Number of partitions. - */ - public CachePartitionFairAffinity(int parts) { - this.parts = parts; - } - - /** {@inheritDoc} */ - @Override public List<List<ClusterNode>> assignPartitions(CacheAffinityFunctionContext ctx) { - List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot(); - - if (topSnapshot.size() == 1) { - ClusterNode primary = topSnapshot.get(0); - - return Collections.nCopies(parts, Collections.singletonList(primary)); - } - - List<List<ClusterNode>> assignment = createCopy(ctx); - - int tiers = Math.min(ctx.backups() + 1, topSnapshot.size()); - - // Per tier pending partitions. 
- Map<Integer, Queue<Integer>> pendingParts = new HashMap<>(); - - FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, topSnapshot); - - for (int tier = 0; tier < tiers; tier++) { - // Check if this is a new tier and add pending partitions. - Queue<Integer> pending = pendingParts.get(tier); - - for (int part = 0; part < parts; part++) { - if (fullMap.assignments.get(part).size() < tier + 1) { - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(tier, pending); - } - - if (!pending.contains(part)) - pending.add(part); - - } - } - - // Assign pending partitions, if any. - assignPending(tier, pendingParts, fullMap, topSnapshot); - - // Balance assignments. - balance(tier, pendingParts, fullMap, topSnapshot); - } - - return fullMap.assignments; - } - - /** {@inheritDoc} */ - @Override public void reset() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public int partitions() { - return parts; - } - - /** {@inheritDoc} */ - @Override public int partition(Object key) { - return U.safeAbs(hash(key.hashCode())) % parts; - } - - /** {@inheritDoc} */ - @Override public void removeNode(UUID nodeId) { - // No-op. - } - - /** - * Assigns pending (unassigned) partitions to nodes. - * - * @param tier Tier to assign (0 is primary, 1 - 1st backup,...). - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. - */ - private void assignPending(int tier, Map<Integer, Queue<Integer>> pendingMap, FullAssignmentMap fullMap, - List<ClusterNode> topSnapshot) { - Queue<Integer> pending = pendingMap.get(tier); - - if (F.isEmpty(pending)) - return; - - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, idealPartCnt, false); - - // First iterate over underloaded nodes. 
- assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, false); - - if (!pending.isEmpty() && !underloadedNodes.isEmpty()) { - // Same, forcing updates. - assignPendingToUnderloaded(tier, pendingMap, fullMap, underloadedNodes, topSnapshot, true); - } - - if (!pending.isEmpty()) - assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot); - - assert pending.isEmpty(); - - pendingMap.remove(tier); - } - - /** - * Assigns pending partitions to underloaded nodes. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param underloadedNodes Underloaded nodes. - * @param topSnapshot Topology snapshot. - * @param force {@code True} if partitions should be moved. - */ - private void assignPendingToUnderloaded( - int tier, - Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, - PrioritizedPartitionMap underloadedNodes, - Collection<ClusterNode> topSnapshot, - boolean force) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int ideal = parts / topSnapshot.size(); - - while (it.hasNext()) { - int part = it.next(); - - for (PartitionSet set : underloadedNodes.assignments()) { - ClusterNode node = set.node(); - - assert node != null; - - if (fullMap.assign(part, tier, node, force, pendingMap)) { - // We could add partition to partition map without forcing, remove partition from pending. - it.remove(); - - if (set.size() <= ideal) - underloadedNodes.remove(set.nodeId()); - else - underloadedNodes.update(); - - break; // for, continue to the next partition. - } - } - - if (underloadedNodes.isEmpty()) - return; - } - } - - /** - * Spreads pending partitions equally to all nodes in topology snapshot. - * - * @param tier Tier to assign. - * @param pendingMap Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. 
- */ - private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> pendingMap, - FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) { - Iterator<Integer> it = pendingMap.get(tier).iterator(); - - int idx = 0; - - while (it.hasNext()) { - int part = it.next(); - - int i = idx; - - boolean assigned = false; - - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, false, pendingMap)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - - if (!assigned) { - do { - ClusterNode node = topSnapshot.get(i); - - if (fullMap.assign(part, tier, node, true, pendingMap)) { - it.remove(); - - assigned = true; - } - - i = (i + 1) % topSnapshot.size(); - - if (assigned) - idx = i; - } while (i != idx); - } - - if (!assigned) - throw new IllegalStateException("Failed to find assignable node for partition."); - } - } - - /** - * Tries to balance assignments between existing nodes in topology. - * - * @param tier Tier to assign. - * @param pendingParts Pending partitions per tier. - * @param fullMap Full assignment map to modify. - * @param topSnapshot Topology snapshot. 
- */ - private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, FullAssignmentMap fullMap, - Collection<ClusterNode> topSnapshot) { - int idealPartCnt = parts / topSnapshot.size(); - - Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier); - - PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, idealPartCnt, false); - PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, idealPartCnt, true); - - do { - boolean retry = false; - - for (PartitionSet overloaded : overloadedNodes.assignments()) { - for (Integer part : overloaded.partitions()) { - boolean assigned = false; - - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), false, pendingParts)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - assigned = true; - - retry = true; - - break; - } - } - - if (!assigned) { - for (PartitionSet underloaded : underloadedNodes.assignments()) { - if (fullMap.assign(part, tier, underloaded.node(), true, pendingParts)) { - // Size of partition sets has changed. - if (overloaded.size() <= idealPartCnt) - overloadedNodes.remove(overloaded.nodeId()); - else - overloadedNodes.update(); - - if (underloaded.size() >= idealPartCnt) - underloadedNodes.remove(underloaded.nodeId()); - else - underloadedNodes.update(); - - retry = true; - - break; - } - } - } - - if (retry) - break; // for part. - } - - if (retry) - break; // for overloaded. - } - - if (!retry) - break; - } - while (true); - } - - /** - * Constructs underloaded or overloaded partition map. - * - * @param mapping Mapping to filter. - * @param idealPartCnt Ideal number of partitions per node. 
- * @param overloaded {@code True} if should create overloaded map, {@code false} for underloaded. - * @return Prioritized partition map. - */ - private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> mapping, int idealPartCnt, boolean overloaded) { - assert mapping != null; - - PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? DESC_CMP : ASC_CMP); - - for (PartitionSet set : mapping.values()) { - if ((overloaded && set.size() > idealPartCnt) || (!overloaded && set.size() < idealPartCnt)) - res.add(set); - } - - return res; - } - - /** - * Creates copy of previous partition assignment. - * - * @param ctx Affinity function context. - * @return Assignment copy and per node partition map. - */ - private List<List<ClusterNode>> createCopy(CacheAffinityFunctionContext ctx) { - DiscoveryEvent discoEvt = ctx.discoveryEvent(); - - UUID leftNodeId = (discoEvt == null || discoEvt.type() == EventType.EVT_NODE_JOINED) - ? null - : discoEvt.eventNode().id(); - - List<List<ClusterNode>> cp = new ArrayList<>(parts); - - for (int part = 0; part < parts; part++) { - List<ClusterNode> partNodes = ctx.previousAssignment(part); - - List<ClusterNode> partNodesCp; - - if (partNodes == null) - partNodesCp = new ArrayList<>(); - else { - if (leftNodeId == null) { - partNodesCp = new ArrayList<>(partNodes.size() + 1); // Node joined. 
- - partNodesCp.addAll(partNodes); - } - else { - partNodesCp = new ArrayList<>(partNodes.size()); - - for (ClusterNode affNode : partNodes) { - if (!affNode.id().equals(leftNodeId)) - partNodesCp.add(affNode); - } - } - } - - cp.add(partNodesCp); - } - - return cp; - } - - /** - * - */ - private static class PartitionSetComparator implements Comparator<PartitionSet>, Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** {@inheritDoc} */ - @Override public int compare(PartitionSet o1, PartitionSet o2) { - return Integer.compare(o1.parts.size(), o2.parts.size()); - } - } - - /** - * Prioritized partition map. Ordered structure in which nodes are ordered in ascending or descending order - * by number of partitions assigned to a node. - */ - private static class PrioritizedPartitionMap { - /** Comparator. */ - private Comparator<PartitionSet> cmp; - - /** Assignment map. */ - private Map<UUID, PartitionSet> assignmentMap = new HashMap<>(); - - /** Assignment list, ordered according to comparator. */ - private List<PartitionSet> assignmentList = new ArrayList<>(); - - /** - * @param cmp Comparator. - */ - private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) { - this.cmp = cmp; - } - - /** - * @param set Partition set to add. - */ - public void add(PartitionSet set) { - PartitionSet old = assignmentMap.put(set.nodeId(), set); - - if (old == null) { - assignmentList.add(set); - - update(); - } - } - - /** - * Sorts assignment list. - */ - public void update() { - Collections.sort(assignmentList, cmp); - } - - /** - * @return Sorted assignment list. - */ - public List<PartitionSet> assignments() { - return assignmentList; - } - - /** - * @param uuid Uuid. - */ - public void remove(UUID uuid) { - PartitionSet rmv = assignmentMap.remove(uuid); - - assignmentList.remove(rmv); - } - - /** - * - */ - public boolean isEmpty() { - return assignmentList.isEmpty(); - } - } - - /** - * Constructs assignment map for specified tier. 
- * - * @param tier Tier number, -1 for all tiers altogether. - * @param assignment Assignment to construct map from. - * @param topSnapshot Topology snapshot. - * @return Assignment map. - */ - private static Map<UUID, PartitionSet> assignments(int tier, List<List<ClusterNode>> assignment, - Collection<ClusterNode> topSnapshot) { - Map<UUID, PartitionSet> tmp = new LinkedHashMap<>(); - - for (int part = 0; part < assignment.size(); part++) { - List<ClusterNode> nodes = assignment.get(part); - - assert nodes instanceof RandomAccess; - - if (nodes.size() <= tier) - continue; - - int start = tier < 0 ? 0 : tier; - int end = tier < 0 ? nodes.size() : tier + 1; - - for (int i = start; i < end; i++) { - ClusterNode n = nodes.get(i); - - PartitionSet set = tmp.get(n.id()); - - if (set == null) { - set = new PartitionSet(n); - - tmp.put(n.id(), set); - } - - set.add(part); - } - } - - if (tmp.size() < topSnapshot.size()) { - for (ClusterNode node : topSnapshot) { - if (!tmp.containsKey(node.id())) - tmp.put(node.id(), new PartitionSet(node)); - } - } - - return tmp; - } - - /** - * Full assignment map. Auxiliary data structure which maintains resulting assignment and temporary - * maps consistent. - */ - @SuppressWarnings("unchecked") - private static class FullAssignmentMap { - /** Per-tier assignment maps. */ - private Map<UUID, PartitionSet>[] tierMaps; - - /** Full assignment map. */ - private Map<UUID, PartitionSet> fullMap; - - /** Resulting assignment. */ - private List<List<ClusterNode>> assignments; - - /** - * @param tiers Number of tiers. - * @param assignments Assignments to modify. - * @param topSnapshot Topology snapshot. 
- */ - private FullAssignmentMap(int tiers, List<List<ClusterNode>> assignments, Collection<ClusterNode> topSnapshot) { - this.assignments = assignments; - - tierMaps = new Map[tiers]; - - for (int tier = 0; tier < tiers; tier++) - tierMaps[tier] = assignments(tier, assignments, topSnapshot); - - fullMap = assignments(-1, assignments, topSnapshot); - } - - /** - * Tries to assign partition to given node on specified tier. If force is false, assignment will succeed - * only if this partition is not already assigned to a node. If force is true, then assignment will succeed - * only if partition is not assigned to a tier with number less than passed in. Assigned partition from - * greater tier will be moved to pending queue. - * - * @param part Partition to assign. - * @param tier Tier number to assign. - * @param node Node to move partition to. - * @param force Force flag. - * @param pendingParts per tier pending partitions map. - * @return {@code True} if assignment succeeded. - */ - boolean assign(int part, int tier, ClusterNode node, boolean force, Map<Integer, Queue<Integer>> pendingParts) { - UUID nodeId = node.id(); - - if (!fullMap.get(nodeId).contains(part)) { - tierMaps[tier].get(nodeId).add(part); - - fullMap.get(nodeId).add(part); - - List<ClusterNode> assignment = assignments.get(part); - - if (assignment.size() <= tier) - assignment.add(node); - else { - ClusterNode oldNode = assignment.set(tier, node); - - if (oldNode != null) { - UUID oldNodeId = oldNode.id(); - - tierMaps[tier].get(oldNodeId).remove(part); - fullMap.get(oldNodeId).remove(part); - } - } - - return true; - } - else if (force) { - assert !tierMaps[tier].get(nodeId).contains(part); - - // Check previous tiers first. - for (int t = 0; t < tier; t++) { - if (tierMaps[t].get(nodeId).contains(part)) - return false; - } - - // Partition is on some lower tier, switch it. 
- for (int t = tier + 1; t < tierMaps.length; t++) { - if (tierMaps[t].get(nodeId).contains(part)) { - ClusterNode oldNode = assignments.get(part).get(tier); - - // Move partition from level t to tier. - assignments.get(part).set(tier, node); - assignments.get(part).set(t, null); - - if (oldNode != null) { - tierMaps[tier].get(oldNode.id()).remove(part); - fullMap.get(oldNode.id()).remove(part); - } - - tierMaps[tier].get(nodeId).add(part); - tierMaps[t].get(nodeId).remove(part); - - Queue<Integer> pending = pendingParts.get(t); - - if (pending == null) { - pending = new LinkedList<>(); - - pendingParts.put(t, pending); - } - - pending.add(part); - - return true; - } - } - - throw new IllegalStateException("Unable to assign partition to node while force is true."); - } - - // !force. - return false; - } - - /** - * Gets tier mapping. - * - * @param tier Tier to get mapping. - * @return Per node map. - */ - public Map<UUID, PartitionSet> tierMapping(int tier) { - return tierMaps[tier]; - } - } - - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. - * - * @param h Hash code. - * @return Enhanced hash code. - */ - private static int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - /** - * - */ - @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") - private static class PartitionSet { - /** */ - private ClusterNode node; - - /** Partitions. */ - private Collection<Integer> parts = new LinkedList<>(); - - /** - * @param node Node. - */ - private PartitionSet(ClusterNode node) { - this.node = node; - } - - /** - * @return Node. - */ - private ClusterNode node() { - return node; - } - - /** - * @return Node ID. 
- */ - private UUID nodeId() { - return node.id(); - } - - /** - * @return Partition set size. - */ - private int size() { - return parts.size(); - } - - /** - * Adds partition to partition set. - * - * @param part Partition to add. - * @return {@code True} if partition was added, {@code false} if partition already exists. - */ - private boolean add(int part) { - if (!parts.contains(part)) { - parts.add(part); - - return true; - } - - return false; - } - - /** - * @param part Partition to remove. - */ - private void remove(Integer part) { - parts.remove(part); // Remove object, not index. - } - - /** - * @return Partitions. - */ - @SuppressWarnings("TypeMayBeWeakened") - private Collection<Integer> partitions() { - return parts; - } - - /** - * Checks if partition set contains given partition. - * - * @param part Partition to check. - * @return {@code True} if partition set contains given partition. - */ - private boolean contains(int part) { - return parts.contains(part); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + ", parts=" + parts + ']'; - } - } -}