# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a62862fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a62862fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a62862fe Branch: refs/heads/master Commit: a62862fe9bf5d4e329b2531b89b80efed285c847 Parents: 12837f3 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 17:08:40 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 17:08:40 2014 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCompute.java | 3 +- .../ignite/compute/ComputeLoadBalancer.java | 3 +- .../apache/ignite/compute/ComputeTaskSpis.java | 4 +- .../configuration/IgniteConfiguration.java | 13 +- .../resources/IgniteLoadBalancerResource.java | 4 +- .../ignite/spi/failover/FailoverContext.java | 4 +- .../affinity/GridCacheAffinityKeyMapped.java | 3 +- .../org/gridgain/grid/kernal/GridGainEx.java | 4 +- .../loadbalancer/GridLoadBalancerManager.java | 2 +- .../spi/loadbalancing/GridLoadBalancingSpi.java | 117 ---- .../spi/loadbalancing/LoadBalancingSpi.java | 114 ++++ .../adaptive/AdaptiveCpuLoadProbe.java | 229 ++++++++ .../adaptive/AdaptiveJobCountLoadProbe.java | 96 +++ .../adaptive/AdaptiveLoadBalancingSpi.java | 581 +++++++++++++++++++ .../adaptive/AdaptiveLoadBalancingSpiMBean.java | 27 + .../adaptive/AdaptiveLoadProbe.java | 90 +++ .../AdaptiveProcessingTimeLoadProbe.java | 98 ++++ .../adaptive/GridAdaptiveCpuLoadProbe.java | 229 -------- .../adaptive/GridAdaptiveJobCountLoadProbe.java | 96 --- .../adaptive/GridAdaptiveLoadBalancingSpi.java | 581 ------------------- .../GridAdaptiveLoadBalancingSpiMBean.java | 27 - .../adaptive/GridAdaptiveLoadProbe.java | 90 --- .../GridAdaptiveProcessingTimeLoadProbe.java | 98 ---- .../GridRoundRobinGlobalLoadBalancer.java | 305 ---------- .../GridRoundRobinLoadBalancingSpi.java | 319 ---------- .../GridRoundRobinLoadBalancingSpiMBean.java | 37 -- .../GridRoundRobinPerTaskLoadBalancer.java | 96 --- .../RoundRobinGlobalLoadBalancer.java | 305 ++++++++++ .../roundrobin/RoundRobinLoadBalancingSpi.java | 319 ++++++++++ .../RoundRobinLoadBalancingSpiMBean.java | 37 ++ .../RoundRobinPerTaskLoadBalancer.java | 96 +++ .../GridWeightedRandomLoadBalancingSpi.java | 394 ------------- ...GridWeightedRandomLoadBalancingSpiMBean.java | 37 -- .../WeightedRandomLoadBalancingSpi.java | 394 +++++++++++++ .../WeightedRandomLoadBalancingSpiMBean.java | 37 ++ .../src/test/config/io-manager-benchmark.xml | 2 +- modules/core/src/test/config/jobs-load-base.xml | 2 +- .../src/test/config/load/merge-sort-base.xml | 2 +- .../config/spring-cache-put-remove-load.xml | 2 +- .../grid/kernal/GridMultipleSpisSelfTest.java | 2 +- .../managers/GridManagerStopSelfTest.java | 2 +- ...dAdaptiveLoadBalancingSpiConfigSelfTest.java | 6 +- ...iveLoadBalancingSpiMultipleNodeSelfTest.java | 8 +- .../GridAdaptiveLoadBalancingSpiSelfTest.java | 8 +- ...aptiveLoadBalancingSpiStartStopSelfTest.java | 4 +- ...alancingNotPerTaskMultithreadedSelfTest.java | 6 +- ...dRobinLoadBalancingSpiLocalNodeSelfTest.java | 4 +- ...inLoadBalancingSpiMultipleNodesSelfTest.java | 6 +- ...RobinLoadBalancingSpiNotPerTaskSelfTest.java | 4 +- ...dRobinLoadBalancingSpiStartStopSelfTest.java | 6 +- ...nLoadBalancingSpiTopologyChangeSelfTest.java | 4 +- .../roundrobin/GridRoundRobinTestUtils.java | 4 +- ...tedRandomLoadBalancingSpiConfigSelfTest.java | 6 +- ...dWeightedRandomLoadBalancingSpiSelfTest.java | 4 +- ...RandomLoadBalancingSpiStartStopSelfTest.java | 4 +- ...dRandomLoadBalancingSpiWeightedSelfTest.java | 8 +- 56 files changed, 2486 insertions(+), 2497 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index 4a8cd49..875b22a 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -13,7 +13,6 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.apache.ignite.lang.*; import org.gridgain.grid.*; -import org.gridgain.grid.spi.loadbalancing.*; import org.jetbrains.annotations.*; import java.io.*; @@ -52,7 +51,7 @@ import java.util.concurrent.*; * {@link Serializable} and should be used to run computations on the grid. * <h1 class="header">Load Balancing</h1> * In all cases other than {@code broadcast(...)}, GridGain must select a node for a computation - * to be executed. The node will be selected based on the underlying {@link GridLoadBalancingSpi}, + * to be executed. The node will be selected based on the underlying {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}, * which by default sequentially picks next available node from grid projection. Other load balancing * policies, such as {@code random} or {@code adaptive}, can be configured as well by selecting * a different load balancing SPI in grid configuration. If your logic requires some custom http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java index 20a4e8e..edba366 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java @@ -11,7 +11,6 @@ package org.apache.ignite.compute; import org.apache.ignite.cluster.*; import org.gridgain.grid.*; -import org.gridgain.grid.spi.loadbalancing.*; import org.jetbrains.annotations.*; import java.util.*; @@ -19,7 +18,7 @@ import java.util.*; /** * Load balancer is used for finding the best balanced node according * to load balancing policy. Internally load balancer will - * query the {@link GridLoadBalancingSpi} + * query the {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} * to get the balanced node. * <p> * Load balancer can be used <i>explicitly</i> from inside {@link ComputeTask#map(List, Object)} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java index 53941f0..6c1d231 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java @@ -9,14 +9,12 @@ package org.apache.ignite.compute; -import org.gridgain.grid.spi.loadbalancing.*; - import java.lang.annotation.*; /** * This annotation allows task to specify what SPIs it wants to use. * Starting with {@code GridGain 2.1} you can start multiple instances - * of {@link GridLoadBalancingSpi}, + * of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}, * {@link org.apache.ignite.spi.failover.FailoverSpi}, and {@link org.apache.ignite.spi.checkpoint.CheckpointSpi}. If you do that, * you need to tell a task which SPI to use (by default it will use the fist * SPI in the list). http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 04f7634..d3386d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -36,7 +36,6 @@ import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.eventstorage.*; import org.apache.ignite.spi.failover.*; import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.spi.loadbalancing.roundrobin.*; import org.gridgain.grid.spi.securesession.*; import org.gridgain.grid.spi.securesession.noop.*; import org.gridgain.grid.spi.swapspace.*; @@ -367,7 +366,7 @@ public class IgniteConfiguration { private FailoverSpi[] failSpi; /** Load balancing SPI. */ - private GridLoadBalancingSpi[] loadBalancingSpi; + private LoadBalancingSpi[] loadBalancingSpi; /** Checkpoint SPI. */ private GridSwapSpaceSpi swapSpaceSpi; @@ -2074,11 +2073,11 @@ public class IgniteConfiguration { /** * Should return fully configured load balancing SPI implementation. If not provided, - * {@link GridRoundRobinLoadBalancingSpi} will be used. + * {@link org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} will be used. * * @return Grid load balancing SPI implementation or {@code null} to use default implementation. */ - public GridLoadBalancingSpi[] getLoadBalancingSpi() { + public LoadBalancingSpi[] getLoadBalancingSpi() { return loadBalancingSpi; } @@ -2115,13 +2114,13 @@ public class IgniteConfiguration { } /** - * Sets fully configured instance of {@link GridLoadBalancingSpi}. + * Sets fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}. * - * @param loadBalancingSpi Fully configured instance of {@link GridLoadBalancingSpi} or + * @param loadBalancingSpi Fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} or * {@code null} if no SPI provided. * @see IgniteConfiguration#getLoadBalancingSpi() */ - public void setLoadBalancingSpi(GridLoadBalancingSpi... loadBalancingSpi) { + public void setLoadBalancingSpi(LoadBalancingSpi... loadBalancingSpi) { this.loadBalancingSpi = loadBalancingSpi; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java index 17b0ed1..6479e94 100644 --- a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java +++ b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java @@ -9,14 +9,12 @@ package org.apache.ignite.resources; -import org.gridgain.grid.spi.loadbalancing.*; - import java.lang.annotation.*; /** * Annotates a field or a setter method for injection of {@link org.apache.ignite.compute.ComputeLoadBalancer}. * Specific implementation for grid load balancer is defined by - * {@link GridLoadBalancingSpi} + * {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} * which is provided to grid via {@link org.apache.ignite.configuration.IgniteConfiguration}.. * <p> * Load balancer can be injected into instances of following classes: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index d66ec56..03603cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -12,7 +12,7 @@ package org.apache.ignite.spi.failover; import org.apache.ignite.cluster.*; import org.apache.ignite.compute.*; import org.gridgain.grid.*; -import org.gridgain.grid.spi.loadbalancing.*; + import java.util.*; /** @@ -36,7 +36,7 @@ public interface FailoverContext { /** * Gets the next balanced node for failed job. Internally this method will - * delegate to load balancing SPI (see {@link GridLoadBalancingSpi} to + * delegate to load balancing SPI (see {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} to * determine the optimal node for execution. * * @param top Topology to pick balanced node from. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java index a0492dc..245d39d 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java @@ -10,7 +10,6 @@ package org.gridgain.grid.cache.affinity; import org.gridgain.grid.cache.*; -import org.gridgain.grid.spi.loadbalancing.*; import java.lang.annotation.*; import java.util.concurrent.*; @@ -84,7 +83,7 @@ import java.util.concurrent.*; * {@link org.apache.ignite.compute.ComputeJob} or any other grid computation, such as {@link Runnable}, {@link Callable}, or * {@link org.apache.ignite.lang.IgniteClosure}. It should be attached to a method or field that provides affinity key * for the computation. Only one annotation per class is allowed. Whenever such annotation is detected, - * then {@link GridLoadBalancingSpi} will be bypassed, and computation will be routed to the grid node + * then {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} will be bypassed, and computation will be routed to the grid node * where the specified affinity key is cached. You can also use optional {@link GridCacheName @GridCacheName} * annotation whenever non-default cache name needs to be specified. * <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index 12b2deb..fbfd071 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -1472,7 +1472,7 @@ public class GridGainEx { DeploymentSpi deploySpi = cfg.getDeploymentSpi(); CheckpointSpi[] cpSpi = cfg.getCheckpointSpi(); FailoverSpi[] failSpi = cfg.getFailoverSpi(); - GridLoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi(); + LoadBalancingSpi[] loadBalancingSpi = cfg.getLoadBalancingSpi(); GridSwapSpaceSpi swapspaceSpi = cfg.getSwapSpaceSpi(); IndexingSpi[] indexingSpi = cfg.getIndexingSpi(); @@ -1701,7 +1701,7 @@ public class GridGainEx { failSpi = new FailoverSpi[] {new AlwaysFailoverSpi()}; if (loadBalancingSpi == null) - loadBalancingSpi = new GridLoadBalancingSpi[] {new GridRoundRobinLoadBalancingSpi()}; + loadBalancingSpi = new LoadBalancingSpi[] {new RoundRobinLoadBalancingSpi()}; if (swapspaceSpi == null) { boolean needSwap = false; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java index 26918be..3a89361 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java @@ -27,7 +27,7 @@ import java.util.*; /** * Load balancing manager. */ -public class GridLoadBalancerManager extends GridManagerAdapter<GridLoadBalancingSpi> { +public class GridLoadBalancerManager extends GridManagerAdapter<LoadBalancingSpi> { /** * @param ctx Grid kernal context. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/GridLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/GridLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/GridLoadBalancingSpi.java deleted file mode 100644 index 1d5f6f9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/GridLoadBalancingSpi.java +++ /dev/null @@ -1,117 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.spi.loadbalancing.adaptive.*; -import org.gridgain.grid.spi.loadbalancing.roundrobin.*; -import org.gridgain.grid.spi.loadbalancing.weightedrandom.*; - -import java.util.*; - -/** - * Load balancing SPI provides the next best balanced node for job - * execution. This SPI is used either implicitly or explicitly whenever - * a job gets mapped to a node during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} - * invocation. - * <h1 class="header">Coding Examples</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task could look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object,Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * <i>and</i> use, for example, affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task could look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String,String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * GridComputeJob myJob = new MyFooBarJob(arg); - * - * jobs.put(myJob, balancer.getBalancedNode(myJob, null)); - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().toString()); - * } - * - * return buf.toString(); - * } - * } - * </pre> - * <p> - * GridGain comes with the following load balancing SPI implementations out of the box: - * <ul> - * <li>{@link GridRoundRobinLoadBalancingSpi} - default</li> - * <li>{@link GridAdaptiveLoadBalancingSpi}</li> - * <li>{@link GridWeightedRandomLoadBalancingSpi}</li> - * </ul> - * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide - * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when - * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained - * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI - * methods. Note again that calling methods from this interface on the obtained instance can lead - * to undefined behavior and explicitly not supported. - */ -public interface GridLoadBalancingSpi extends IgniteSpi { - /** - * Gets balanced node for specified job within given task session. - * - * @param ses Grid task session for currently executing task. - * @param top Topology of task nodes from which to pick the best balanced node for given job. - * @param job Job for which to pick the best balanced node. - * @throws GridException If failed to get next balanced node. - * @return Best balanced node for the given job within given task session. - */ - public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) throws GridException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java new file mode 100644 index 0000000..91095fa --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java @@ -0,0 +1,114 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; + +import java.util.*; + +/** + * Load balancing SPI provides the next best balanced node for job + * execution. This SPI is used either implicitly or explicitly whenever + * a job gets mapped to a node during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} + * invocation. + * <h1 class="header">Coding Examples</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task could look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object,Object> { + * @Override + * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * <i>and</i> use, for example, affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task could look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String,String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. + * GridComputeJob myJob = new MyFooBarJob(arg); + * + * jobs.put(myJob, balancer.getBalancedNode(myJob, null)); + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. + * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().toString()); + * } + * + * return buf.toString(); + * } + * } + * </pre> + * <p> + * GridGain comes with the following load balancing SPI implementations out of the box: + * <ul> + * <li>{@link org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} - default</li> + * <li>{@link org.gridgain.grid.spi.loadbalancing.adaptive.AdaptiveLoadBalancingSpi}</li> + * <li>{@link org.gridgain.grid.spi.loadbalancing.weightedrandom.WeightedRandomLoadBalancingSpi}</li> + * </ul> + * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide + * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when + * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained + * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI + * methods. Note again that calling methods from this interface on the obtained instance can lead + * to undefined behavior and explicitly not supported. + */ +public interface LoadBalancingSpi extends IgniteSpi { + /** + * Gets balanced node for specified job within given task session. + * + * @param ses Grid task session for currently executing task. + * @param top Topology of task nodes from which to pick the best balanced node for given job. + * @param job Job for which to pick the best balanced node. + * @throws GridException If failed to get next balanced node. + * @return Best balanced node for the given job within given task session. + */ + public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) throws GridException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java new file mode 100644 index 0000000..c438e9c --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java @@ -0,0 +1,229 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on CPU load. + * <p> + * Based on {@link #setUseAverage(boolean)} + * parameter, this implementation will either use average CPU load + * values or current (default is to use averages). + * <p> + * Based on {@link #setUseProcessors(boolean)} parameter, this implementation + * will either take number of processors on the node into account or not. + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. This configuration parameter indicates + * whether to divide each node's CPU load by the number of processors on that node + * (default is {@code true}). + * <p> + * Also note that in some environments every processor may not be adding 100% of + * processing power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power. To account + * for that, you should set {@link #setProcessorCoefficient(double)} parameter to + * {@code 0.75} . + * <p> + * Below is an example of how CPU load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveCpuLoadProbe"> + * <property name="useAverage" value="true"/> + * <property name="useProcessors" value="true"/> + * <property name="processorCoefficient" value="0.9"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + * <p> + * This implementation is used by default by {@link AdaptiveLoadBalancingSpi} SPI. + */ +public class AdaptiveCpuLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average CPU load vs. current. */ + private boolean useAvg = true; + + /** + * Flag indicating whether to divide each node's CPU load + * by the number of processors on that node. + */ + private boolean useProcs = true; + + /** + * Coefficient of every CPU processor. By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + */ + private double procCoefficient = 1; + + /** + * Initializes CPU load probe to use CPU load average by default. + */ + public AdaptiveCpuLoadProbe() { + // No-op. + } + + /** + * Specifies whether to use average CPU load vs. current and whether or + * not to take number of processors into account. + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * + * @param useAvg Flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs) { + this.useAvg = useAvg; + this.useProcs = useProcs; + } + + /** + * Specifies whether to use average CPU load vs. current and whether or + * not to take number of processors into account. It also allows to + * specify the coefficient of addition power every CPU adds. + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * <p> + * Also, in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * + * @param useAvg Flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + * @param procCoefficient Coefficient of every CPU processor (default value is {@code 1}). + */ + public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs, double procCoefficient) { + this.useAvg = useAvg; + this.useProcs = useProcs; + this.procCoefficient = procCoefficient; + } + + /** + * Gets flag indicating whether to use average CPU load vs. current. + * + * @return Flag indicating whether to use average CPU load vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average CPU load vs. current. + * If not explicitly set, then default value is {@code true}. + * + * @param useAvg Flag indicating whether to use average CPU load vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * + * @return Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public boolean isUseProcessors() { + return useProcs; + } + + /** + * Sets flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * <p> + * If not explicitly set, then default value is {@code true}. + * + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public void setUseProcessors(boolean useProcs) { + this.useProcs = useProcs; + } + + /** + * Gets coefficient of every CPU processor. By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * <p> + * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. + * + * @return Coefficient of every CPU processor. + */ + public double getProcessorCoefficient() { + return procCoefficient; + } + + /** + * Sets coefficient of every CPU processor. By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * <p> + * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. + * + * @param procCoefficient Coefficient of every CPU processor. + */ + public void setProcessorCoefficient(double procCoefficient) { + A.ensure(procCoefficient > 0, "procCoefficient > 0"); + + this.procCoefficient = procCoefficient; + } + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + double k = 1.0d; + + if (useProcs) { + int procs = metrics.getTotalCpus(); + + if (procs > 1) + k = procs * procCoefficient; + } + + double load = (useAvg ? metrics.getAverageCpuLoad() : metrics.getCurrentCpuLoad()) / k; + + return load < 0 ? 0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveCpuLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java new file mode 100644 index 0000000..95ef91d --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java @@ -0,0 +1,96 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on active and waiting job count. + * Based on {@link #setUseAverage(boolean)} parameter, this implementation will + * either use average job count values or current (default is to use averages). + * <p> + * The load of a node is simply calculated by adding active and waiting job counts. + * <p> + * Below is an example of how CPU load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveJobCountLoadProbe"> + * <property name="useAverage" value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public class AdaptiveJobCountLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average CPU load vs. current. */ + private boolean useAvg = true; + + /** + * Initializes active job probe. + */ + public AdaptiveJobCountLoadProbe() { + // No-op. + } + + /** + * Creates new active job prove specifying whether to use average + * job counts vs. current. + * + * @param useAvg Flag indicating whether to use average job counts vs. current. + */ + public AdaptiveJobCountLoadProbe(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average job counts vs. current. + * + * @return Flag indicating whether to use average job counts vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average job counts vs. current. + * + * @param useAvg Flag indicating whether to use average job counts vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + if (useAvg) { + double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + + if (load > 0) + return load; + } + + double load = metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + + return load < 0 ? 0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveJobCountLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java new file mode 100644 index 0000000..069d269 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java @@ -0,0 +1,581 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.gridgain.grid.spi.loadbalancing.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Load balancing SPI that adapts to overall node performance. It + * proportionally distributes more jobs to more performant nodes based + * on a pluggable and dynamic node load probing. + * <p> + * <h1 class="header">Adaptive Node Probe</h1> + * This SPI comes with pluggable algorithm to calculate a node load + * at any given point of time. The algorithm is defined by + * {@link AdaptiveLoadProbe} interface and user is + * free to provide custom implementations. By default + * {@link AdaptiveCpuLoadProbe} implementation is used + * which distributes jobs to nodes based on average CPU load + * on every node. + * <p> + * The following load probes are available with the product: + * <ul> + * <li>{@link AdaptiveCpuLoadProbe} - default</li> + * <li>{@link AdaptiveProcessingTimeLoadProbe}</li> + * <li>{@link AdaptiveJobCountLoadProbe}</li> + * </ul> + * Note that if {@link AdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, + * then implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * <p> + * When working with node metrics, take into account that all averages are + * calculated over metrics history size defined by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()} + * and {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} grid configuration parameters. + * Generally the larger these configuration parameter values are, the more precise the metrics are. + * You should tune these values based on the level of accuracy needed vs. the additional memory + * that would be required for storing metrics. + * <p> + * You should also keep in mind that metrics for remote nodes are delayed (usually by the + * heartbeat frequency). So if it is acceptable in your environment, set the heartbeat frequency + * to be more inline with job execution time. Generally, the more often heartbeats between nodes + * are exchanged, the more precise the metrics are. However, you should keep in mind that if + * heartbeats are exchanged too often then it may create unnecessary traffic in the network. + * Heartbeats (or metrics update frequency) can be configured via underlying + * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid. + * <p> + * Here is an example of how probing can be implemented to use + * number of active and waiting jobs as probing mechanism: + * <pre name="code" class="java"> + * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { + * // Flag indicating whether to use average value or current. + * private int useAvg = true; + * + * public FooBarLoadProbe(boolean useAvg) { + * this.useAvg = useAvg; + * } + * + * // Calculate load based on number of active and waiting jobs. + * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { + * GridNodeMetrics metrics = node.getMetrics(); + * + * if (useAvg) { + * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + * + * if (load > 0) { + * return load; + * } + * } + * + * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + * } + * } + * </pre> + * <h1 class="header">Which Node Probe To Use</h1> + * There is no correct answer here. Every single node probe will work better or worse in + * different environments. CPU load probe (default option) is the safest approach to start + * with as it simply attempts to utilize every CPU on the grid to the maximum. However, you should + * experiment with other probes by executing load tests in your environment and observing + * which probe gives you best performance and load balancing. + * <p> + * <h1 class="header">Task Coding Example</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task will look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { + * @Override + * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * and use affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. + * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. + * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().string()); + * } + * + * return buf.string(); + * } + * } + * </pre> + * <p> + * <h1 class="header">Configuration</h1> + * In order to use this load balancer, you should configure your grid instance + * to use {@code GridJobsLoadBalancingSpi} either from Spring XML file or + * directly. The following configuration parameters are supported: + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * This SPI has the following optional configuration parameters: + * <ul> + * <li> + * Adaptive node load probing implementation (see {@link #setLoadProbe(AdaptiveLoadProbe)}). + * This configuration parameter supplies a custom algorithm for probing a node's load. + * By default, {@link AdaptiveCpuLoadProbe} implementation is used which + * takes every node's CPU load and tries to send proportionally more jobs to less loaded nodes. + * </li> + * </ul> + * <p> + * Below is Java configuration example: + * <pre name="code" class="java"> + * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi(); + * + * // Configure probe to use latest job execution time vs. average. + * GridAdaptiveProcessingTimeLoadProbe probe = new GridAdaptiveProcessingTimeLoadProbe(false); + * + * spi.setLoadProbe(probe); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default load balancing SPI. + * cfg.setLoadBalancingSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * Here is how you can configure {@code GridJobsLoadBalancingSpi} using Spring XML configuration: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> + * <constructor-arg value="false"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + */ +@IgniteSpiMultipleInstancesSupport(true) +public class AdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, + AdaptiveLoadBalancingSpiMBean { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private AdaptiveLoadProbe probe = new AdaptiveCpuLoadProbe(); + + /** Local event listener to listen to task completion events. */ + private GridLocalEventListener evtLsnr; + + /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ + private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = + new ConcurrentHashMap8<>(); + + /** */ + private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>(); + + /** */ + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** {@inheritDoc} */ + @Override public String getLoadProbeFormatted() { + return probe.toString(); + } + + /** + * Sets implementation of node load probe. By default {@link AdaptiveProcessingTimeLoadProbe} + * is used which proportionally distributes load based on the average job execution + * time on every node. + * + * @param probe Implementation of node load probe + */ + @IgniteSpiConfiguration(optional = true) + public void setLoadProbe(AdaptiveLoadProbe probe) { + A.ensure(probe != null, "probe != null"); + + this.probe = probe; + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + + assertParameter(probe != null, "loadProbe != null"); + + if (log.isDebugEnabled()) + log.debug(configInfo("loadProbe", probe)); + + registerMBean(gridName, this, AdaptiveLoadBalancingSpiMBean.class); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + rwLock.writeLock().lock(); + + try { + nodeJobs.clear(); + } + finally { + rwLock.writeLock().unlock(); + } + + unregisterMBean(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + switch (evt.type()) { + case EVT_TASK_FINISHED: + case EVT_TASK_FAILED: { + IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt; + + taskTops.remove(taskEvt.taskSessionId()); + + if (log.isDebugEnabled()) + log.debug("Removed task topology from topology cache for session: " + + taskEvt.taskSessionId()); + + break; + } + + case EVT_JOB_MAPPED: { + // We should keep topology and use cache in GridComputeTask#map() method to + // avoid O(n*n/2) complexity, after that we can drop caches. + // Here we set mapped property and later cache will be ignored + IgniteJobEvent jobEvt = (IgniteJobEvent)evt; + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(jobEvt.taskSessionId()); + + if (weightedTop != null) + weightedTop.set1(true); + + if (log.isDebugEnabled()) + log.debug("Job has been mapped. Ignore cache for session: " + jobEvt.taskSessionId()); + + break; + } + + case EVT_NODE_METRICS_UPDATED: + case EVT_NODE_FAILED: + case EVT_NODE_JOINED: + case EVT_NODE_LEFT: { + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + rwLock.writeLock().lock(); + + try { + switch (evt.type()) { + case EVT_NODE_JOINED: { + nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); + + break; + } + + case EVT_NODE_LEFT: + case EVT_NODE_FAILED: { + nodeJobs.remove(discoEvt.eventNode().id()); + + break; + } + + case EVT_NODE_METRICS_UPDATED: { + // Reset counter. + nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); + + break; + } + } + } + finally { + rwLock.writeLock().unlock(); + } + } + + } + } + }, + EVT_NODE_METRICS_UPDATED, + EVT_NODE_FAILED, + EVT_NODE_JOINED, + EVT_NODE_LEFT, + EVT_TASK_FINISHED, + EVT_TASK_FAILED, + EVT_JOB_MAPPED + ); + + // Put all known nodes. + rwLock.writeLock().lock(); + + try { + for (ClusterNode node : getSpiContext().nodes()) + nodeJobs.put(node.id(), new AtomicInteger(0)); + } + finally { + rwLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override protected void onContextDestroyed0() { + if (evtLsnr != null) { + IgniteSpiContext ctx = getSpiContext(); + + if (ctx != null) + ctx.removeLocalEventListener(evtLsnr); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) + throws GridException { + A.notNull(ses, "ses"); + A.notNull(top, "top"); + A.notNull(job, "job"); + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); + + // Create new cached topology if there is no one. Do not + // use cached topology after task has been mapped. + if (weightedTop == null) + // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. + taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); + // We have topology - check if task has been mapped. + else if (weightedTop.get1()) + // Do not use cache after GridComputeTask#map(). + return new WeightedTopology(top).pickWeightedNode(); + + return weightedTop.get2().pickWeightedNode(); + } + + /** + * Calculates node load based on set probe. + * + * @param top List of all nodes. + * @param node Node to get load for. + * @return Node load. + * @throws GridException If returned load is negative. + */ + @SuppressWarnings({"TooBroadScope"}) + private double getLoad(Collection<ClusterNode> top, ClusterNode node) throws GridException { + assert !F.isEmpty(top); + + int jobsSentSinceLastUpdate = 0; + + rwLock.readLock().lock(); + + try { + AtomicInteger cnt = nodeJobs.get(node.id()); + + jobsSentSinceLastUpdate = cnt == null ? 0 : cnt.get(); + } + finally { + rwLock.readLock().unlock(); + } + + double load = probe.getLoad(node, jobsSentSinceLastUpdate); + + if (load < 0) + throw new GridException("Failed to obtain non-negative load from adaptive load probe: " + load); + + return load; + } + + /** + * Holder for weighted topology. + */ + private class WeightedTopology { + /** Topology sorted by weight. */ + private final SortedMap<Double, ClusterNode> circle = new TreeMap<>(); + + /** + * @param top Task topology. + * @throws GridException If any load was negative. + */ + WeightedTopology(List<ClusterNode> top) throws GridException { + assert !F.isEmpty(top); + + double totalLoad = 0; + + // We need to cache loads here to avoid calls later as load might be + // changed between the calls. + double[] nums = new double[top.size()]; + + int zeroCnt = 0; + + // Compute loads. + for (int i = 0; i < top.size(); i++) { + double load = getLoad(top, top.get(i)); + + nums[i] = load; + + if (load == 0) + zeroCnt++; + + totalLoad += load; + } + + // Take care of zero loads. + if (zeroCnt > 0) { + double newTotal = totalLoad; + + int nonZeroCnt = top.size() - zeroCnt; + + for (int i = 0; i < nums.length; i++) { + double load = nums[i]; + + if (load == 0) { + if (nonZeroCnt > 0) + load = totalLoad / nonZeroCnt; + + if (load == 0) + load = 1; + + nums[i] = load; + + newTotal += load; + } + } + + totalLoad = newTotal; + } + + double totalWeight = 0; + + // Calculate weights and total weight. + for (int i = 0; i < nums.length; i++) { + assert nums[i] > 0 : "Invalid load: " + nums[i]; + + double weight = totalLoad / nums[i]; + + // Convert to weight. + nums[i] = weight; + + totalWeight += weight; + } + + double weight = 0; + + // Enforce range from 0 to 1. + for (int i = 0; i < nums.length; i++) { + weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / totalWeight; + + assert weight < 2 : "Invalid weight: " + weight; + + // Complexity of this put is O(logN). + circle.put(weight, top.get(i)); + } + } + + /** + * Gets weighted node in random fashion. + * + * @return Weighted node. + */ + ClusterNode pickWeightedNode() { + double weight = RAND.nextDouble(); + + SortedMap<Double, ClusterNode> pick = circle.tailMap(weight); + + ClusterNode node = pick.get(pick.firstKey()); + + rwLock.readLock().lock(); + + try { + AtomicInteger cnt = nodeJobs.get(node.id()); + + if (cnt != null) + cnt.incrementAndGet(); + } + finally { + rwLock.readLock().unlock(); + } + + return node; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveLoadBalancingSpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java new file mode 100644 index 0000000..5553635 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java @@ -0,0 +1,27 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management MBean for {@link AdaptiveLoadBalancingSpi} SPI. + */ +@IgniteMBeanDescription("MBean that provides access to adaptive load balancing SPI configuration.") +public interface AdaptiveLoadBalancingSpiMBean extends IgniteSpiManagementMBean { + /** + * Gets text description of current load probing implementation used. + * + * @return Text description of current load probing implementation used. + */ + @IgniteMBeanDescription("Text description of current load probing implementation used.") + public String getLoadProbeFormatted(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java new file mode 100644 index 0000000..512536f --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java @@ -0,0 +1,90 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; + +/** + * Pluggable implementation of node load probing. Implementations + * of this can be configured to be used with {@link AdaptiveLoadBalancingSpi} + * by setting {@link AdaptiveLoadBalancingSpi#setLoadProbe(AdaptiveLoadProbe)} + * configuration parameter. + * <p> + * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, + * then implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * <p> + * By default, {@link AdaptiveCpuLoadProbe} probing implementation is used. + * <p> + * <h1 class="header">Example</h1> + * Here is an example of how probing can be implemented to use + * number of active and waiting jobs as probing mechanism: + * <pre name="code" class="java"> + * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { + * // Flag indicating whether to use average value or current. + * private int useAvg = true; + * + * public FooBarLoadProbe(boolean useAvg) { + * this.useAvg = useAvg; + * } + * + * // Calculate load based on number of active and waiting jobs. + * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { + * GridNodeMetrics metrics = node.getMetrics(); + * + * if (useAvg) { + * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + * + * if (load > 0) { + * return load; + * } + * } + * + * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + * } + * } + * </pre> + * Below is an example of how a probe shown above would be configured with {@link AdaptiveLoadBalancingSpi} + * SPI: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="foo.bar.FooBarLoadProbe"> + * <constructor-arg value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public interface AdaptiveLoadProbe { + /** + * Calculates load value for a given node. Specific implementations would + * usually take into account some of the values provided by + * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated + * based on job execution time or number of active jobs, or CPU/Heap utilization. + * <p> + * Note that if this method returns a value of {@code 0}, + * then implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * + * @param node Grid node to calculate load for. + * @param jobsSentSinceLastUpdate Number of jobs sent to this node since + * last metrics update. This parameter may be useful when + * implementation takes into account the current job count on a node. + * @return Non-negative load value for the node (zero and above). + */ + public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java new file mode 100644 index 0000000..dc9e250 --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java @@ -0,0 +1,98 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on total job processing time. + * Based on {@link #setUseAverage(boolean)} + * parameter, this implementation will either use average job execution + * time values or current (default is to use averages). The algorithm + * returns a sum of job wait time and job execution time. + * <p> + * Below is an example of how CPU load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> + * <property name="useAverage" value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public class AdaptiveProcessingTimeLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average execution time vs. current. */ + private boolean useAvg = true; + + /** + * Initializes execution time load probe to use + * execution time average by default. + */ + public AdaptiveProcessingTimeLoadProbe() { + // No-op. + } + + /** + * Specifies whether to use average execution time vs. current. + * + * @param useAvg Flag indicating whether to use average execution time vs. current. + */ + public AdaptiveProcessingTimeLoadProbe(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average execution time vs. current. + * + * @return Flag indicating whether to use average execution time vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average execution time vs. current. + * + * @param useAvg Flag indicating whether to use average execution time vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + if (useAvg) { + double load = metrics.getAverageJobExecuteTime() + metrics.getAverageJobWaitTime(); + + // If load is greater than 0, then we can use average times. + // Otherwise, we will proceed to using current times. + if (load > 0) + return load; + } + + double load = metrics.getCurrentJobExecuteTime() + metrics.getCurrentJobWaitTime(); + + return load < 0 ? 0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveProcessingTimeLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveCpuLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveCpuLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveCpuLoadProbe.java deleted file mode 100644 index a9c929c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveCpuLoadProbe.java +++ /dev/null @@ -1,229 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on CPU load. - * <p> - * Based on {@link #setUseAverage(boolean)} - * parameter, this implementation will either use average CPU load - * values or current (default is to use averages). - * <p> - * Based on {@link #setUseProcessors(boolean)} parameter, this implementation - * will either take number of processors on the node into account or not. - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. This configuration parameter indicates - * whether to divide each node's CPU load by the number of processors on that node - * (default is {@code true}). - * <p> - * Also note that in some environments every processor may not be adding 100% of - * processing power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power. To account - * for that, you should set {@link #setProcessorCoefficient(double)} parameter to - * {@code 0.75} . - * <p> - * Below is an example of how CPU load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveCpuLoadProbe"> - * <property name="useAverage" value="true"/> - * <property name="useProcessors" value="true"/> - * <property name="processorCoefficient" value="0.9"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - * <p> - * This implementation is used by default by {@link GridAdaptiveLoadBalancingSpi} SPI. - */ -public class GridAdaptiveCpuLoadProbe implements GridAdaptiveLoadProbe { - /** Flag indicating whether to use average CPU load vs. current. */ - private boolean useAvg = true; - - /** - * Flag indicating whether to divide each node's CPU load - * by the number of processors on that node. - */ - private boolean useProcs = true; - - /** - * Coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - */ - private double procCoefficient = 1; - - /** - * Initializes CPU load probe to use CPU load average by default. - */ - public GridAdaptiveCpuLoadProbe() { - // No-op. - } - - /** - * Specifies whether to use average CPU load vs. current and whether or - * not to take number of processors into account. - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * - * @param useAvg Flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public GridAdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs) { - this.useAvg = useAvg; - this.useProcs = useProcs; - } - - /** - * Specifies whether to use average CPU load vs. current and whether or - * not to take number of processors into account. It also allows to - * specify the coefficient of addition power every CPU adds. - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * <p> - * Also, in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * - * @param useAvg Flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - * @param procCoefficient Coefficient of every CPU processor (default value is {@code 1}). - */ - public GridAdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs, double procCoefficient) { - this.useAvg = useAvg; - this.useProcs = useProcs; - this.procCoefficient = procCoefficient; - } - - /** - * Gets flag indicating whether to use average CPU load vs. current. - * - * @return Flag indicating whether to use average CPU load vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average CPU load vs. current. - * If not explicitly set, then default value is {@code true}. - * - * @param useAvg Flag indicating whether to use average CPU load vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * - * @return Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public boolean isUseProcessors() { - return useProcs; - } - - /** - * Sets flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * <p> - * If not explicitly set, then default value is {@code true}. - * - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public void setUseProcessors(boolean useProcs) { - this.useProcs = useProcs; - } - - /** - * Gets coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * <p> - * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. - * - * @return Coefficient of every CPU processor. - */ - public double getProcessorCoefficient() { - return procCoefficient; - } - - /** - * Sets coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * <p> - * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. - * - * @param procCoefficient Coefficient of every CPU processor. - */ - public void setProcessorCoefficient(double procCoefficient) { - A.ensure(procCoefficient > 0, "procCoefficient > 0"); - - this.procCoefficient = procCoefficient; - } - - /** {@inheritDoc} */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - double k = 1.0d; - - if (useProcs) { - int procs = metrics.getTotalCpus(); - - if (procs > 1) - k = procs * procCoefficient; - } - - double load = (useAvg ? metrics.getAverageCpuLoad() : metrics.getCurrentCpuLoad()) / k; - - return load < 0 ? 0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAdaptiveCpuLoadProbe.class, this); - } -}