http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java
deleted file mode 100644
index b98108e..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.adaptive;
-
-import org.apache.ignite.cluster.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Implementation of node load probing based on active and waiting job count.
- * Based on {@link #setUseAverage(boolean)} parameter, this implementation will
- * either use average job count values or current (default is to use averages).
- * <p>
- * The load of a node is simply calculated by adding active and waiting job counts.
- * When averages are enabled but their sum is not positive, the current counts
- * are used instead, and a negative sum is reported as {@code 0}.
- * <p>
- * Below is an example of how job count load probe would be configured in GridGain
- * Spring configuration file:
- * <pre name="code" class="xml">
- * &lt;property name="loadBalancingSpi"&gt;
- *     &lt;bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
- *         &lt;property name="loadProbe"&gt;
- *             &lt;bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveJobCountLoadProbe"&gt;
- *                 &lt;property name="useAverage" value="true"/&gt;
- *             &lt;/bean&gt;
- *         &lt;/property&gt;
- *     &lt;/bean&gt;
- * &lt;/property&gt;
- * </pre>
- */
-public class GridAdaptiveJobCountLoadProbe implements GridAdaptiveLoadProbe {
-    /** Flag indicating whether to use average job counts vs. current. */
-    private boolean useAvg = true;
-
-    /**
-     * Initializes active job probe to use average job counts by default.
-     */
-    public GridAdaptiveJobCountLoadProbe() {
-        // No-op.
-    }
-
-    /**
-     * Creates new active job probe specifying whether to use average
-     * job counts vs. current.
-     *
-     * @param useAvg Flag indicating whether to use average job counts vs. current.
-     */
-    public GridAdaptiveJobCountLoadProbe(boolean useAvg) {
-        this.useAvg = useAvg;
-    }
-
-    /**
-     * Gets flag indicating whether to use average job counts vs. current.
-     *
-     * @return Flag indicating whether to use average job counts vs. current.
-     */
-    public boolean isUseAverage() {
-        return useAvg;
-    }
-
-    /**
-     * Sets flag indicating whether to use average job counts vs. current.
-     *
-     * @param useAvg Flag indicating whether to use average job counts vs. current.
-     */
-    public void setUseAverage(boolean useAvg) {
-        this.useAvg = useAvg;
-    }
-
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the sum of active and waiting job counts taken from the node metrics.
-     * Averages are returned when enabled and their sum is positive; otherwise the
-     * current counts are summed and a negative sum is returned as {@code 0}.
-     * The {@code jobsSentSinceLastUpdate} argument is not used by this probe.
-     */
-    @Override public double getLoad(ClusterNode node, int 
jobsSentSinceLastUpdate) {
-        ClusterNodeMetrics metrics = node.metrics();
-
-        if (useAvg) {
-            double load = metrics.getAverageActiveJobs() + 
metrics.getAverageWaitingJobs();
-
-            if (load > 0)
-                return load;
-        }
-
-        double load = metrics.getCurrentActiveJobs() + 
metrics.getCurrentWaitingJobs();
-
-        return load < 0 ? 0 : load;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@code S.toString} for this class and instance.
-     */
-    @Override public String toString() {
-        return S.toString(GridAdaptiveJobCountLoadProbe.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java
deleted file mode 100644
index d02529b..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java
+++ /dev/null
@@ -1,581 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.adaptive;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.spi.loadbalancing.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-
-/**
- * Load balancing SPI that adapts to overall node performance. It
- * proportionally distributes more jobs to more performant nodes based
- * on a pluggable and dynamic node load probing.
- * <p>
- * <h1 class="header">Adaptive Node Probe</h1>
- * This SPI comes with pluggable algorithm to calculate a node load
- * at any given point of time. The algorithm is defined by
- * {@link GridAdaptiveLoadProbe} interface and user is
- * free to provide custom implementations. By default
- * {@link GridAdaptiveCpuLoadProbe} implementation is used
- * which distributes jobs to nodes based on average CPU load
- * on every node.
- * <p>
- * The following load probes are available with the product:
- * <ul>
- * <li>{@link GridAdaptiveCpuLoadProbe} - default</li>
- * <li>{@link GridAdaptiveProcessingTimeLoadProbe}</li>
- * <li>{@link GridAdaptiveJobCountLoadProbe}</li>
- * </ul>
- * Note that if {@link 
GridAdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} 
returns a value of {@code 0},
- * then implementation will assume that load value is simply not available and
- * will try to calculate an average of load values for other nodes. If such
- * average cannot be obtained (all node load values are {@code 0}), then a 
value
- * of {@code 1} will be used.
- * <p>
- * When working with node metrics, take into account that all averages are
- * calculated over metrics history size defined by {@link 
org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()}
- * and {@link 
org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} 
grid configuration parameters.
- * Generally the larger these configuration parameter values are, the more 
precise the metrics are.
- * You should tune these values based on the level of accuracy needed vs. the 
additional memory
- * that would be required for storing metrics.
- * <p>
- * You should also keep in mind that metrics for remote nodes are delayed 
(usually by the
- * heartbeat frequency). So if it is acceptable in your environment, set the 
heartbeat frequency
- * to be more inline with job execution time. Generally, the more often 
heartbeats between nodes
- * are exchanged, the more precise the metrics are. However, you should keep 
in mind that if
- * heartbeats are exchanged too often then it may create unnecessary traffic 
in the network.
- * Heartbeats (or metrics update frequency) can be configured via underlying
- * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid.
- * <p>
- * Here is an example of how probing can be implemented to use
- * number of active and waiting jobs as probing mechanism:
- * <pre name="code" class="java">
- * public class FooBarLoadProbe implements GridAdaptiveLoadProbe {
- *     // Flag indicating whether to use average value or current.
- *     private boolean useAvg = true;
- *
- *     public FooBarLoadProbe(boolean useAvg) {
- *         this.useAvg = useAvg;
- *     }
- *
- *     // Calculate load based on number of active and waiting jobs.
- *     public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) {
- *         ClusterNodeMetrics metrics = node.metrics();
- *
- *         if (useAvg) {
- *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
- *
- *             if (load > 0) {
- *                 return load;
- *             }
- *         }
- *
- *         return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs();
- *     }
- * }
- * </pre>
- * <h1 class="header">Which Node Probe To Use</h1>
- * There is no correct answer here. Every single node probe will work better 
or worse in
- * different environments. CPU load probe (default option) is the safest 
approach to start
- * with as it simply attempts to utilize every CPU on the grid to the maximum. 
However, you should
- * experiment with other probes by executing load tests in your environment 
and observing
- * which probe gives you best performance and load balancing.
- * <p>
- * <h1 class="header">Task Coding Example</h1>
- * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} 
then load balancing logic
- * is transparent to your code and is handled automatically by the adapter.
- * Here is an example of how your task will look:
- * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskSplitAdapter&lt;Object, 
Object&gt; {
- *    &#64;Override
- *    protected Collection&lt;? extends GridComputeJob&gt; split(int gridSize, 
Object arg) throws GridException {
- *        List&lt;MyFooBarJob&gt; jobs = new 
ArrayList&lt;MyFooBarJob&gt;(gridSize);
- *
- *        for (int i = 0; i &lt; gridSize; i++) {
- *            jobs.add(new MyFooBarJob(arg));
- *        }
- *
- *        // Node assignment via load balancer
- *        // happens automatically.
- *        return jobs;
- *    }
- *    ...
- * }
- * </pre>
- * If you need more fine-grained control over how some jobs within task get 
mapped to a node
- * and use affinity load balancing for some other jobs within task, then you 
should use
- * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of 
how your task will look. Note that in this
- * case we manually inject load balancer and use it to pick the best node. 
Doing it in
- * such way would allow user to map some jobs manually and for others use load 
balancer.
- * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, 
String&gt; {
- *    // Inject load balancer.
- *    &#64;GridLoadBalancerResource
- *    GridComputeLoadBalancer balancer;
- *
- *    // Map jobs to grid nodes.
- *    public Map&lt;? extends GridComputeJob, GridNode&gt; 
map(List&lt;GridNode&gt; subgrid, String arg) throws GridException {
- *        Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, 
GridNode&gt;(subgrid.size());
- *
- *        // In more complex cases, you can actually do
- *        // more complicated assignments of jobs to nodes.
- *        for (int i = 0; i &lt; subgrid.size(); i++) {
- *            // Pick the next best balanced node for the job.
- *            jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode());
- *        }
- *
- *        return jobs;
- *    }
- *
- *    // Aggregate results into one compound result.
- *    public String reduce(List&lt;GridComputeJobResult&gt; results) throws 
GridException {
- *        // For the purpose of this example we simply
- *        // concatenate string representation of every
- *        // job result
- *        StringBuilder buf = new StringBuilder();
- *
- *        for (GridComputeJobResult res : results) {
- *            // Append string representation of result
- *            // returned by every job.
- *            buf.append(res.getData().toString());
- *        }
- *
- *        return buf.toString();
- *    }
- * }
- * </pre>
- * <p>
- * <h1 class="header">Configuration</h1>
- * In order to use this load balancer, you should configure your grid instance
- * to use {@code GridAdaptiveLoadBalancingSpi} either from Spring XML file or
- * directly. The following configuration parameters are supported:
- * <h2 class="header">Mandatory</h2>
- * This SPI has no mandatory configuration parameters.
- * <h2 class="header">Optional</h2>
- * This SPI has the following optional configuration parameters:
- * <ul>
- * <li>
- *      Adaptive node load probing implementation (see {@link 
#setLoadProbe(GridAdaptiveLoadProbe)}).
- *      This configuration parameter supplies a custom algorithm for probing a 
node's load.
- *      By default, {@link GridAdaptiveCpuLoadProbe} implementation is used 
which
- *      takes every node's CPU load and tries to send proportionally more jobs 
to less loaded nodes.
- * </li>
- * </ul>
- * <p>
- * Below is Java configuration example:
- * <pre name="code" class="java">
- * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi();
- *
- * // Configure probe to use latest job execution time vs. average.
- * GridAdaptiveProcessingTimeLoadProbe probe = new 
GridAdaptiveProcessingTimeLoadProbe(false);
- *
- * spi.setLoadProbe(probe);
- *
- * GridConfiguration cfg = new GridConfiguration();
- *
- * // Override default load balancing SPI.
- * cfg.setLoadBalancingSpi(spi);
- *
- * // Starts grid.
- * G.start(cfg);
- * </pre>
- * Here is how you can configure {@code GridAdaptiveLoadBalancingSpi} using Spring XML configuration:
- * <pre name="code" class="xml">
- * &lt;property name="loadBalancingSpi"&gt;
- *     &lt;bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
- *         &lt;property name="loadProbe"&gt;
- *             &lt;bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"&gt;
- *                 &lt;constructor-arg value="false"/&gt;
- *             &lt;/bean&gt;
- *         &lt;/property&gt;
- *     &lt;/bean&gt;
- * &lt;/property&gt;
- * </pre>
- * <p>
- * <img src="http://www.gridgain.com/images/spring-small.png">
- * <br>
- * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a>
- */
-@IgniteSpiMultipleInstancesSupport(true)
-public class GridAdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements 
GridLoadBalancingSpi,
-    GridAdaptiveLoadBalancingSpiMBean {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Grid logger. */
-    @IgniteLoggerResource
-    private IgniteLogger log;
-
-    /** Load probe used to calculate node loads; the CPU load probe is used unless another one is set. */
-    private GridAdaptiveLoadProbe probe = new GridAdaptiveCpuLoadProbe();
-
-    /** Local event listener to listen to task completion events. */
-    private GridLocalEventListener evtLsnr;
-
-    /** Task topologies. First pair value indicates whether or not jobs have 
been mapped. */
-    private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, 
WeightedTopology>> taskTops =
-        new ConcurrentHashMap8<>();
-
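-    /** Per-node counters of jobs picked for a node since its counter was last reset. Guarded by {@code rwLock}. */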
-    private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>();
-
-    /** Lock guarding access to {@code nodeJobs}. */
-    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Returns the {@code toString()} representation of the currently configured load probe.
-     */
-    @Override public String getLoadProbeFormatted() {
-        return probe.toString();
-    }
-
-    /**
-     * Sets implementation of node load probe. By default {@link GridAdaptiveCpuLoadProbe}
-     * is used (see the initial value of the probe field in this class).
-     * <p>
-     * The argument is checked with {@code A.ensure} and must not be {@code null}.
-     *
-     * @param probe Implementation of node load probe, not {@code null}.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setLoadProbe(GridAdaptiveLoadProbe probe) {
-        A.ensure(probe != null, "probe != null");
-
-        this.probe = probe;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Starts the stopwatch, asserts that a load probe is configured, logs the probe
-     * configuration at debug level, and registers this SPI under the
-     * {@link GridAdaptiveLoadBalancingSpiMBean} interface.
-     */
-    @Override public void spiStart(@Nullable String gridName) throws 
IgniteSpiException {
-        startStopwatch();
-
-        assertParameter(probe != null, "loadProbe != null");
-
-        if (log.isDebugEnabled())
-            log.debug(configInfo("loadProbe", probe));
-
-        registerMBean(gridName, this, GridAdaptiveLoadBalancingSpiMBean.class);
-
-        // Ack ok start.
-        if (log.isDebugEnabled())
-            log.debug(startInfo());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Clears the per-node job counters while holding the write lock, unregisters
-     * the MBean, and logs the stop at debug level.
-     */
-    @Override public void spiStop() throws IgniteSpiException {
-        rwLock.writeLock().lock();
-
-        try {
-            nodeJobs.clear();
-        }
-        finally {
-            rwLock.writeLock().unlock();
-        }
-
-        unregisterMBean();
-
-        // Ack ok stop.
-        if (log.isDebugEnabled())
-            log.debug(stopInfo());
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Registers a local event listener and then seeds a zero job counter for every
-     * node returned by the SPI context. The listener reacts as follows:
-     * <ul>
-     * <li>task finished/failed: removes the cached topology of that task session;</li>
-     * <li>job mapped: sets the mapped flag on the cached topology of the session, if any;</li>
-     * <li>node joined or node metrics updated: (re)sets the node's job counter to zero;</li>
-     * <li>node left/failed: removes the node's job counter.</li>
-     * </ul>
-     * All counter changes are done under the write lock.
-     */
-    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
-        getSpiContext().addLocalEventListener(evtLsnr = new 
GridLocalEventListener() {
-            @Override public void onEvent(IgniteEvent evt) {
-                switch (evt.type()) {
-                    case EVT_TASK_FINISHED:
-                    case EVT_TASK_FAILED: {
-                        IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt;
-
-                        taskTops.remove(taskEvt.taskSessionId());
-
-                        if (log.isDebugEnabled())
-                            log.debug("Removed task topology from topology 
cache for session: " +
-                                taskEvt.taskSessionId());
-
-                        break;
-                    }
-
-                    case EVT_JOB_MAPPED: {
-                        // We should keep topology and use cache in 
GridComputeTask#map() method to
-                        // avoid O(n*n/2) complexity, after that we can drop 
caches.
-                        // Here we set mapped property and later cache will be 
ignored
-                        IgniteJobEvent jobEvt = (IgniteJobEvent)evt;
-
-                        IgniteBiTuple<Boolean, WeightedTopology> weightedTop = 
taskTops.get(jobEvt.taskSessionId());
-
-                        if (weightedTop != null)
-                            weightedTop.set1(true);
-
-                        if (log.isDebugEnabled())
-                            log.debug("Job has been mapped. Ignore cache for 
session: " + jobEvt.taskSessionId());
-
-                        break;
-                    }
-
-                    case EVT_NODE_METRICS_UPDATED:
-                    case EVT_NODE_FAILED:
-                    case EVT_NODE_JOINED:
-                    case EVT_NODE_LEFT: {
-                        IgniteDiscoveryEvent discoEvt = 
(IgniteDiscoveryEvent)evt;
-
-                        rwLock.writeLock().lock();
-
-                        try {
-                            switch (evt.type()) {
-                                case EVT_NODE_JOINED: {
-                                    nodeJobs.put(discoEvt.eventNode().id(), 
new AtomicInteger(0));
-
-                                    break;
-                                }
-
-                                case EVT_NODE_LEFT:
-                                case EVT_NODE_FAILED: {
-                                    nodeJobs.remove(discoEvt.eventNode().id());
-
-                                    break;
-                                }
-
-                                case EVT_NODE_METRICS_UPDATED: {
-                                    // Reset counter.
-                                    nodeJobs.put(discoEvt.eventNode().id(), 
new AtomicInteger(0));
-
-                                    break;
-                                }
-                            }
-                        }
-                        finally {
-                            rwLock.writeLock().unlock();
-                        }
-                    }
-
-                }
-            }
-        },
-            EVT_NODE_METRICS_UPDATED,
-            EVT_NODE_FAILED,
-            EVT_NODE_JOINED,
-            EVT_NODE_LEFT,
-            EVT_TASK_FINISHED,
-            EVT_TASK_FAILED,
-            EVT_JOB_MAPPED
-        );
-
-        // Put all known nodes.
-        rwLock.writeLock().lock();
-
-        try {
-            for (ClusterNode node : getSpiContext().nodes())
-                nodeJobs.put(node.id(), new AtomicInteger(0));
-        }
-        finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Removes the local event listener held by this SPI, provided that a listener
-     * was created and the SPI context is still available.
-     */
-    @Override protected void onContextDestroyed0() {
-        if (evtLsnr != null) {
-            IgniteSpiContext ctx = getSpiContext();
-
-            if (ctx != null)
-                ctx.removeLocalEventListener(evtLsnr);
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Picks a node at random, weighted by the loads reported by the configured probe.
-     * A weighted topology is built and cached per task session on first use and is
-     * reused while the session's mapped flag is {@code false}. Once the flag is
-     * {@code true}, a fresh weighted topology is built from {@code top} on every call
-     * and is not cached.
-     * <p>
-     * All three arguments are checked with {@code A.notNull}.
-     *
-     * @throws GridException If building the weighted topology fails (the probe
-     *      returned a negative load).
-     */
-    @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, 
List<ClusterNode> top, ComputeJob job)
-    throws GridException {
-        A.notNull(ses, "ses");
-        A.notNull(top, "top");
-        A.notNull(job, "job");
-
-        IgniteBiTuple<Boolean, WeightedTopology> weightedTop = 
taskTops.get(ses.getId());
-
-        // Create new cached topology if there is no one. Do not
-        // use cached topology after task has been mapped.
-        if (weightedTop == null)
-            // Called from GridComputeTask#map(). Put new topology and false 
as not mapped yet.
-            taskTops.put(ses.getId(), weightedTop = F.t(false, new 
WeightedTopology(top)));
-        // We have topology - check if task has been mapped.
-        else if (weightedTop.get1())
-            // Do not use cache after GridComputeTask#map().
-            return new WeightedTopology(top).pickWeightedNode();
-
-        return weightedTop.get2().pickWeightedNode();
-    }
-
-    /**
-     * Calculates node load based on set probe.
-     * <p>
-     * The number of jobs counted for the node in {@code nodeJobs} (or {@code 0} if
-     * the node has no counter) is read under the read lock and passed to the probe
-     * together with the node.
-     *
-     * @param top List of all nodes; only asserted to be non-empty here.
-     * @param node Node to get load for.
-     * @return Node load as returned by the probe (zero or positive).
-     * @throws GridException If returned load is negative.
-     */
-    @SuppressWarnings({"TooBroadScope"})
-    private double getLoad(Collection<ClusterNode> top, ClusterNode node) 
throws GridException {
-        assert !F.isEmpty(top);
-
-        int jobsSentSinceLastUpdate = 0;
-
-        rwLock.readLock().lock();
-
-        try {
-            AtomicInteger cnt = nodeJobs.get(node.id());
-
-            jobsSentSinceLastUpdate = cnt == null ? 0 : cnt.get();
-        }
-        finally {
-            rwLock.readLock().unlock();
-        }
-
-        double load = probe.getLoad(node, jobsSentSinceLastUpdate);
-
-        if (load < 0)
-            throw new GridException("Failed to obtain non-negative load from 
adaptive load probe: " + load);
-
-        return load;
-    }
-
-    /**
-     * Holder for weighted topology. Node loads are sampled once in the constructor
-     * and turned into cumulative weights, which {@link #pickWeightedNode()} then
-     * uses to select nodes at random.
-     */
-    private class WeightedTopology {
-        /**
-         * Topology sorted by weight. Keys are cumulative normalized weights; the
-         * key of the last node in the topology is always {@code 1.0}.
-         */
-        private final SortedMap<Double, ClusterNode> circle = new TreeMap<>();
-
-        /**
-         * Builds the weighted circle for the given topology:
-         * <ol>
-         * <li>the load of every node is obtained once and cached;</li>
-         * <li>every zero load is replaced by the average of the non-zero loads,
-         *     or by {@code 1} when all loads are zero;</li>
-         * <li>each load is converted to a weight of {@code totalLoad / load}, so
-         *     nodes with a smaller load get a larger weight;</li>
-         * <li>weights are normalized by the total weight and accumulated, and each
-         *     node is stored under its cumulative weight.</li>
-         * </ol>
-         *
-         * @param top Task topology, asserted to be non-empty.
-         * @throws GridException If any load was negative.
-         */
-        WeightedTopology(List<ClusterNode> top) throws GridException {
-            assert !F.isEmpty(top);
-
-            double totalLoad = 0;
-
-            // We need to cache loads here to avoid calls later as load might 
be
-            // changed between the calls.
-            double[] nums = new double[top.size()];
-
-            int zeroCnt = 0;
-
-            // Compute loads.
-            for (int i = 0; i < top.size(); i++) {
-                double load = getLoad(top, top.get(i));
-
-                nums[i] = load;
-
-                if (load == 0)
-                    zeroCnt++;
-
-                totalLoad += load;
-            }
-
-            // Take care of zero loads.
-            if (zeroCnt > 0) {
-                double newTotal = totalLoad;
-
-                int nonZeroCnt = top.size() - zeroCnt;
-
-                for (int i = 0; i < nums.length; i++) {
-                    double load = nums[i];
-
-                    if (load == 0) {
-                        if (nonZeroCnt > 0)
-                            load = totalLoad / nonZeroCnt;
-
-                        if (load == 0)
-                            load = 1;
-
-                        nums[i] = load;
-
-                        newTotal += load;
-                    }
-                }
-
-                totalLoad = newTotal;
-            }
-
-            double totalWeight = 0;
-
-            // Calculate weights and total weight.
-            for (int i = 0; i < nums.length; i++) {
-                assert nums[i] > 0 : "Invalid load: " + nums[i];
-
-                double weight = totalLoad / nums[i];
-
-                // Convert to weight.
-                nums[i] = weight;
-
-                totalWeight += weight;
-            }
-
-            double weight = 0;
-
-            // Enforce range from 0 to 1.
-            for (int i = 0; i < nums.length; i++) {
-                weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / 
totalWeight;
-
-                assert weight < 2 : "Invalid weight: " + weight;
-
-                // Complexity of this put is O(logN).
-                circle.put(weight, top.get(i));
-            }
-        }
-
-        /**
-         * Gets weighted node in random fashion: a random value is drawn and the node
-         * stored under the smallest cumulative weight that is not less than that
-         * value is selected. If the selected node has a counter in {@code nodeJobs},
-         * the counter is incremented under the read lock.
-         *
-         * @return Weighted node.
-         */
-        ClusterNode pickWeightedNode() {
-            double weight = RAND.nextDouble();
-
-            SortedMap<Double, ClusterNode> pick = circle.tailMap(weight);
-
-            ClusterNode node = pick.get(pick.firstKey());
-
-            rwLock.readLock().lock();
-
-            try {
-                AtomicInteger cnt = nodeJobs.get(node.id());
-
-                if (cnt != null)
-                    cnt.incrementAndGet();
-            }
-            finally {
-                rwLock.readLock().unlock();
-            }
-
-            return node;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to {@code S.toString} for this class and instance.
-     */
-    @Override public String toString() {
-        return S.toString(GridAdaptiveLoadBalancingSpi.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java
deleted file mode 100644
index a473d59..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.adaptive;
-
-import org.apache.ignite.mbean.*;
-import org.apache.ignite.spi.*;
-
-/**
- * Management MBean for {@link GridAdaptiveLoadBalancingSpi} SPI. It exposes a
- * single read-only attribute: a textual description of the load probe in use.
- */
-@IgniteMBeanDescription("MBean that provides access to adaptive load balancing 
SPI configuration.")
-public interface GridAdaptiveLoadBalancingSpiMBean extends 
IgniteSpiManagementMBean {
-    /**
-     * Gets text description of current load probing implementation used.
-     *
-     * @return Text description of current load probing implementation used.
-     */
-    @IgniteMBeanDescription("Text description of current load probing 
implementation used.")
-    public String getLoadProbeFormatted();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java
deleted file mode 100644
index 88226cc..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.adaptive;
-
-import org.apache.ignite.cluster.*;
-
-/**
- * Pluggable implementation of node load probing. Implementations
- * of this can be configured to be used with {@link GridAdaptiveLoadBalancingSpi}
- * by setting {@link GridAdaptiveLoadBalancingSpi#setLoadProbe(GridAdaptiveLoadProbe)}
- * configuration parameter.
- * <p>
- * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0},
- * then implementation will assume that load value is simply not available and
- * will try to calculate an average of load values for other nodes. If such
- * average cannot be obtained (all node load values are {@code 0}), then a value
- * of {@code 1} will be used.
- * <p>
- * By default, {@link GridAdaptiveCpuLoadProbe} probing implementation is used.
- * <p>
- * <h1 class="header">Example</h1>
- * Here is an example of how probing can be implemented to use
- * number of active and waiting jobs as probing mechanism:
- * <pre name="code" class="java">
- * public class FooBarLoadProbe implements GridAdaptiveLoadProbe {
- *     // Flag indicating whether to use average value or current.
- *     private boolean useAvg = true;
- *
- *     public FooBarLoadProbe(boolean useAvg) {
- *         this.useAvg = useAvg;
- *     }
- *
- *     // Calculate load based on number of active and waiting jobs.
- *     public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) {
- *         ClusterNodeMetrics metrics = node.metrics();
- *
- *         if (useAvg) {
- *             double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs();
- *
- *             if (load > 0) {
- *                 return load;
- *             }
- *         }
- *
- *         return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs();
- *     }
- * }
- * </pre>
- * Below is an example of how a probe shown above would be configured with {@link GridAdaptiveLoadBalancingSpi}
- * SPI:
- * <pre name="code" class="xml">
- * &lt;property name="loadBalancingSpi"&gt;
- *     &lt;bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
- *         &lt;property name="loadProbe"&gt;
- *             &lt;bean class="foo.bar.FooBarLoadProbe"&gt;
- *                 &lt;constructor-arg value="true"/&gt;
- *             &lt;/bean&gt;
- *         &lt;/property&gt;
- *     &lt;/bean&gt;
- * &lt;/property&gt;
- * </pre>
- */
-public interface GridAdaptiveLoadProbe {
-    /**
-     * Calculates load value for a given node. Specific implementations would
-     * usually take into account some of the values provided by
-     * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated
-     * based on job execution time or number of active jobs, or CPU/Heap utilization.
-     * <p>
-     * Note that if this method returns a value of {@code 0},
-     * then implementation will assume that load value is simply not available and
-     * will try to calculate an average of load values for other nodes. If such
-     * average cannot be obtained (all node load values are {@code 0}), then a value
-     * of {@code 1} will be used.
-     *
-     * @param node Grid node to calculate load for.
-     * @param jobsSentSinceLastUpdate Number of jobs sent to this node since
-     *      last metrics update. This parameter may be useful when
-     *      implementation takes into account the current job count on a node.
-     * @return Non-negative load value for the node (zero and above).
-     */
-    public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java
deleted file mode 100644
index bc6f745..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.adaptive;
-
-import org.apache.ignite.cluster.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-/**
- * Implementation of node load probing based on total job processing time.
- * Based on {@link #setUseAverage(boolean)}
- * parameter, this implementation will either use average job execution
- * time values or current (default is to use averages). The algorithm
- * returns a sum of job wait time and job execution time.
- * <p>
- * Below is an example of how CPU load probe would be configured in GridGain
- * Spring configuration file:
- * <pre name="code" class="xml">
- * &lt;property name="loadBalancingSpi"&gt;
- *     &lt;bean 
class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"&gt;
- *         &lt;property name="loadProbe"&gt;
- *             &lt;bean 
class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"&gt;
- *                 &lt;property name="useAverage" value="true"/&gt;
- *             &lt;/bean&gt;
- *         &lt;/property&gt;
- *     &lt;/bean&gt;
- * &lt;/property&gt;
- * </pre>
- */
-public class GridAdaptiveProcessingTimeLoadProbe implements 
GridAdaptiveLoadProbe {
-    /** Flag indicating whether to use average execution time vs. current. */
-    private boolean useAvg = true;
-
-    /**
-     * Initializes execution time load probe to use
-     * execution time average by default.
-     */
-    public GridAdaptiveProcessingTimeLoadProbe() {
-        // No-op.
-    }
-
-    /**
-     * Specifies whether to use average execution time vs. current.
-     *
-     * @param useAvg Flag indicating whether to use average execution time vs. 
current.
-     */
-    public GridAdaptiveProcessingTimeLoadProbe(boolean useAvg) {
-        this.useAvg = useAvg;
-    }
-
-    /**
-     * Gets flag indicating whether to use average execution time vs. current.
-     *
-     * @return Flag indicating whether to use average execution time vs. 
current.
-     */
-    public boolean isUseAverage() {
-        return useAvg;
-    }
-
-    /**
-     * Sets flag indicating whether to use average execution time vs. current.
-     *
-     * @param useAvg Flag indicating whether to use average execution time vs. 
current.
-     */
-    public void setUseAverage(boolean useAvg) {
-        this.useAvg = useAvg;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public double getLoad(ClusterNode node, int 
jobsSentSinceLastUpdate) {
-        ClusterNodeMetrics metrics = node.metrics();
-
-        if (useAvg) {
-            double load = metrics.getAverageJobExecuteTime() + 
metrics.getAverageJobWaitTime();
-
-            // If load is greater than 0, then we can use average times.
-            // Otherwise, we will proceed to using current times.
-            if (load > 0)
-                return load;
-        }
-
-        double load = metrics.getCurrentJobExecuteTime() + 
metrics.getCurrentJobWaitTime();
-
-        return load < 0 ? 0 : load;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridAdaptiveProcessingTimeLoadProbe.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java
deleted file mode 100644
index 9234e3c..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.roundrobin;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.spi.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-
-/**
- * Load balancer that works in global (not-per-task) mode.
- */
-class GridRoundRobinGlobalLoadBalancer {
-    /** SPI context. */
-    private IgniteSpiContext ctx;
-
-    /** Listener for node's events. */
-    private GridLocalEventListener lsnr;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Current snapshot of nodes which participated in load balancing. */
-    private volatile GridNodeList nodeList = new GridNodeList(0, null);
-
-    /** Mutex for updating current topology. */
-    private final Object mux = new Object();
-
-    /** Barrier for separating initialization callback and load balancing 
routine. */
-    private final CountDownLatch initLatch = new CountDownLatch(1);
-
-    /**
-     * @param log Grid logger.
-     */
-    GridRoundRobinGlobalLoadBalancer(IgniteLogger log) {
-        assert log != null;
-
-        this.log = log;
-    }
-
-    /**
-     * @param ctx Load balancing context.
-     */
-    void onContextInitialized(final IgniteSpiContext ctx) {
-        this.ctx = ctx;
-
-        ctx.addLocalEventListener(
-            lsnr = new GridLocalEventListener() {
-                @Override public void onEvent(IgniteEvent evt) {
-                    assert evt instanceof IgniteDiscoveryEvent;
-
-                    UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
-
-                    synchronized (mux) {
-                        if (evt.type() == EVT_NODE_JOINED) {
-                            List<UUID> oldNodes = nodeList.getNodes();
-
-                            if (!oldNodes.contains(nodeId)) {
-                                List<UUID> newNodes = new 
ArrayList<>(oldNodes.size() + 1);
-
-                                newNodes.add(nodeId);
-
-                                for (UUID node : oldNodes)
-                                    newNodes.add(node);
-
-                                nodeList = new GridNodeList(0, newNodes);
-                            }
-                        }
-                        else {
-                            assert evt.type() == EVT_NODE_LEFT || evt.type() 
== EVT_NODE_FAILED;
-
-                            List<UUID> oldNodes = nodeList.getNodes();
-
-                            if (oldNodes.contains(nodeId)) {
-                                List<UUID> newNodes = new 
ArrayList<>(oldNodes.size() - 1);
-
-                                for (UUID node : oldNodes)
-                                    if (!nodeId.equals(node))
-                                        newNodes.add(node);
-
-                                nodeList = new GridNodeList(0, newNodes);
-                            }
-                        }
-                    }
-                }
-            },
-            EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT
-        );
-
-        synchronized (mux) {
-            List<UUID> oldNodes = nodeList.getNodes();
-
-            Collection<UUID> set = oldNodes == null ? new HashSet<UUID>() : 
new HashSet<>(oldNodes);
-
-            for (ClusterNode node : ctx.nodes())
-                set.add(node.id());
-
-            nodeList = new GridNodeList(0, new ArrayList<>(set));
-        }
-
-        initLatch.countDown();
-    }
-
-    /** */
-    void onContextDestroyed() {
-        if (ctx != null)
-            ctx.removeLocalEventListener(lsnr);
-    }
-
-    /**
-     * Gets balanced node for given topology.
-     *
-     * @param top Topology to pick from.
-     * @return Best balanced node.
-     * @throws GridException Thrown in case of any error.
-     */
-    ClusterNode getBalancedNode(Collection<ClusterNode> top) throws 
GridException {
-        assert !F.isEmpty(top);
-
-        awaitInitializationCompleted();
-
-        Map<UUID, ClusterNode> topMap = null;
-
-        ClusterNode found;
-
-        int misses = 0;
-
-        do {
-            GridNodeList nodeList = this.nodeList;
-
-            List<UUID> nodes = nodeList.getNodes();
-
-            int cycleSize = nodes.size();
-
-            if (cycleSize == 0)
-                throw new GridException("Task topology does not have any alive 
nodes.");
-
-            AtomicInteger idx;
-
-            int curIdx, nextIdx;
-
-            do {
-                idx = nodeList.getCurrentIdx();
-
-                curIdx = idx.get();
-
-                nextIdx = (idx.get() + 1) % cycleSize;
-            }
-            while (!idx.compareAndSet(curIdx, nextIdx));
-
-            found = findNodeById(top, nodes.get(nextIdx));
-
-            if (found == null) {
-                misses++;
-
-                // For optimization purposes checks balancer can return at 
least one node with specified
-                // request topology only after full cycle (approximately).
-                if (misses >= cycleSize) {
-                    if (topMap == null) {
-                        topMap = U.newHashMap(top.size());
-
-                        for (ClusterNode node : top)
-                            topMap.put(node.id(), node);
-                    }
-
-                    checkBalancerNodes(top, topMap, nodes);
-
-                    // Zero miss counter so next topology check will be 
performed once again after full cycle.
-                    misses = 0;
-                }
-            }
-        }
-        while (found == null);
-
-        if (log.isDebugEnabled())
-            log.debug("Found round-robin node: " + found);
-
-        return found;
-    }
-
-    /**
-     * Finds node by id. Returns null in case of absence of specified id in 
request topology.
-     *
-     * @param top Topology for current request.
-     * @param foundNodeId Node id.
-     * @return Found node or null in case of absence of specified id in 
request topology.
-     */
-    private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID 
foundNodeId) {
-        for (ClusterNode node : top)
-            if (foundNodeId.equals(node.id()))
-                return node;
-
-        return null;
-    }
-
-    /**
-     * Checks if balancer can return at least one node,
-     * throw exception otherwise.
-     *
-     * @param top Topology for current request.
-     * @param topMap Topology map.
-     * @param nodes Current balanced nodes.
-     * @throws GridException If balancer can not return any node.
-     */
-    private static void checkBalancerNodes(Collection<ClusterNode> top, 
Map<UUID, ClusterNode> topMap, Iterable<UUID> nodes)
-        throws GridException {
-
-        boolean contains = false;
-
-        for (UUID nodeId : nodes) {
-            if (topMap.get(nodeId) != null) {
-                contains = true;
-
-                break;
-            }
-        }
-
-        if (!contains)
-            throw new GridException("Task topology does not have alive nodes: 
" + top);
-    }
-
-    /**
-     * Awaits initialization of balancing nodes to be completed.
-     *
-     * @throws GridException Thrown in case of thread interruption.
-     */
-    private void awaitInitializationCompleted() throws GridException {
-        try {
-            if (initLatch.getCount() > 0)
-                initLatch.await();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new GridException("Global balancer was interrupted.", e);
-        }
-    }
-
-    /**
-     * Snapshot of nodes which participated in load balancing.
-     */
-    private static final class GridNodeList {
-        /** Cyclic pointer for selecting next node. */
-        private final AtomicInteger curIdx;
-
-        /** Node ids. */
-        private final List<UUID> nodes;
-
-        /**
-         * @param curIdx Initial index of current node.
-         * @param nodes Initial node ids.
-         */
-        private GridNodeList(int curIdx, List<UUID> nodes) {
-            this.curIdx = new AtomicInteger(curIdx);
-            this.nodes = nodes;
-        }
-
-        /**
-         * @return Index of current node.
-         */
-        private AtomicInteger getCurrentIdx() {
-            return curIdx;
-        }
-
-        /**
-         * @return Node ids.
-         */
-        private List<UUID> getNodes() {
-            return nodes;
-        }
-    }
-
-    /**
-     * THIS METHOD IS USED ONLY FOR TESTING.
-     *
-     * @return Internal list of nodes.
-     */
-    List<UUID> getNodeIds() {
-        List<UUID> nodes = nodeList.getNodes();
-
-        return Collections.unmodifiableList(nodes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridRoundRobinGlobalLoadBalancer.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java
deleted file mode 100644
index 7a85098..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.roundrobin;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.compute.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
-import org.apache.ignite.spi.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.managers.eventstorage.*;
-import org.gridgain.grid.spi.loadbalancing.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.events.IgniteEventType.*;
-
-/**
- * This SPI iterates through nodes in round-robin fashion and pick the next
- * sequential node. Two modes of operation are supported: per-task and global
- * (see {@link #setPerTask(boolean)} configuration).
- * <p>
- * When configured in per-task mode, implementation will pick a random starting
- * node at the beginning of every task execution and then sequentially iterate 
through all
- * nodes in topology starting from the picked node. This is the default 
configuration
- * and should fit most of the use cases as it provides a fairly 
well-distributed
- * split and also ensures that jobs within a single task are spread out across
- * nodes to the maximum. For cases when split size is equal to the number of 
nodes,
- * this mode guarantees that all nodes will participate in the split.
- * <p>
- * When configured in global mode, a single sequential queue of nodes is 
maintained for
- * all tasks and the next node in the queue is picked every time. In this mode 
(unlike in
- * {@code per-task} mode) it is possible that even if split size may be equal 
to the
- * number of nodes, some jobs within the same task will be assigned to the 
same node if
- * multiple tasks are executing concurrently.
- * <h1 class="header">Coding Example</h1>
- * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} 
then load balancing logic
- * is transparent to your code and is handled automatically by the adapter.
- * Here is an example of how your task will look:
- * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskSplitAdapter&lt;Object, 
Object&gt; {
- *    &#64;Override
- *    protected Collection&lt;? extends GridComputeJob&gt; split(int gridSize, 
Object arg) throws GridException {
- *        List&lt;MyFooBarJob&gt; jobs = new 
ArrayList&lt;MyFooBarJob&gt;(gridSize);
- *
- *        for (int i = 0; i &lt; gridSize; i++) {
- *            jobs.add(new MyFooBarJob(arg));
- *        }
- *
- *        // Node assignment via load balancer
- *        // happens automatically.
- *        return jobs;
- *    }
- *    ...
- * }
- * </pre>
- * If you need more fine-grained control over how some jobs within task get 
mapped to a node
- * and use affinity load balancing for some other jobs within task, then you 
should use
- * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of 
how your task will look. Note that in this
- * case we manually inject load balancer and use it to pick the best node. 
Doing it in
- * such way would allow user to map some jobs manually and for others use load 
balancer.
- * <pre name="code" class="java">
- * public class MyFooBarTask extends GridComputeTaskAdapter&lt;String, 
String&gt; {
- *    // Inject load balancer.
- *    &#64;GridLoadBalancerResource
- *    GridComputeLoadBalancer balancer;
- *
- *    // Map jobs to grid nodes.
- *    public Map&lt;? extends GridComputeJob, GridNode&gt; 
map(List&lt;GridNode&gt; subgrid, String arg) throws GridException {
- *        Map&lt;MyFooBarJob, GridNode&gt; jobs = new HashMap&lt;MyFooBarJob, 
GridNode&gt;(subgrid.size());
- *
- *        // In more complex cases, you can actually do
- *        // more complicated assignments of jobs to nodes.
- *        for (int i = 0; i &lt; subgrid.size(); i++) {
- *            // Pick the next best balanced node for the job.
- *            jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode())
- *        }
- *
- *        return jobs;
- *    }
- *
- *    // Aggregate results into one compound result.
- *    public String reduce(List&lt;GridComputeJobResult&gt; results) throws 
GridException {
- *        // For the purpose of this example we simply
- *        // concatenate string representation of every
- *        // job result
- *        StringBuilder buf = new StringBuilder();
- *
- *        for (GridComputeJobResult res : results) {
- *            // Append string representation of result
- *            // returned by every job.
- *            buf.append(res.getData().string());
- *        }
- *
- *        return buf.string();
- *    }
- * }
- * </pre>
- * <p>
- * <h1 class="header">Configuration</h1>
- * In order to use this load balancer, you should configure your grid instance
- * to use {@code GridRoundRobinLoadBalancingSpi} either from Spring XML file or
- * directly. The following configuration parameters are supported:
- * <h2 class="header">Mandatory</h2>
- * This SPI has no mandatory configuration parameters.
- * <h2 class="header">Optional</h2>
- * The following configuration parameters are optional:
- * <ul>
- * <li>
- *      Flag that indicates whether to use {@code per-task} or global
- *      round-robin modes described above (see {@link #setPerTask(boolean)}).
- * </li>
- * </ul>
- * Below is Java configuration example:
- * <pre name="code" class="java">
- * GridRandomLoadBalancingSpi = new GridRandomLoadBalancingSpi();
- *
- * // Configure SPI to use global round-robin mode.
- * spi.setPerTask(false);
- *
- * GridConfiguration cfg = new GridConfiguration();
- *
- * // Override default load balancing SPI.
- * cfg.setLoadBalancingSpi(spi);
- *
- * // Starts grid.
- * G.start(cfg);
- * </pre>
- * Here is how you can configure {@code GridRandomLoadBalancingSpi} using 
Spring XML configuration:
- * <pre name="code" class="xml">
- * &lt;property name="loadBalancingSpi"&gt;
- *     &lt;bean 
class="org.gridgain.grid.spi.loadBalancing.roundrobin.GridRoundRobinLoadBalancingSpi"&gt;
- *         &lt;!-- Set to global round-robin mode. --&gt;
- *         &lt;property name="perTask" value="false"/&gt;
- *     &lt;/bean&gt;
- * &lt;/property&gt;
- * </pre>
- * <p>
- * <img src="http://www.gridgain.com/images/spring-small.png";>
- * <br>
- * For information about Spring framework visit <a 
href="http://www.springframework.org/";>www.springframework.org</a>
- */
-@IgniteSpiMultipleInstancesSupport(true)
-public class GridRoundRobinLoadBalancingSpi extends IgniteSpiAdapter 
implements GridLoadBalancingSpi,
-    GridRoundRobinLoadBalancingSpiMBean {
-    /** Grid logger. */
-    @IgniteLoggerResource
-    private IgniteLogger log;
-
-    /** */
-    private GridRoundRobinGlobalLoadBalancer balancer;
-
-    /** */
-    private boolean isPerTask;
-
-    /** */
-    private final Map<IgniteUuid, GridRoundRobinPerTaskLoadBalancer> 
perTaskBalancers =
-        new ConcurrentHashMap8<>();
-
-    /** Event listener. */
-    private final GridLocalEventListener lsnr = new GridLocalEventListener() {
-        @Override public void onEvent(IgniteEvent evt) {
-            if (evt.type() == EVT_TASK_FAILED ||
-                evt.type() == EVT_TASK_FINISHED)
-                
perTaskBalancers.remove(((IgniteTaskEvent)evt).taskSessionId());
-            else if (evt.type() == EVT_JOB_MAPPED) {
-                GridRoundRobinPerTaskLoadBalancer balancer =
-                    
perTaskBalancers.get(((IgniteJobEvent)evt).taskSessionId());
-
-                if (balancer != null)
-                    balancer.onMapped();
-            }
-        }
-    };
-
-    /** {@inheritDoc} */
-    @Override public boolean isPerTask() {
-        return isPerTask;
-    }
-
-    /**
-     * Configuration parameter indicating whether a new round robin order 
should be
-     * created for every task. If {@code true} then load balancer is guaranteed
-     * to iterate through nodes sequentially for every task - so as long as 
number
-     * of jobs is less than or equal to the number of nodes, jobs are 
guaranteed to
-     * be assigned to unique nodes. If {@code false} then one round-robin order
-     * will be maintained for all tasks, so when tasks execute concurrently, it
-     * is possible for more than one job within task to be assigned to the same
-     * node.
-     * <p>
-     * Default is {@code false}.
-     *
-     * @param isPerTask Configuration parameter indicating whether a new round 
robin order should
-     *      be created for every task. Default is {@code false}.
-     */
-    @IgniteSpiConfiguration(optional = true)
-    public void setPerTask(boolean isPerTask) {
-        this.isPerTask = isPerTask;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStart(@Nullable String gridName) throws 
IgniteSpiException {
-        startStopwatch();
-
-        if (log.isDebugEnabled())
-            log.debug(configInfo("isPerTask", isPerTask));
-
-        registerMBean(gridName, this, 
GridRoundRobinLoadBalancingSpiMBean.class);
-
-        balancer = new GridRoundRobinGlobalLoadBalancer(log);
-
-        // Ack ok start.
-        if (log.isDebugEnabled())
-            log.debug(startInfo());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void spiStop() throws IgniteSpiException {
-        balancer = null;
-
-        perTaskBalancers.clear();
-
-        unregisterMBean();
-
-        // Ack ok stop.
-        if (log.isDebugEnabled())
-            log.debug(stopInfo());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) 
throws IgniteSpiException {
-        if (!isPerTask)
-            balancer.onContextInitialized(spiCtx);
-        else {
-            if (!getSpiContext().isEventRecordable(EVT_TASK_FAILED, 
EVT_TASK_FINISHED, EVT_JOB_MAPPED))
-                throw new IgniteSpiException("Required event types are 
disabled: " +
-                    U.gridEventName(EVT_TASK_FAILED) + ", " +
-                    U.gridEventName(EVT_TASK_FINISHED) + ", " +
-                    U.gridEventName(EVT_JOB_MAPPED));
-
-            getSpiContext().addLocalEventListener(lsnr, EVT_TASK_FAILED, 
EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void onContextDestroyed0() {
-        if (!isPerTask) {
-            if (balancer != null)
-                balancer.onContextDestroyed();
-        }
-        else {
-            IgniteSpiContext spiCtx = getSpiContext();
-
-            if (spiCtx != null)
-                spiCtx.removeLocalEventListener(lsnr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, 
List<ClusterNode> top, ComputeJob job)
-        throws GridException {
-        A.notNull(ses, "ses", top, "top");
-
-        if (isPerTask) {
-            // Note that every session operates from single thread which
-            // allows us to use concurrent map and avoid synchronization.
-            GridRoundRobinPerTaskLoadBalancer taskBalancer = 
perTaskBalancers.get(ses.getId());
-
-            if (taskBalancer == null)
-                perTaskBalancers.put(ses.getId(), taskBalancer = new 
GridRoundRobinPerTaskLoadBalancer());
-
-            return taskBalancer.getBalancedNode(top);
-        }
-
-        return balancer.getBalancedNode(top);
-    }
-
-    /**
-     * THIS METHOD IS USED ONLY FOR TESTING.
-     *
-     * @param ses Task session.
-     * @return Internal list of nodes.
-     */
-    List<UUID> getNodeIds(ComputeTaskSession ses) {
-        if (isPerTask) {
-            GridRoundRobinPerTaskLoadBalancer balancer = 
perTaskBalancers.get(ses.getId());
-
-            if (balancer == null)
-                return Collections.emptyList();
-
-            List<UUID> ids = new ArrayList<>();
-
-            for (ClusterNode node : balancer.getNodes()) {
-                ids.add(node.id());
-            }
-
-            return ids;
-        }
-
-        return balancer.getNodeIds();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridRoundRobinLoadBalancingSpi.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java
deleted file mode 100644
index 2f561ae..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.roundrobin;
-
-import org.apache.ignite.mbean.*;
-import org.apache.ignite.spi.*;
-
-/**
- * Management bean for {@link GridRoundRobinLoadBalancingSpi} SPI.
- */
-@IgniteMBeanDescription("MBean that provides access to round robin load 
balancing SPI configuration.")
-public interface GridRoundRobinLoadBalancingSpiMBean extends 
IgniteSpiManagementMBean {
-    /**
-     * Configuration parameter indicating whether a new round robin order 
should be
-     * created for every task. If {@code true} then load balancer is guaranteed
-     * to iterate through nodes sequentially for every task - so as long as 
number
-     * of jobs is less than or equal to the number of nodes, jobs are 
guaranteed to
-     * be assigned to unique nodes. If {@code false} then one round-robin order
-     * will be maintained for all tasks, so when tasks execute concurrently, it
-     * is possible for more than one job within task to be assigned to the same
-     * node.
-     * <p>
-     * Default is {@code true}.
-     *
-     * @return Configuration parameter indicating whether a new round robin 
order should
-     *      be created for every task. Default is {@code true}.
-     */
-    @IgniteMBeanDescription("Configuration parameter indicating whether a new 
round robin order should be created for every task.")
-    public boolean isPerTask();
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java
deleted file mode 100644
index 4544955..0000000
--- 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/* @java.file.header */
-
-/*  _________        _____ __________________        _____
- *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
- *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
- *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
- *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
- */
-
-package org.gridgain.grid.spi.loadbalancing.roundrobin;
-
-import org.apache.ignite.cluster.*;
-
-import java.util.*;
-
-/**
- * Load balancer for per-task configuration.
- */
-class GridRoundRobinPerTaskLoadBalancer {
-    /** Balancing nodes. */
-    private ArrayDeque<ClusterNode> nodeQueue;
-
-    /** Jobs mapped flag. */
-    private volatile boolean isMapped;
-
-    /** Mutex. */
-    private final Object mux = new Object();
-
-    /**
-     * Call back for job mapped event.
-     */
-    void onMapped() {
-        isMapped = true;
-    }
-
-    /**
-     * Gets balanced node for given topology. This implementation
-     * is to be used only from {@link 
org.apache.ignite.compute.ComputeTask#map(List, Object)} method
-     * and, therefore, does not need to be thread-safe.
-     *
-     * @param top Topology to pick from.
-     * @return Best balanced node.
-     */
-    ClusterNode getBalancedNode(List<ClusterNode> top) {
-        assert top != null;
-        assert !top.isEmpty();
-
-        boolean readjust = isMapped;
-
-        synchronized (mux) {
-            // Populate first time.
-            if (nodeQueue == null)
-                nodeQueue = new ArrayDeque<>(top);
-
-            // If job has been mapped, then it means
-            // that it is most likely being failed over.
-            // In this case topology might have changed
-            // and we need to readjust with every apply.
-            if (readjust)
-                // Add missing nodes.
-                for (ClusterNode node : top)
-                    if (!nodeQueue.contains(node))
-                        nodeQueue.offer(node);
-
-            ClusterNode next = nodeQueue.poll();
-
-            // If jobs have been mapped, we need to make sure
-            // that queued node is still in topology.
-            if (readjust && next != null) {
-                while (!top.contains(next) && !nodeQueue.isEmpty())
-                    next = nodeQueue.poll();
-
-                // No nodes found and queue is empty.
-                if (next != null && !top.contains(next))
-                    return null;
-            }
-
-            if (next != null)
-                // Add to the end.
-                nodeQueue.offer(next);
-
-            return next;
-        }
-    }
-
-    /**
-     * THIS METHOD IS USED ONLY FOR TESTING.
-     *
-     * @return Internal list of nodes.
-     */
-    List<ClusterNode> getNodes() {
-        synchronized (mux) {
-            return Collections.unmodifiableList(new ArrayList<>(nodeQueue));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
new file mode 100644
index 0000000..e467bac
--- /dev/null
+++ 
b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
@@ -0,0 +1,305 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.gridgain.grid.spi.loadbalancing.roundrobin;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.spi.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.managers.eventstorage.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.IgniteEventType.*;
+
+/**
+ * Load balancer that works in global (not-per-task) mode.
+ * <p>
+ * The balancer keeps a snapshot of known node ids together with a cyclic index. Discovery
+ * events replace the snapshot while holding {@code mux}; a published snapshot list is never
+ * modified afterwards. Readers obtain the current snapshot through a volatile field and
+ * advance its index with compare-and-set.
+ */
+class RoundRobinGlobalLoadBalancer {
+    /** SPI context. */
+    private IgniteSpiContext ctx;
+
+    /** Listener for node's events. */
+    private GridLocalEventListener lsnr;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /**
+     * Current snapshot of nodes which participated in load balancing. Starts out empty (never {@code null}),
+     * so discovery events and {@link #getNodeIds()} are safe before the initial topology has been captured.
+     */
+    private volatile GridNodeList nodeList = new GridNodeList(0, Collections.<UUID>emptyList());
+
+    /** Mutex for updating current topology. */
+    private final Object mux = new Object();
+
+    /** Barrier for separating initialization callback and load balancing routine. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
+
+    /**
+     * @param log Grid logger.
+     */
+    RoundRobinGlobalLoadBalancer(IgniteLogger log) {
+        assert log != null;
+
+        this.log = log;
+    }
+
+    /**
+     * Subscribes to node join/leave/fail events, captures the initial set of node ids and
+     * releases the initialization latch awaited by {@link #getBalancedNode(Collection)}.
+     *
+     * @param ctx Load balancing context.
+     */
+    void onContextInitialized(final IgniteSpiContext ctx) {
+        this.ctx = ctx;
+
+        ctx.addLocalEventListener(
+            lsnr = new GridLocalEventListener() {
+                @Override public void onEvent(IgniteEvent evt) {
+                    assert evt instanceof IgniteDiscoveryEvent;
+
+                    UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+
+                    synchronized (mux) {
+                        List<UUID> oldNodes = nodeList.getNodes();
+
+                        if (evt.type() == EVT_NODE_JOINED) {
+                            if (!oldNodes.contains(nodeId)) {
+                                List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1);
+
+                                // Joined node goes first, the remaining ids keep their order.
+                                newNodes.add(nodeId);
+                                newNodes.addAll(oldNodes);
+
+                                nodeList = new GridNodeList(0, newNodes);
+                            }
+                        }
+                        else {
+                            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
+
+                            if (oldNodes.contains(nodeId)) {
+                                List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1);
+
+                                for (UUID node : oldNodes)
+                                    if (!nodeId.equals(node))
+                                        newNodes.add(node);
+
+                                nodeList = new GridNodeList(0, newNodes);
+                            }
+                        }
+                    }
+                }
+            },
+            EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT
+        );
+
+        synchronized (mux) {
+            // The listener is already registered, so ids delivered by events so far are merged
+            // with the nodes currently reported by the context.
+            Collection<UUID> set = new HashSet<>(nodeList.getNodes());
+
+            for (ClusterNode node : ctx.nodes())
+                set.add(node.id());
+
+            nodeList = new GridNodeList(0, new ArrayList<>(set));
+        }
+
+        initLatch.countDown();
+    }
+
+    /**
+     * Unregisters the discovery listener, if a context has been set.
+     */
+    void onContextDestroyed() {
+        if (ctx != null)
+            ctx.removeLocalEventListener(lsnr);
+    }
+
+    /**
+     * Gets balanced node for given topology.
+     *
+     * @param top Topology to pick from.
+     * @return Best balanced node.
+     * @throws GridException If the balancer holds no nodes, if none of its nodes belongs to {@code top},
+     *      or if the thread is interrupted while waiting for initialization.
+     */
+    ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException {
+        assert !F.isEmpty(top);
+
+        awaitInitializationCompleted();
+
+        Map<UUID, ClusterNode> topMap = null;
+
+        ClusterNode found;
+
+        int misses = 0;
+
+        do {
+            // Work on one snapshot per iteration: the field may be replaced concurrently.
+            GridNodeList nodeList = this.nodeList;
+
+            List<UUID> nodes = nodeList.getNodes();
+
+            int cycleSize = nodes.size();
+
+            if (cycleSize == 0)
+                throw new GridException("Task topology does not have any alive nodes.");
+
+            AtomicInteger idx = nodeList.getCurrentIdx();
+
+            int curIdx, nextIdx;
+
+            do {
+                curIdx = idx.get();
+
+                // Derive the next index from the very value the CAS below is compared against.
+                nextIdx = (curIdx + 1) % cycleSize;
+            }
+            while (!idx.compareAndSet(curIdx, nextIdx));
+
+            found = findNodeById(top, nodes.get(nextIdx));
+
+            if (found == null) {
+                misses++;
+
+                // As an optimization, verify that the balancer can return at least one node of the
+                // requested topology only after (approximately) a full cycle of misses.
+                if (misses >= cycleSize) {
+                    if (topMap == null) {
+                        topMap = U.newHashMap(top.size());
+
+                        for (ClusterNode node : top)
+                            topMap.put(node.id(), node);
+                    }
+
+                    checkBalancerNodes(top, topMap, nodes);
+
+                    // Zero miss counter so next topology check will be performed once again after full cycle.
+                    misses = 0;
+                }
+            }
+        }
+        while (found == null);
+
+        if (log.isDebugEnabled())
+            log.debug("Found round-robin node: " + found);
+
+        return found;
+    }
+
+    /**
+     * Finds node by id. Returns null in case of absence of specified id in request topology.
+     *
+     * @param top Topology for current request.
+     * @param foundNodeId Node id.
+     * @return Found node or null in case of absence of specified id in request topology.
+     */
+    private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) {
+        for (ClusterNode node : top)
+            if (foundNodeId.equals(node.id()))
+                return node;
+
+        return null;
+    }
+
+    /**
+     * Checks if balancer can return at least one node,
+     * throw exception otherwise.
+     *
+     * @param top Topology for current request (used for the error message only).
+     * @param topMap Topology map keyed by node id.
+     * @param nodes Current balanced nodes.
+     * @throws GridException If balancer can not return any node.
+     */
+    private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap,
+        Iterable<UUID> nodes) throws GridException {
+        // A single id shared by the balancer and the request topology is enough.
+        for (UUID nodeId : nodes)
+            if (topMap.containsKey(nodeId))
+                return;
+
+        throw new GridException("Task topology does not have alive nodes: " + top);
+    }
+
+    /**
+     * Awaits initialization of balancing nodes to be completed.
+     *
+     * @throws GridException Thrown in case of thread interruption.
+     */
+    private void awaitInitializationCompleted() throws GridException {
+        try {
+            // Fast path: nothing to wait for once the latch has been released.
+            if (initLatch.getCount() > 0)
+                initLatch.await();
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag for callers before translating the exception.
+            Thread.currentThread().interrupt();
+
+            throw new GridException("Global balancer was interrupted.", e);
+        }
+    }
+
+    /**
+     * Snapshot of nodes which participated in load balancing.
+     */
+    private static final class GridNodeList {
+        /** Cyclic pointer for selecting next node. */
+        private final AtomicInteger curIdx;
+
+        /** Node ids. */
+        private final List<UUID> nodes;
+
+        /**
+         * @param curIdx Initial index of current node.
+         * @param nodes Initial node ids, must not be {@code null}.
+         */
+        private GridNodeList(int curIdx, List<UUID> nodes) {
+            assert nodes != null;
+
+            this.curIdx = new AtomicInteger(curIdx);
+            this.nodes = nodes;
+        }
+
+        /**
+         * @return Index of current node.
+         */
+        private AtomicInteger getCurrentIdx() {
+            return curIdx;
+        }
+
+        /**
+         * @return Node ids.
+         */
+        private List<UUID> getNodes() {
+            return nodes;
+        }
+    }
+
+    /**
+     * THIS METHOD IS USED ONLY FOR TESTING.
+     *
+     * @return Read-only view of the internal list of node ids (empty before initialization).
+     */
+    List<UUID> getNodeIds() {
+        return Collections.unmodifiableList(nodeList.getNodes());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(RoundRobinGlobalLoadBalancer.class, this);
+    }
+}

Reply via email to