http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
deleted file mode 100644
index 873daf1..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinity.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Provides affinity information to detect which node is primary and which 
nodes are
- * backups for a partitioned cache. You can get an instance of this interface 
by calling
- * {@code Cache.affinity()} method.
- * <p>
- * Mapping of a key to a node is a three-step operation. First step will get 
an affinity key for given key
- * using {@link CacheAffinityKeyMapper}. If mapper is not specified, the 
original key will be used. Second step
- * will map affinity key to partition using {@link 
CacheAffinityFunction#partition(Object)} method. Third step
- * will map obtained partition to nodes for current grid topology version.
- * <p>
- * Interface provides various {@code 'mapKeysToNodes(..)'} methods which 
provide node affinity mapping for
- * given keys. All {@code 'mapKeysToNodes(..)'} methods are not transactional 
and will not enlist
- * keys into ongoing transaction.
- */
-public interface CacheAffinity<K> {
-    /**
-     * Gets number of partitions in cache according to configured affinity 
function.
-     *
-     * @return Number of cache partitions.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public int partitions();
-
-    /**
-     * Gets partition id for the given key.
-     *
-     * @param key Key to get partition id for.
-     * @return Partition id.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public int partition(K key);
-
-    /**
-     * Returns {@code true} if given node is the primary node for given key.
-     *
-     * @param n Node to check.
-     * @param key Key to check.
-     * @return {@code True} if local node is the primary node for given key.
-     */
-    public boolean isPrimary(ClusterNode n, K key);
-
-    /**
-     * Returns {@code true} if local node is one of the backup nodes for given 
key.
-     *
-     * @param n Node to check.
-     * @param key Key to check.
-     * @return {@code True} if local node is one of the backup nodes for given 
key.
-     */
-    public boolean isBackup(ClusterNode n, K key);
-
-    /**
-     * Returns {@code true} if local node is primary or one of the backup nodes
-     * <p>
-     * This method is essentially equivalent to calling
-     * <i>"{@link #isPrimary(org.apache.ignite.cluster.ClusterNode, Object)} ||
-     *      {@link #isBackup(org.apache.ignite.cluster.ClusterNode, 
Object)})"</i>,
-     * however it is more efficient as it makes both checks at once.
-     *
-     * @param n Node to check.
-     * @param key Key to check.
-     * @return {@code True} if local node is primary or backup for given key.
-     */
-    public boolean isPrimaryOrBackup(ClusterNode n, K key);
-
-    /**
-     * Gets partition ids for which nodes of the given projection has primary
-     * ownership.
-     * <p>
-     * Note that since {@link org.apache.ignite.cluster.ClusterNode} 
implements {@link org.apache.ignite.cluster.ClusterGroup},
-     * to find out primary partitions for a single node just pass
-     * a single node into this method.
-     * <p>
-     * This method may return an empty array if none of nodes in the projection
-     * have nearOnly disabled.
-     *
-     * @param n Grid node.
-     * @return Partition ids for which given projection has primary ownership.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public int[] primaryPartitions(ClusterNode n);
-
-    /**
-     * Gets partition ids for which nodes of the given projection has backup
-     * ownership. Note that you can find a back up at a certain level, e.g.
-     * {@code first} backup or {@code third} backup by specifying the
-     * {@code 'levels} parameter. If no {@code 'level'} is specified then
-     * all backup partitions are returned.
-     * <p>
-     * Note that since {@link org.apache.ignite.cluster.ClusterNode} 
implements {@link org.apache.ignite.cluster.ClusterGroup},
-     * to find out backup partitions for a single node, just pass that single
-     * node into this method.
-     * <p>
-     * This method may return an empty array if none of nodes in the projection
-     * have nearOnly disabled.
-     *
-     * @param n Grid node.
-     * @return Partition ids for which given projection has backup ownership.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public int[] backupPartitions(ClusterNode n);
-
-    /**
-     * Gets partition ids for which nodes of the given projection has ownership
-     * (either primary or backup).
-     * <p>
-     * Note that since {@link org.apache.ignite.cluster.ClusterNode} 
implements {@link org.apache.ignite.cluster.ClusterGroup},
-     * to find out all partitions for a single node, just pass that single
-     * node into this method.
-     * <p>
-     * This method may return an empty array if none of nodes in the projection
-     * have nearOnly disabled.
-     *
-     * @param n Grid node.
-     * @return Partition ids for which given projection has ownership.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public int[] allPartitions(ClusterNode n);
-
-    /**
-     * Maps passed in key to a key which will be used for node affinity. The 
affinity
-     * key may be different from actual key if some field in the actual key was
-     * designated for affinity mapping via {@link CacheAffinityKeyMapped} 
annotation
-     * or if a custom {@link CacheAffinityKeyMapper} was configured.
-     *
-     * @param key Key to map.
-     * @return Key to be used for node-to-affinity mapping (may be the same
-     *      key as passed in).
-     */
-    public Object affinityKey(K key);
-
-    /**
-     * This method provides ability to detect which keys are mapped to which 
nodes.
-     * Use it to determine which nodes are storing which keys prior to sending
-     * jobs that access these keys.
-     * <p>
-     * This method works as following:
-     * <ul>
-     * <li>For local caches it returns only local node mapped to all keys.</li>
-     * <li>
-     *      For fully replicated caches {@link CacheAffinityFunction} is
-     *      used to determine which keys are mapped to which nodes.
-     * </li>
-     * <li>For partitioned caches, the returned map represents node-to-key 
affinity.</li>
-     * </ul>
-     *
-     * @param keys Keys to map to nodes.
-     * @return Map of nodes to keys or empty map if there are no alive nodes 
for this cache.
-     */
-    public Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable 
Collection<? extends K> keys);
-
-    /**
-     * This method provides ability to detect to which primary node the given 
key
-     * is mapped. Use it to determine which nodes are storing which keys prior 
to sending
-     * jobs that access these keys.
-     * <p>
-     * This method works as following:
-     * <ul>
-     * <li>For local caches it returns only local node ID.</li>
-     * <li>
-     *      For fully replicated caches first node ID returned by {@link 
CacheAffinityFunction}
-     *      is returned.
-     * </li>
-     * <li>For partitioned caches, primary node for the given key is 
returned.</li>
-     * </ul>
-     *
-     * @param key Keys to map to a node.
-     * @return Primary node for the key or {@code null} if there are no alive 
nodes for this cache.
-     */
-    @Nullable public ClusterNode mapKeyToNode(K key);
-
-    /**
-     * Gets primary and backup nodes for the key. Note that primary node is 
always
-     * first in the returned collection.
-     *
-     * @param key Key to get affinity nodes for.
-     * @return Collection of primary and backup nodes for the key with primary 
node
-     *      always first.
-     */
-    public Collection<ClusterNode> mapKeyToPrimaryAndBackups(K key);
-
-    /**
-     * Gets primary node for the given partition.
-     *
-     * @param part Partition id.
-     * @return Primary node for the given partition.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public ClusterNode mapPartitionToNode(int part);
-
-    /**
-     * Gets primary nodes for the given partitions.
-     *
-     * @param parts Partition ids.
-     * @return Mapping of given partitions to their primary nodes.
-     * @see CacheAffinityFunction
-     * @see org.apache.ignite.configuration.CacheConfiguration#getAffinity()
-     * @see 
org.apache.ignite.configuration.CacheConfiguration#setAffinity(CacheAffinityFunction)
-     */
-    public Map<Integer, ClusterNode> mapPartitionsToNodes(Collection<Integer> 
parts);
-
-    /**
-     * Gets primary and backup nodes for partition. Note that primary node is 
always
-     * first in the returned collection.
-     *
-     * @param part Partition to get affinity nodes for.
-     * @return Collection of primary and backup nodes for partition with 
primary node
-     *      always first.
-     */
-    public Collection<ClusterNode> mapPartitionToPrimaryAndBackups(int part);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java
deleted file mode 100644
index f7602db..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunction.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Cache key affinity which maps keys to nodes. This interface is utilized for
- * both, replicated and partitioned caches. Cache affinity can be configured
- * for individual caches via {@link 
org.apache.ignite.configuration.CacheConfiguration#getAffinity()} method.
- * <p>
- * Whenever a key is given to cache, it is first passed to a pluggable
- * {@link CacheAffinityKeyMapper} which may potentially map this key to an 
alternate
- * key which should be used for affinity. The key returned from
- * {@link CacheAffinityKeyMapper#affinityKey(Object)} method is then passed to
- * {@link #partition(Object) partition(Object)} method to find out the 
partition for the key.
- * On each topology change, partition-to-node mapping is calculated using
- * {@link #assignPartitions(CacheAffinityFunctionContext)} method, which 
assigns a collection
- * of nodes to each partition.
- * This collection of nodes is used for node affinity. In {@link 
org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED}
- * cache mode the key will be cached on all returned nodes; generally, all 
caching nodes
- * participate in caching every key in replicated mode. In {@link 
org.apache.ignite.cache.CacheMode#PARTITIONED PARTITIONED}
- * mode, only primary and backup nodes are returned with primary node always 
in the
- * first position. So if there is {@code 1} backup node, then the returned 
collection will
- * have {@code 2} nodes in it - {@code primary} node in first position, and 
{@code backup}
- * node in second.
- * <p>
- * For more information about cache affinity and examples refer to {@link 
CacheAffinityKeyMapper} and
- * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} documentation.
- * @see CacheAffinityKeyMapped
- * @see CacheAffinityKeyMapper
- */
-public interface CacheAffinityFunction extends Serializable {
-    /**
-     * Resets cache affinity to its initial state. This method will be called 
by
-     * the system any time the affinity has been sent to remote node where
-     * it has to be reinitialized. If your implementation of affinity function
-     * has no initialization logic, leave this method empty.
-     */
-    public void reset();
-
-    /**
-     * Gets total number of partitions available. All caches should always 
provide
-     * correct partition count which should be the same on all participating 
nodes.
-     * Note that partitions should always be numbered from {@code 0} 
inclusively to
-     * {@code N} exclusively without any gaps.
-     *
-     * @return Total partition count.
-     */
-    public int partitions();
-
-    /**
-     * Gets partition number for a given key starting from {@code 0}. 
Partitioned caches
-     * should make sure that keys are about evenly distributed across all 
partitions
-     * from {@code 0} to {@link #partitions() partition count} for best 
performance.
-     * <p>
-     * Note that for fully replicated caches it is possible to segment key 
sets among different
-     * grid node groups. In that case each node group should return a unique 
partition
-     * number. However, unlike partitioned cache, mappings of keys to nodes in
-     * replicated caches are constant and a node cannot migrate from one 
partition
-     * to another.
-     *
-     * @param key Key to get partition for.
-     * @return Partition number for a given key.
-     */
-    public int partition(Object key);
-
-    /**
-     * Gets affinity nodes for a partition. In case of replicated cache, all 
returned
-     * nodes are updated in the same manner. In case of partitioned cache, the 
returned
-     * list should contain only the primary and back up nodes with primary 
node being
-     * always first.
-     * <p>
-     * Note that partitioned affinity must obey the following contract: given 
that node
-     * <code>N</code> is primary for some key <code>K</code>, if any other 
node(s) leave
-     * grid and no node joins grid, node <code>N</code> will remain primary 
for key <code>K</code>.
-     *
-     * @param affCtx Affinity function context. Will provide all required 
information to calculate
-     *      new partition assignments.
-     * @return Unmodifiable list indexed by partition number. Each element of 
array is a collection in which
-     *      first node is a primary node and other nodes are backup nodes.
-     */
-    public List<List<ClusterNode>> 
assignPartitions(CacheAffinityFunctionContext affCtx);
-
-    /**
-     * Removes node from affinity. This method is called when it is safe to 
remove left node from
-     * affinity mapping.
-     *
-     * @param nodeId ID of node to remove.
-     */
-    public void removeNode(UUID nodeId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
deleted file mode 100644
index fd1be95..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityFunctionContext.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Affinity function context. This context is passed to {@link 
CacheAffinityFunction} for
- * partition reassignment on every topology change event.
- */
-public interface CacheAffinityFunctionContext {
-    /**
-     * Gets affinity assignment for given partition on previous topology 
version. First node in returned list is
-     * a primary node, other nodes are backups.
-     *
-     * @param part Partition to get previous assignment for.
-     * @return List of nodes assigned to given partition on previous topology 
version or {@code null}
-     *      if this information is not available.
-     */
-    @Nullable public List<ClusterNode> previousAssignment(int part);
-
-    /**
-     * Gets number of backups for new assignment.
-     *
-     * @return Number of backups for new assignment.
-     */
-    public int backups();
-
-    /**
-     * Gets current topology snapshot. Snapshot will contain only nodes on 
which particular cache is configured.
-     * List of passed nodes is guaranteed to be sorted in a same order on all 
nodes on which partition assignment
-     * is performed.
-     *
-     * @return Cache topology snapshot.
-     */
-    public List<ClusterNode> currentTopologySnapshot();
-
-    /**
-     * Gets current topology version number.
-     *
-     * @return Current topology version number.
-     */
-    public AffinityTopologyVersion currentTopologyVersion();
-
-    /**
-     * Gets discovery event caused topology change.
-     *
-     * @return Discovery event caused latest topology change or {@code null} 
if this information is
-     *      not available.
-     */
-    @Nullable public DiscoveryEvent discoveryEvent();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java
deleted file mode 100644
index cd4a4ec..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKey.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Optional wrapper for cache keys to provide support
- * for custom affinity mapping. The value returned by
- * {@link #affinityKey(Object)} method will be used for key-to-node
- * affinity.
- * <p>
- * Note that the {@link #equals(Object)} and {@link #hashCode()} methods
- * delegate directly to the wrapped cache key provided by {@link #key()}
- * method.
- * <p>
- * This class is optional and does not have to be used. It only provides
- * extra convenience whenever custom affinity mapping is required. Here is
- * an example of how {@code Person} objects can be collocated with
- * {@code Company} objects they belong to:
- * <pre name="code" class="java">
- * Object personKey = new CacheAffinityKey(myPersonId, myCompanyId);
- *
- * // Both, the company and the person objects will be cached on the same node.
- * cache.put(myCompanyId, new Company(..));
- * cache.put(personKey, new Person(..));
- * </pre>
- * <p>
- * For more information and examples of cache affinity refer to
- * {@link CacheAffinityKeyMapper} and {@link CacheAffinityKeyMapped 
@CacheAffinityKeyMapped}
- * documentation.
- * @see CacheAffinityKeyMapped
- * @see CacheAffinityKeyMapper
- * @see CacheAffinityFunction
- */
-public class CacheAffinityKey<K> implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Key. */
-    @GridToStringInclude
-    private K key;
-
-    /** Affinity key. */
-    @GridToStringInclude
-    private Object affKey;
-
-    /**
-     * Empty constructor.
-     */
-    public CacheAffinityKey() {
-        // No-op.
-    }
-
-    /**
-     * Initializes key wrapper for a given key. If affinity key
-     * is not initialized, then this key will be used for affinity.
-     *
-     * @param key Key.
-     */
-    public CacheAffinityKey(K key) {
-        A.notNull(key, "key");
-
-        this.key = key;
-    }
-
-    /**
-     * Initializes key together with its affinity key counter-part.
-     *
-     * @param key Key.
-     * @param affKey Affinity key.
-     */
-    public CacheAffinityKey(K key, Object affKey) {
-        A.notNull(key, "key");
-
-        this.key = key;
-        this.affKey = affKey;
-    }
-
-    /**
-     * Gets wrapped key.
-     *
-     * @return Wrapped key.
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Sets wrapped key.
-     *
-     * @param key Wrapped key.
-     */
-    public void key(K key) {
-        this.key = key;
-    }
-
-    /**
-     * Gets affinity key to use for affinity mapping. If affinity key is not 
provided,
-     * then {@code key} value will be returned.
-     * <p>
-     * This method is annotated with {@link CacheAffinityKeyMapped} and will 
be picked up
-     * by {@link GridCacheDefaultAffinityKeyMapper} automatically.
-     *
-     * @return Affinity key to use for affinity mapping.
-     */
-    @CacheAffinityKeyMapped
-    @SuppressWarnings({"unchecked"})
-    public <T> T affinityKey() {
-        A.notNull(key, "key");
-
-        return (T)(affKey == null ? key : affKey);
-    }
-
-    /**
-     * Sets affinity key to use for affinity mapping. If affinity key is not 
provided,
-     * then {@code key} value will be returned.
-     *
-     * @param affKey Affinity key to use for affinity mapping.
-     */
-    public void affinityKey(Object affKey) {
-        this.affKey = affKey;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(key);
-        out.writeObject(affKey);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        key = (K)in.readObject();
-        affKey = in.readObject();
-    }
-
-    /**
-     * Hash code implementation which delegates to the underlying {@link 
#key()}. Note, however,
-     * that different subclasses of {@code CacheAffinityKey} will produce 
different hash codes.
-     * <p>
-     * Users should override this method if different behavior is desired.
-     *
-     * @return Hash code.
-     */
-    @Override public int hashCode() {
-        A.notNull(key, "key");
-
-        return 31 * key.hashCode() + getClass().getName().hashCode();
-    }
-
-    /**
-     * Equality check which delegates to the underlying key equality. Note, 
however, that
-     * different subclasses of {@code CacheAffinityKey} will never be equal.
-     * <p>
-     * Users should override this method if different behavior is desired.
-     *
-     * @param obj Object to check for equality.
-     * @return {@code True} if objects are equal.
-     */
-    @Override public boolean equals(Object obj) {
-        A.notNull(key, "key");
-
-        return obj != null && getClass() == obj.getClass() && 
key.equals(((CacheAffinityKey)obj).key);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheAffinityKey.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java
deleted file mode 100644
index a26a9b0..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapped.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import java.lang.annotation.*;
-import java.util.concurrent.*;
-
-/**
- * Optional annotation to specify custom key-to-node affinity. Affinity key is 
a key
- * which will be used to determine a node on which given cache key will be 
stored. This
- * annotation allows to mark a field or a method in the cache key object that 
will be
- * used as an affinity key (instead of the entire cache key object that is 
used for
- * affinity by default). Note that a class can have only one field or method 
annotated
- * with {@code @CacheAffinityKeyMapped} annotation.
- * <p>
- * One of the major use cases for this annotation is the routing of grid 
computations
- * to the nodes where the data for this computation is cached, the concept
- * otherwise known as {@code Collocation Of Computations And Data}.
- * <p>
- * <h1 class="header">Mapping Cache Keys</h1>
- * The default implementation of {@link CacheAffinityKeyMapper}, which will be 
used
- * if no explicit affinity mapper is specified in cache configuration, will 
first look
- * for any field or method annotated with {@code @CacheAffinityKeyMapped} 
annotation.
- * If such field or method is not found, then the cache key itself will be 
used for
- * key-to-node affinity (this means that all objects with the same cache key 
will always
- * be routed to the same node). If such field or method is found, then the 
value of this
- * field or method will be used for key-to-node affinity. This allows to 
specify alternate
- * affinity key, other than the cache key itself, whenever needed.
- * <p>
- * For example, if a {@code Person} object is always accessed together with a 
{@code Company} object
- * for which this person is an employee, then for better performance and 
scalability it makes sense to
- * collocate {@code Person} objects together with their {@code Company} object 
when storing them in
- * cache. To achieve that, cache key used to cache {@code Person} objects 
should have a field or method
- * annotated with {@code @CacheAffinityKeyMapped} annotation, which will 
provide the value of
- * the company key for which that person works, like so:
- * <pre name="code" class="java">
- * public class PersonKey {
- *     // Person ID used to identify a person.
- *     private String personId;
- *
- *     // Company ID which will be used for affinity.
- *     &#64;CacheAffinityKeyMapped
- *     private String companyId;
- *     ...
- * }
- * ...
- * // Instantiate person keys.
- * Object personKey1 = new PersonKey("myPersonId1", "myCompanyId");
- * Object personKey2 = new PersonKey("myPersonId2", "myCompanyId");
- *
- * // Both, the company and the person objects will be cached on the same node.
- * cache.put("myCompanyId", new Company(..));
- * cache.put(personKey1, new Person(..));
- * cache.put(personKey2, new Person(..));
- * </pre>
- * <p>
- * <h2 class="header">CacheAffinityKey</h2>
- * For convenience, you can also optionally use {@link CacheAffinityKey} 
class. Here is how a
- * {@code PersonKey} defined above would look using {@link CacheAffinityKey}:
- * <pre name="code" class="java">
- * Object personKey1 = new CacheAffinityKey("myPersonId1", "myCompanyId");
- * Object personKey2 = new CacheAffinityKey("myPersonId2", "myCompanyId");
- *
- * // Both, the company and the person objects will be cached on the same node.
- * cache.put(myCompanyId, new Company(..));
- * cache.put(personKey1, new Person(..));
- * cache.put(personKey2, new Person(..));
- * </pre>
- * <p>
- * <h1 class="header">Collocating Computations And Data</h1>
- * It is also possible to route computations to the nodes where the data is 
cached. This concept
- * is otherwise known as {@code Collocation Of Computations And Data}. In this 
case,
- * {@code @CacheAffinityKeyMapped} annotation allows to specify a routing 
affinity key for a
- * {@link org.apache.ignite.compute.ComputeJob} or any other grid computation, 
such as {@link Runnable},
- * {@link Callable}, or {@link org.apache.ignite.lang.IgniteClosure}. It 
should be attached to a method or
- * field that provides affinity key for the computation. Only one annotation 
per class is allowed.
- * Whenever such annotation is detected, then {@link 
org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}
- * will be bypassed, and computation will be routed to the grid node where the 
specified affinity key is cached.
- * <p>
- * For more information about cache affinity also see {@link 
CacheAffinityKeyMapper} and
- * {@link CacheAffinityFunction} documentation.
- * Affinity for a key can be found from any node, regardless of whether it has 
cache started
- * or not. If cache is not started, affinity function will be fetched from the 
remote node
- * which does have the cache running.
- *
- * @see CacheAffinityFunction
- * @see CacheAffinityKeyMapper
- * @see CacheAffinityKey
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ElementType.FIELD, ElementType.METHOD})
-public @interface CacheAffinityKeyMapped {
-    // No-op.
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
deleted file mode 100644
index 4812940..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityKeyMapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import java.io.*;
-
-/**
- * Affinity mapper which maps cache key to an affinity key. Affinity key is a 
key which will be
- * used to determine a node on which this key will be cached. Every cache key 
will first be passed
- * through {@link #affinityKey(Object)} method, and the returned value of this 
method
- * will be given to {@link CacheAffinityFunction} implementation to find out 
key-to-node affinity.
- * <p>
- * The default implementation, which will be used if no explicit affinity 
mapper is specified
- * in cache configuration, will first look for any field or method annotated 
with
- * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} annotation. If such 
field or method
- * is not found, then the cache key itself will be returned from {@link 
#affinityKey(Object) affinityKey(Object)}
- * method (this means that all objects with the same cache key will always be 
routed to the same node).
- * If such field or method is found, then the value of this field or method 
will be returned from
- * {@link #affinityKey(Object) affinityKey(Object)} method. This allows to 
specify alternate affinity key, other
- * than the cache key itself, whenever needed.
- * <p>
- * A custom (other than default) affinity mapper can be provided
- * via {@link 
org.apache.ignite.configuration.CacheConfiguration#getAffinityMapper()} 
configuration property.
- * <p>
- * For more information on affinity mapping and examples refer to {@link 
CacheAffinityFunction} and
- * {@link CacheAffinityKeyMapped @CacheAffinityKeyMapped} documentation.
- * @see CacheAffinityFunction
- * @see CacheAffinityKeyMapped
- */
-public interface CacheAffinityKeyMapper extends Serializable {
-    /**
-     * Maps passed in key to an alternate key which will be used for node 
affinity.
-     *
-     * @param key Key to map.
-     * @return Key to be used for node-to-affinity mapping (may be the same
-     *      key as passed in).
-     */
-    public Object affinityKey(Object key);
-
-    /**
-     * Resets cache affinity mapper to its initial state. This method will be 
called by
-     * the system any time the affinity mapper has been sent to remote node 
where
-     * it has to be reinitialized. If your implementation of affinity mapper
-     * has no initialization logic, leave this method empty.
-     */
-    public void reset();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
deleted file mode 100644
index 70f20ba..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeAddressHashResolver.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * Node hash resolver which uses {@link 
org.apache.ignite.cluster.ClusterNode#consistentId()} as alternate hash value.
- */
-public class CacheAffinityNodeAddressHashResolver implements 
CacheAffinityNodeHashResolver {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public Object resolve(ClusterNode node) {
-        return node.consistentId();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheAffinityNodeAddressHashResolver.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
deleted file mode 100644
index 79489cf..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeHashResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-
-import java.io.*;
-
-/**
- * Resolver which is used to provide node hash value for affinity function.
- * <p>
- * Node IDs constantly change when nodes get restarted, which causes affinity 
mapping to change between restarts,
- * and hence causing redundant repartitioning. Providing an alternate node 
hash value, which survives node restarts,
- * will help to map keys to the same nodes whenever possible.
- * <p>
- * Note that on case clients exist they will query this object from the server 
and use it for affinity calculation.
- * Therefore you must ensure that server and clients can marshal and unmarshal 
this object in portable format,
- * i.e. all parties have object class(es) configured as portable.
- */
-public interface CacheAffinityNodeHashResolver extends Serializable {
-    /**
-     * Resolve alternate hash value for the given Grid node.
-     *
-     * @param node Grid node.
-     * @return Resolved hash ID.
-     */
-    public Object resolve(ClusterNode node);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
deleted file mode 100644
index 3de9220..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheAffinityNodeIdHashResolver.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-/**
- * Node hash resolver which uses generated node ID as node hash value. As new 
node ID is generated
- * on each node start, this resolver do not provide ability to map keys to the 
same nodes after restart.
- */
-public class CacheAffinityNodeIdHashResolver implements 
CacheAffinityNodeHashResolver {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** {@inheritDoc} */
-    @Override public Object resolve(ClusterNode node) {
-        return node.id();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(CacheAffinityNodeIdHashResolver.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
deleted file mode 100644
index 0df8ffe..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CacheCentralizedAffinityFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity;
-
-import java.lang.annotation.*;
-
-/**
- * Annotation marker which identifies affinity function that must be 
calculated on one centralized node
- * instead of independently on each node. In many cases it happens because it 
requires previous affinity state
- * in order to calculate new one.
- */
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.TYPE)
-public @interface CacheCentralizedAffinityFunction {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java
new file mode 100644
index 0000000..54e3cf6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/CentralizedAffinityFunction.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import java.lang.annotation.*;
+
+/**
+ * Annotation marker which identifies affinity function that must be 
calculated on one centralized node
+ * instead of independently on each node. In many cases it happens because it 
requires previous affinity state
+ * in order to calculate new one.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface CentralizedAffinityFunction {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4ba0b01a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
deleted file mode 100644
index b4bd2e9..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/CachePartitionFairAffinity.java
+++ /dev/null
@@ -1,777 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.affinity.fair;
-
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.util.*;
-
-/**
- * Fair affinity function which tries to ensure that all nodes get equal 
number of partitions with
- * minimum amount of reassignments between existing nodes.
- * <p>
- * Cache affinity can be configured for individual caches via {@link 
CacheConfiguration#getAffinity()} method.
- */
-@CacheCentralizedAffinityFunction
-public class CachePartitionFairAffinity implements CacheAffinityFunction {
-    /** Default partition count. */
-    public static final int DFLT_PART_CNT = 256;
-
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Ascending comparator. */
-    private static final Comparator<PartitionSet> ASC_CMP = new 
PartitionSetComparator();
-
-    /** Descending comparator. */
-    private static final Comparator<PartitionSet> DESC_CMP = 
Collections.reverseOrder(ASC_CMP);
-
-    /** */
-    private final int parts;
-
-    /**
-     * Creates fair affinity with default partition count.
-     */
-    public CachePartitionFairAffinity() {
-        this(DFLT_PART_CNT);
-    }
-
-    /**
-     * @param parts Number of partitions.
-     */
-    public CachePartitionFairAffinity(int parts) {
-        this.parts = parts;
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<List<ClusterNode>> 
assignPartitions(CacheAffinityFunctionContext ctx) {
-        List<ClusterNode> topSnapshot = ctx.currentTopologySnapshot();
-
-        if (topSnapshot.size() == 1) {
-            ClusterNode primary = topSnapshot.get(0);
-
-            return Collections.nCopies(parts, 
Collections.singletonList(primary));
-        }
-
-        List<List<ClusterNode>> assignment = createCopy(ctx);
-
-        int tiers = Math.min(ctx.backups() + 1, topSnapshot.size());
-
-        // Per tier pending partitions.
-        Map<Integer, Queue<Integer>> pendingParts = new HashMap<>();
-
-        FullAssignmentMap fullMap = new FullAssignmentMap(tiers, assignment, 
topSnapshot);
-
-        for (int tier = 0; tier < tiers; tier++) {
-            // Check if this is a new tier and add pending partitions.
-            Queue<Integer> pending = pendingParts.get(tier);
-
-            for (int part = 0; part < parts; part++) {
-                if (fullMap.assignments.get(part).size() < tier + 1) {
-                    if (pending == null) {
-                        pending = new LinkedList<>();
-
-                        pendingParts.put(tier, pending);
-                    }
-
-                    if (!pending.contains(part))
-                        pending.add(part);
-
-                }
-            }
-
-            // Assign pending partitions, if any.
-            assignPending(tier, pendingParts, fullMap, topSnapshot);
-
-            // Balance assignments.
-            balance(tier, pendingParts, fullMap, topSnapshot);
-        }
-
-        return fullMap.assignments;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void reset() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partitions() {
-        return parts;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key) {
-        return U.safeAbs(hash(key.hashCode())) % parts;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void removeNode(UUID nodeId) {
-        // No-op.
-    }
-
-    /**
-     * Assigns pending (unassigned) partitions to nodes.
-     *
-     * @param tier Tier to assign (0 is primary, 1 - 1st backup,...).
-     * @param pendingMap Pending partitions per tier.
-     * @param fullMap Full assignment map to modify.
-     * @param topSnapshot Topology snapshot.
-     */
-    private void assignPending(int tier, Map<Integer, Queue<Integer>> 
pendingMap, FullAssignmentMap fullMap,
-        List<ClusterNode> topSnapshot) {
-        Queue<Integer> pending = pendingMap.get(tier);
-
-        if (F.isEmpty(pending))
-            return;
-
-        int idealPartCnt = parts / topSnapshot.size();
-
-        Map<UUID, PartitionSet> tierMapping = fullMap.tierMapping(tier);
-
-        PrioritizedPartitionMap underloadedNodes = filterNodes(tierMapping, 
idealPartCnt, false);
-
-        // First iterate over underloaded nodes.
-        assignPendingToUnderloaded(tier, pendingMap, fullMap, 
underloadedNodes, topSnapshot, false);
-
-        if (!pending.isEmpty() && !underloadedNodes.isEmpty()) {
-            // Same, forcing updates.
-            assignPendingToUnderloaded(tier, pendingMap, fullMap, 
underloadedNodes, topSnapshot, true);
-        }
-
-        if (!pending.isEmpty())
-            assignPendingToNodes(tier, pendingMap, fullMap, topSnapshot);
-
-        assert pending.isEmpty();
-
-        pendingMap.remove(tier);
-    }
-
-    /**
-     * Assigns pending partitions to underloaded nodes.
-     *
-     * @param tier Tier to assign.
-     * @param pendingMap Pending partitions per tier.
-     * @param fullMap Full assignment map to modify.
-     * @param underloadedNodes Underloaded nodes.
-     * @param topSnapshot Topology snapshot.
-     * @param force {@code True} if partitions should be moved.
-     */
-    private void assignPendingToUnderloaded(
-        int tier,
-        Map<Integer, Queue<Integer>> pendingMap,
-        FullAssignmentMap fullMap,
-        PrioritizedPartitionMap underloadedNodes,
-        Collection<ClusterNode> topSnapshot,
-        boolean force) {
-        Iterator<Integer> it = pendingMap.get(tier).iterator();
-
-        int ideal = parts / topSnapshot.size();
-
-        while (it.hasNext()) {
-            int part = it.next();
-
-            for (PartitionSet set : underloadedNodes.assignments()) {
-                ClusterNode node = set.node();
-
-                assert node != null;
-
-                if (fullMap.assign(part, tier, node, force, pendingMap)) {
-                    // We could add partition to partition map without 
forcing, remove partition from pending.
-                    it.remove();
-
-                    if (set.size() <= ideal)
-                        underloadedNodes.remove(set.nodeId());
-                    else
-                        underloadedNodes.update();
-
-                    break; // for, continue to the next partition.
-                }
-            }
-
-            if (underloadedNodes.isEmpty())
-                return;
-        }
-    }
-
-    /**
-     * Spreads pending partitions equally to all nodes in topology snapshot.
-     *
-     * @param tier Tier to assign.
-     * @param pendingMap Pending partitions per tier.
-     * @param fullMap Full assignment map to modify.
-     * @param topSnapshot Topology snapshot.
-     */
-    private void assignPendingToNodes(int tier, Map<Integer, Queue<Integer>> 
pendingMap,
-        FullAssignmentMap fullMap, List<ClusterNode> topSnapshot) {
-        Iterator<Integer> it = pendingMap.get(tier).iterator();
-
-        int idx = 0;
-
-        while (it.hasNext()) {
-            int part = it.next();
-
-            int i = idx;
-
-            boolean assigned = false;
-
-            do {
-                ClusterNode node = topSnapshot.get(i);
-
-                if (fullMap.assign(part, tier, node, false, pendingMap)) {
-                    it.remove();
-
-                    assigned = true;
-                }
-
-                i = (i + 1) % topSnapshot.size();
-
-                if (assigned)
-                    idx = i;
-            } while (i != idx);
-
-            if (!assigned) {
-                do {
-                    ClusterNode node = topSnapshot.get(i);
-
-                    if (fullMap.assign(part, tier, node, true, pendingMap)) {
-                        it.remove();
-
-                        assigned = true;
-                    }
-
-                    i = (i + 1) % topSnapshot.size();
-
-                    if (assigned)
-                        idx = i;
-                } while (i != idx);
-            }
-
-            if (!assigned)
-                throw new IllegalStateException("Failed to find assignable 
node for partition.");
-        }
-    }
-
-    /**
-     * Tries to balance assignments between existing nodes in topology.
-     *
-     * @param tier Tier to assign.
-     * @param pendingParts Pending partitions per tier.
-     * @param fullMap Full assignment map to modify.
-     * @param topSnapshot Topology snapshot.
-     */
-    private void balance(int tier, Map<Integer, Queue<Integer>> pendingParts, 
FullAssignmentMap fullMap,
-        Collection<ClusterNode> topSnapshot) {
-        int idealPartCnt = parts / topSnapshot.size();
-
-        Map<UUID, PartitionSet> mapping = fullMap.tierMapping(tier);
-
-        PrioritizedPartitionMap underloadedNodes = filterNodes(mapping, 
idealPartCnt, false);
-        PrioritizedPartitionMap overloadedNodes = filterNodes(mapping, 
idealPartCnt, true);
-
-        do {
-            boolean retry = false;
-
-            for (PartitionSet overloaded : overloadedNodes.assignments()) {
-                for (Integer part : overloaded.partitions()) {
-                    boolean assigned = false;
-
-                    for (PartitionSet underloaded : 
underloadedNodes.assignments()) {
-                        if (fullMap.assign(part, tier, underloaded.node(), 
false, pendingParts)) {
-                            // Size of partition sets has changed.
-                            if (overloaded.size() <= idealPartCnt)
-                                overloadedNodes.remove(overloaded.nodeId());
-                            else
-                                overloadedNodes.update();
-
-                            if (underloaded.size() >= idealPartCnt)
-                                underloadedNodes.remove(underloaded.nodeId());
-                            else
-                                underloadedNodes.update();
-
-                            assigned = true;
-
-                            retry = true;
-
-                            break;
-                        }
-                    }
-
-                    if (!assigned) {
-                        for (PartitionSet underloaded : 
underloadedNodes.assignments()) {
-                            if (fullMap.assign(part, tier, underloaded.node(), 
true, pendingParts)) {
-                                // Size of partition sets has changed.
-                                if (overloaded.size() <= idealPartCnt)
-                                    
overloadedNodes.remove(overloaded.nodeId());
-                                else
-                                    overloadedNodes.update();
-
-                                if (underloaded.size() >= idealPartCnt)
-                                    
underloadedNodes.remove(underloaded.nodeId());
-                                else
-                                    underloadedNodes.update();
-
-                                retry = true;
-
-                                break;
-                            }
-                        }
-                    }
-
-                    if (retry)
-                        break; // for part.
-                }
-
-                if (retry)
-                    break; // for overloaded.
-            }
-
-            if (!retry)
-                break;
-        }
-        while (true);
-    }
-
-    /**
-     * Constructs underloaded or overloaded partition map.
-     *
-     * @param mapping Mapping to filter.
-     * @param idealPartCnt Ideal number of partitions per node.
-     * @param overloaded {@code True} if should create overloaded map, {@code 
false} for underloaded.
-     * @return Prioritized partition map.
-     */
-    private PrioritizedPartitionMap filterNodes(Map<UUID, PartitionSet> 
mapping, int idealPartCnt, boolean overloaded) {
-        assert mapping != null;
-
-        PrioritizedPartitionMap res = new PrioritizedPartitionMap(overloaded ? 
DESC_CMP : ASC_CMP);
-
-        for (PartitionSet set : mapping.values()) {
-            if ((overloaded && set.size() > idealPartCnt) || (!overloaded && 
set.size() < idealPartCnt))
-               res.add(set);
-        }
-
-        return res;
-    }
-
-    /**
-     * Creates copy of previous partition assignment.
-     *
-     * @param ctx Affinity function context.
-     * @return Assignment copy and per node partition map.
-     */
-    private List<List<ClusterNode>> createCopy(CacheAffinityFunctionContext 
ctx) {
-        DiscoveryEvent discoEvt = ctx.discoveryEvent();
-
-        UUID leftNodeId = (discoEvt == null || discoEvt.type() == 
EventType.EVT_NODE_JOINED)
-            ? null
-            : discoEvt.eventNode().id();
-
-        List<List<ClusterNode>> cp = new ArrayList<>(parts);
-
-        for (int part = 0; part < parts; part++) {
-            List<ClusterNode> partNodes = ctx.previousAssignment(part);
-
-            List<ClusterNode> partNodesCp;
-
-            if (partNodes == null)
-                partNodesCp = new ArrayList<>();
-            else {
-                if (leftNodeId == null) {
-                    partNodesCp = new ArrayList<>(partNodes.size() + 1); // 
Node joined.
-
-                    partNodesCp.addAll(partNodes);
-                }
-                else {
-                    partNodesCp = new ArrayList<>(partNodes.size());
-
-                    for (ClusterNode affNode : partNodes) {
-                        if (!affNode.id().equals(leftNodeId))
-                            partNodesCp.add(affNode);
-                    }
-                }
-            }
-
-            cp.add(partNodesCp);
-        }
-
-        return cp;
-    }
-
-    /**
-     *
-     */
-    private static class PartitionSetComparator implements 
Comparator<PartitionSet>, Serializable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** {@inheritDoc} */
-        @Override public int compare(PartitionSet o1, PartitionSet o2) {
-            return Integer.compare(o1.parts.size(), o2.parts.size());
-        }
-    }
-
-    /**
-     * Prioritized partition map. Ordered structure in which nodes are ordered 
in ascending or descending order
-     * by number of partitions assigned to a node.
-     */
-    private static class PrioritizedPartitionMap {
-        /** Comparator. */
-        private Comparator<PartitionSet> cmp;
-
-        /** Assignment map. */
-        private Map<UUID, PartitionSet> assignmentMap = new HashMap<>();
-
-        /** Assignment list, ordered according to comparator. */
-        private List<PartitionSet> assignmentList = new ArrayList<>();
-
-        /**
-         * @param cmp Comparator.
-         */
-        private PrioritizedPartitionMap(Comparator<PartitionSet> cmp) {
-            this.cmp = cmp;
-        }
-
-        /**
-         * @param set Partition set to add.
-         */
-        public void add(PartitionSet set) {
-            PartitionSet old = assignmentMap.put(set.nodeId(), set);
-
-            if (old == null) {
-                assignmentList.add(set);
-
-                update();
-            }
-        }
-
-        /**
-         * Sorts assignment list.
-         */
-        public void update() {
-            Collections.sort(assignmentList, cmp);
-        }
-
-        /**
-         * @return Sorted assignment list.
-         */
-        public List<PartitionSet> assignments() {
-            return assignmentList;
-        }
-
-        /**
-         * @param uuid Uuid.
-         */
-        public void remove(UUID uuid) {
-            PartitionSet rmv = assignmentMap.remove(uuid);
-
-            assignmentList.remove(rmv);
-        }
-
-        /**
-         *
-         */
-        public boolean isEmpty() {
-            return assignmentList.isEmpty();
-        }
-    }
-
-    /**
-     * Constructs assignment map for specified tier.
-     *
-     * @param tier Tier number, -1 for all tiers altogether.
-     * @param assignment Assignment to construct map from.
-     * @param topSnapshot Topology snapshot.
-     * @return Assignment map.
-     */
-    private static Map<UUID, PartitionSet> assignments(int tier, 
List<List<ClusterNode>> assignment,
-        Collection<ClusterNode> topSnapshot) {
-        Map<UUID, PartitionSet> tmp = new LinkedHashMap<>();
-
-        for (int part = 0; part < assignment.size(); part++) {
-            List<ClusterNode> nodes = assignment.get(part);
-
-            assert nodes instanceof RandomAccess;
-
-            if (nodes.size() <= tier)
-                continue;
-
-            int start = tier < 0 ? 0 : tier;
-            int end = tier < 0 ? nodes.size() : tier + 1;
-
-            for (int i = start; i < end; i++) {
-                ClusterNode n = nodes.get(i);
-
-                PartitionSet set = tmp.get(n.id());
-
-                if (set == null) {
-                    set = new PartitionSet(n);
-
-                    tmp.put(n.id(), set);
-                }
-
-                set.add(part);
-            }
-        }
-
-        if (tmp.size() < topSnapshot.size()) {
-            for (ClusterNode node : topSnapshot) {
-                if (!tmp.containsKey(node.id()))
-                    tmp.put(node.id(), new PartitionSet(node));
-            }
-        }
-
-        return tmp;
-    }
-
-    /**
-     * Full assignment map. Auxiliary data structure which maintains resulting 
assignment and temporary
-     * maps consistent.
-     */
-    @SuppressWarnings("unchecked")
-    private static class FullAssignmentMap {
-        /** Per-tier assignment maps. */
-        private Map<UUID, PartitionSet>[] tierMaps;
-
-        /** Full assignment map. */
-        private Map<UUID, PartitionSet> fullMap;
-
-        /** Resulting assignment. */
-        private List<List<ClusterNode>> assignments;
-
-        /**
-         * @param tiers Number of tiers.
-         * @param assignments Assignments to modify.
-         * @param topSnapshot Topology snapshot.
-         */
-        private FullAssignmentMap(int tiers, List<List<ClusterNode>> 
assignments, Collection<ClusterNode> topSnapshot) {
-            this.assignments = assignments;
-
-            tierMaps = new Map[tiers];
-
-            for (int tier = 0; tier < tiers; tier++)
-                tierMaps[tier] = assignments(tier, assignments, topSnapshot);
-
-            fullMap = assignments(-1, assignments, topSnapshot);
-        }
-
-        /**
-         * Tries to assign partition to given node on specified tier. If force 
is false, assignment will succeed
-         * only if this partition is not already assigned to a node. If force 
is true, then assignment will succeed
-         * only if partition is not assigned to a tier with number less than 
passed in. Assigned partition from
-         * greater tier will be moved to pending queue.
-         *
-         * @param part Partition to assign.
-         * @param tier Tier number to assign.
-         * @param node Node to move partition to.
-         * @param force Force flag.
-         * @param pendingParts per tier pending partitions map.
-         * @return {@code True} if assignment succeeded.
-         */
-        boolean assign(int part, int tier, ClusterNode node, boolean force, 
Map<Integer, Queue<Integer>> pendingParts) {
-            UUID nodeId = node.id();
-
-            if (!fullMap.get(nodeId).contains(part)) {
-                tierMaps[tier].get(nodeId).add(part);
-
-                fullMap.get(nodeId).add(part);
-
-                List<ClusterNode> assignment = assignments.get(part);
-
-                if (assignment.size() <= tier)
-                    assignment.add(node);
-                else {
-                    ClusterNode oldNode = assignment.set(tier, node);
-
-                    if (oldNode != null) {
-                        UUID oldNodeId = oldNode.id();
-
-                        tierMaps[tier].get(oldNodeId).remove(part);
-                        fullMap.get(oldNodeId).remove(part);
-                    }
-                }
-
-                return true;
-            }
-            else if (force) {
-                assert !tierMaps[tier].get(nodeId).contains(part);
-
-                // Check previous tiers first.
-                for (int t = 0; t < tier; t++) {
-                    if (tierMaps[t].get(nodeId).contains(part))
-                        return false;
-                }
-
-                // Partition is on some lower tier, switch it.
-                for (int t = tier + 1; t < tierMaps.length; t++) {
-                    if (tierMaps[t].get(nodeId).contains(part)) {
-                        ClusterNode oldNode = assignments.get(part).get(tier);
-
-                        // Move partition from level t to tier.
-                        assignments.get(part).set(tier, node);
-                        assignments.get(part).set(t, null);
-
-                        if (oldNode != null) {
-                            tierMaps[tier].get(oldNode.id()).remove(part);
-                            fullMap.get(oldNode.id()).remove(part);
-                        }
-
-                        tierMaps[tier].get(nodeId).add(part);
-                        tierMaps[t].get(nodeId).remove(part);
-
-                        Queue<Integer> pending = pendingParts.get(t);
-
-                        if (pending == null) {
-                            pending = new LinkedList<>();
-
-                            pendingParts.put(t, pending);
-                        }
-
-                        pending.add(part);
-
-                        return true;
-                    }
-                }
-
-                throw new IllegalStateException("Unable to assign partition to 
node while force is true.");
-            }
-
-            // !force.
-            return false;
-        }
-
-        /**
-         * Gets tier mapping.
-         *
-         * @param tier Tier to get mapping.
-         * @return Per node map.
-         */
-        public Map<UUID, PartitionSet> tierMapping(int tier) {
-            return tierMaps[tier];
-        }
-    }
-
-    /**
-     * Applies a supplemental hash function to a given hashCode, which
-     * defends against poor quality hash functions.
-     *
-     * @param h Hash code.
-     * @return Enhanced hash code.
-     */
-    private static int hash(int h) {
-        // Spread bits to regularize both segment and index locations,
-        // using variant of single-word Wang/Jenkins hash.
-        h += (h <<  15) ^ 0xffffcd7d;
-        h ^= (h >>> 10);
-        h += (h <<   3);
-        h ^= (h >>>  6);
-        h += (h <<   2) + (h << 14);
-        return h ^ (h >>> 16);
-    }
-
-    /**
-     *
-     */
-    @SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
-    private static class PartitionSet {
-        /** */
-        private ClusterNode node;
-
-        /** Partitions. */
-        private Collection<Integer> parts = new LinkedList<>();
-
-        /**
-         * @param node Node.
-         */
-        private PartitionSet(ClusterNode node) {
-            this.node = node;
-        }
-
-        /**
-         * @return Node.
-         */
-        private ClusterNode node() {
-            return node;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        private UUID nodeId() {
-            return node.id();
-        }
-
-        /**
-         * @return Partition set size.
-         */
-        private int size() {
-            return parts.size();
-        }
-
-        /**
-         * Adds partition to partition set.
-         *
-         * @param part Partition to add.
-         * @return {@code True} if partition was added, {@code false} if 
partition already exists.
-         */
-        private boolean add(int part) {
-            if (!parts.contains(part)) {
-                parts.add(part);
-
-                return true;
-            }
-
-            return false;
-        }
-
-        /**
-         * @param part Partition to remove.
-         */
-        private void remove(Integer part) {
-            parts.remove(part); // Remove object, not index.
-        }
-
-        /**
-         * @return Partitions.
-         */
-        @SuppressWarnings("TypeMayBeWeakened")
-        private Collection<Integer> partitions() {
-            return parts;
-        }
-
-        /**
-         * Checks if partition set contains given partition.
-         *
-         * @param part Partition to check.
-         * @return {@code True} if partition set contains given partition.
-         */
-        private boolean contains(int part) {
-            return parts.contains(part);
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return "PartSet [nodeId=" + node.id() + ", size=" + parts.size() + 
", parts=" + parts + ']';
-        }
-    }
-}

Reply via email to