# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b69a23cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b69a23cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b69a23cd Branch: refs/heads/master Commit: b69a23cd226056307fc288af69e7863dbfe19181 Parents: a62862f Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 5 17:09:19 2014 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 5 17:09:21 2014 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCompute.java | 2 +- .../ignite/compute/ComputeLoadBalancer.java | 2 +- .../apache/ignite/compute/ComputeTaskSpis.java | 2 +- .../configuration/IgniteConfiguration.java | 8 +- .../resources/IgniteLoadBalancerResource.java | 2 +- .../ignite/spi/failover/FailoverContext.java | 2 +- .../spi/loadbalancing/LoadBalancingSpi.java | 114 ++++ .../adaptive/AdaptiveCpuLoadProbe.java | 229 ++++++++ .../adaptive/AdaptiveJobCountLoadProbe.java | 96 +++ .../adaptive/AdaptiveLoadBalancingSpi.java | 581 +++++++++++++++++++ .../adaptive/AdaptiveLoadBalancingSpiMBean.java | 27 + .../adaptive/AdaptiveLoadProbe.java | 90 +++ .../AdaptiveProcessingTimeLoadProbe.java | 98 ++++ .../spi/loadbalancing/adaptive/package.html | 15 + .../ignite/spi/loadbalancing/package.html | 15 + .../RoundRobinGlobalLoadBalancer.java | 305 ++++++++++ .../roundrobin/RoundRobinLoadBalancingSpi.java | 319 ++++++++++ .../RoundRobinLoadBalancingSpiMBean.java | 37 ++ .../RoundRobinPerTaskLoadBalancer.java | 96 +++ .../spi/loadbalancing/roundrobin/package.html | 15 + .../WeightedRandomLoadBalancingSpi.java | 394 +++++++++++++ .../WeightedRandomLoadBalancingSpiMBean.java | 37 ++ .../loadbalancing/weightedrandom/package.html | 15 + .../affinity/GridCacheAffinityKeyMapped.java | 2 +- .../org/gridgain/grid/kernal/GridGainEx.java | 4 +- 
.../loadbalancer/GridLoadBalancerManager.java | 2 +- .../spi/loadbalancing/LoadBalancingSpi.java | 114 ---- .../adaptive/AdaptiveCpuLoadProbe.java | 229 -------- .../adaptive/AdaptiveJobCountLoadProbe.java | 96 --- .../adaptive/AdaptiveLoadBalancingSpi.java | 581 ------------------- .../adaptive/AdaptiveLoadBalancingSpiMBean.java | 27 - .../adaptive/AdaptiveLoadProbe.java | 90 --- .../AdaptiveProcessingTimeLoadProbe.java | 98 ---- .../spi/loadbalancing/adaptive/package.html | 15 - .../grid/spi/loadbalancing/package.html | 15 - .../RoundRobinGlobalLoadBalancer.java | 305 ---------- .../roundrobin/RoundRobinLoadBalancingSpi.java | 319 ---------- .../RoundRobinLoadBalancingSpiMBean.java | 37 -- .../RoundRobinPerTaskLoadBalancer.java | 96 --- .../spi/loadbalancing/roundrobin/package.html | 15 - .../WeightedRandomLoadBalancingSpi.java | 394 ------------- .../WeightedRandomLoadBalancingSpiMBean.java | 37 -- .../loadbalancing/weightedrandom/package.html | 15 - .../src/test/config/io-manager-benchmark.xml | 2 +- modules/core/src/test/config/jobs-load-base.xml | 2 +- .../src/test/config/load/merge-sort-base.xml | 2 +- .../config/spring-cache-put-remove-load.xml | 2 +- ...dAdaptiveLoadBalancingSpiConfigSelfTest.java | 26 + ...iveLoadBalancingSpiMultipleNodeSelfTest.java | 87 +++ .../GridAdaptiveLoadBalancingSpiSelfTest.java | 125 ++++ ...aptiveLoadBalancingSpiStartStopSelfTest.java | 23 + .../spi/loadbalancing/adaptive/package.html | 15 + .../ignite/spi/loadbalancing/package.html | 15 + ...alancingNotPerTaskMultithreadedSelfTest.java | 115 ++++ ...dRobinLoadBalancingSpiLocalNodeSelfTest.java | 44 ++ ...inLoadBalancingSpiMultipleNodesSelfTest.java | 126 ++++ ...RobinLoadBalancingSpiNotPerTaskSelfTest.java | 121 ++++ ...dRobinLoadBalancingSpiStartStopSelfTest.java | 23 + ...nLoadBalancingSpiTopologyChangeSelfTest.java | 98 ++++ .../roundrobin/GridRoundRobinTestUtils.java | 95 +++ .../spi/loadbalancing/roundrobin/package.html | 15 + 
...tedRandomLoadBalancingSpiConfigSelfTest.java | 26 + ...dWeightedRandomLoadBalancingSpiSelfTest.java | 58 ++ ...RandomLoadBalancingSpiStartStopSelfTest.java | 23 + ...dRandomLoadBalancingSpiWeightedSelfTest.java | 73 +++ .../loadbalancing/weightedrandom/package.html | 15 + .../grid/kernal/GridMultipleSpisSelfTest.java | 2 +- .../managers/GridManagerStopSelfTest.java | 2 +- ...dAdaptiveLoadBalancingSpiConfigSelfTest.java | 26 - ...iveLoadBalancingSpiMultipleNodeSelfTest.java | 87 --- .../GridAdaptiveLoadBalancingSpiSelfTest.java | 125 ---- ...aptiveLoadBalancingSpiStartStopSelfTest.java | 23 - .../spi/loadbalancing/adaptive/package.html | 15 - .../grid/spi/loadbalancing/package.html | 15 - ...alancingNotPerTaskMultithreadedSelfTest.java | 115 ---- ...dRobinLoadBalancingSpiLocalNodeSelfTest.java | 44 -- ...inLoadBalancingSpiMultipleNodesSelfTest.java | 126 ---- ...RobinLoadBalancingSpiNotPerTaskSelfTest.java | 121 ---- ...dRobinLoadBalancingSpiStartStopSelfTest.java | 23 - ...nLoadBalancingSpiTopologyChangeSelfTest.java | 98 ---- .../roundrobin/GridRoundRobinTestUtils.java | 95 --- .../spi/loadbalancing/roundrobin/package.html | 15 - ...tedRandomLoadBalancingSpiConfigSelfTest.java | 26 - ...dWeightedRandomLoadBalancingSpiSelfTest.java | 58 -- ...RandomLoadBalancingSpiStartStopSelfTest.java | 23 - ...dRandomLoadBalancingSpiWeightedSelfTest.java | 73 --- .../loadbalancing/weightedrandom/package.html | 15 - .../GridSpiLoadBalancingSelfTestSuite.java | 6 +- pom.xml | 4 +- 89 files changed, 3630 insertions(+), 3630 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java index 875b22a..39818ee 100644 
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java @@ -51,7 +51,7 @@ import java.util.concurrent.*; * {@link Serializable} and should be used to run computations on the grid. * <h1 class="header">Load Balancing</h1> * In all cases other than {@code broadcast(...)}, GridGain must select a node for a computation - * to be executed. The node will be selected based on the underlying {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}, + * to be executed. The node will be selected based on the underlying {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}, * which by default sequentially picks next available node from grid projection. Other load balancing * policies, such as {@code random} or {@code adaptive}, can be configured as well by selecting * a different load balancing SPI in grid configuration. If your logic requires some custom http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java index edba366..54a265c 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeLoadBalancer.java @@ -18,7 +18,7 @@ import java.util.*; /** * Load balancer is used for finding the best balanced node according * to load balancing policy. Internally load balancer will - * query the {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} + * query the {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} * to get the balanced node. 
* <p> * Load balancer can be used <i>explicitly</i> from inside {@link ComputeTask#map(List, Object)} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java index 6c1d231..33dd020 100644 --- a/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java +++ b/modules/core/src/main/java/org/apache/ignite/compute/ComputeTaskSpis.java @@ -14,7 +14,7 @@ import java.lang.annotation.*; /** * This annotation allows task to specify what SPIs it wants to use. * Starting with {@code GridGain 2.1} you can start multiple instances - * of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}, + * of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}, * {@link org.apache.ignite.spi.failover.FailoverSpi}, and {@link org.apache.ignite.spi.checkpoint.CheckpointSpi}. If you do that, * you need to tell a task which SPI to use (by default it will use the fist * SPI in the list). 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index d3386d2..f8b2547 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -35,7 +35,7 @@ import org.apache.ignite.spi.deployment.*; import org.apache.ignite.spi.discovery.*; import org.apache.ignite.spi.eventstorage.*; import org.apache.ignite.spi.failover.*; -import org.gridgain.grid.spi.loadbalancing.*; +import org.apache.ignite.spi.loadbalancing.*; import org.gridgain.grid.spi.securesession.*; import org.gridgain.grid.spi.securesession.noop.*; import org.gridgain.grid.spi.swapspace.*; @@ -2073,7 +2073,7 @@ public class IgniteConfiguration { /** * Should return fully configured load balancing SPI implementation. If not provided, - * {@link org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} will be used. + * {@link org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} will be used. * * @return Grid load balancing SPI implementation or {@code null} to use default implementation. */ @@ -2114,9 +2114,9 @@ public class IgniteConfiguration { } /** - * Sets fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi}. + * Sets fully configured instance of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi}. 
* - * @param loadBalancingSpi Fully configured instance of {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} or + * @param loadBalancingSpi Fully configured instance of {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} or * {@code null} if no SPI provided. * @see IgniteConfiguration#getLoadBalancingSpi() */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java index 6479e94..2edea7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java +++ b/modules/core/src/main/java/org/apache/ignite/resources/IgniteLoadBalancerResource.java @@ -14,7 +14,7 @@ import java.lang.annotation.*; /** * Annotates a field or a setter method for injection of {@link org.apache.ignite.compute.ComputeLoadBalancer}. * Specific implementation for grid load balancer is defined by - * {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} + * {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} * which is provided to grid via {@link org.apache.ignite.configuration.IgniteConfiguration}.. 
* <p> * Load balancer can be injected into instances of following classes: http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java index 03603cb..64c6af7 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/failover/FailoverContext.java @@ -36,7 +36,7 @@ public interface FailoverContext { /** * Gets the next balanced node for failed job. Internally this method will - * delegate to load balancing SPI (see {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} to + * delegate to load balancing SPI (see {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} to * determine the optimal node for execution. * * @param top Topology to pick balanced node from. 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java new file mode 100644 index 0000000..e3713c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/LoadBalancingSpi.java @@ -0,0 +1,114 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; + +import java.util.*; + +/** + * Load balancing SPI provides the next best balanced node for job + * execution. This SPI is used either implicitly or explicitly whenever + * a job gets mapped to a node during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} + * invocation. + * <h1 class="header">Coding Examples</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task could look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object,Object> { + * @Override + * protected Collection<? 
extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * <i>and</i> use, for example, affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task could look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String,String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. + * GridComputeJob myJob = new MyFooBarJob(arg); + * + * jobs.put(myJob, balancer.getBalancedNode(myJob, null)); + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. 
+ * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().toString()); + * } + * + * return buf.toString(); + * } + * } + * </pre> + * <p> + * GridGain comes with the following load balancing SPI implementations out of the box: + * <ul> + * <li>{@link org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} - default</li> + * <li>{@link org.apache.ignite.spi.loadbalancing.adaptive.AdaptiveLoadBalancingSpi}</li> + * <li>{@link org.apache.ignite.spi.loadbalancing.weightedrandom.WeightedRandomLoadBalancingSpi}</li> + * </ul> + * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide + * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when + * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained + * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI + * methods. Note again that calling methods from this interface on the obtained instance can lead + * to undefined behavior and explicitly not supported. + */ +public interface LoadBalancingSpi extends IgniteSpi { + /** + * Gets balanced node for specified job within given task session. + * + * @param ses Grid task session for currently executing task. + * @param top Topology of task nodes from which to pick the best balanced node for given job. + * @param job Job for which to pick the best balanced node. + * @throws GridException If failed to get next balanced node. + * @return Best balanced node for the given job within given task session. 
+ */ + public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) throws GridException; +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java new file mode 100644 index 0000000..9db2d84 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java @@ -0,0 +1,229 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on CPU load. + * <p> + * Based on {@link #setUseAverage(boolean)} + * parameter, this implementation will either use average CPU load + * values or current (default is to use averages). + * <p> + * Based on {@link #setUseProcessors(boolean)} parameter, this implementation + * will either take number of processors on the node into account or not. + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. This configuration parameter indicates + * whether to divide each node's CPU load by the number of processors on that node + * (default is {@code true}). 
+ * <p> + * Also note that in some environments every processor may not be adding 100% of + * processing power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power. To account + * for that, you should set {@link #setProcessorCoefficient(double)} parameter to + * {@code 0.75} . + * <p> + * Below is an example of how CPU load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveCpuLoadProbe"> + * <property name="useAverage" value="true"/> + * <property name="useProcessors" value="true"/> + * <property name="processorCoefficient" value="0.9"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + * <p> + * This implementation is used by default by {@link AdaptiveLoadBalancingSpi} SPI. + */ +public class AdaptiveCpuLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average CPU load vs. current. */ + private boolean useAvg = true; + + /** + * Flag indicating whether to divide each node's CPU load + * by the number of processors on that node. + */ + private boolean useProcs = true; + + /** + * Coefficient of every CPU processor. By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + */ + private double procCoefficient = 1; + + /** + * Initializes CPU load probe to use CPU load average by default. + */ + public AdaptiveCpuLoadProbe() { + // No-op. 
+ } + + /** + * Specifies whether to use average CPU load vs. current and whether or + * not to take number of processors into account. + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * + * @param useAvg Flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs) { + this.useAvg = useAvg; + this.useProcs = useProcs; + } + + /** + * Specifies whether to use average CPU load vs. current and whether or + * not to take number of processors into account. It also allows to + * specify the coefficient of addition power every CPU adds. + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * <p> + * Also, in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * + * @param useAvg Flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + * @param procCoefficient Coefficient of every CPU processor (default value is {@code 1}). 
+ */ + public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs, double procCoefficient) { + this.useAvg = useAvg; + this.useProcs = useProcs; + this.procCoefficient = procCoefficient; + } + + /** + * Gets flag indicating whether to use average CPU load vs. current. + * + * @return Flag indicating whether to use average CPU load vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average CPU load vs. current. + * If not explicitly set, then default value is {@code true}. + * + * @param useAvg Flag indicating whether to use average CPU load vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * + * @return Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public boolean isUseProcessors() { + return useProcs; + } + + /** + * Sets flag indicating whether to use average CPU load vs. current + * (default is {@code true}). + * <p> + * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it + * usually means that the remaining capacity is proportional to the number of + * CPU's (or cores) on the node. + * <p> + * If not explicitly set, then default value is {@code true}. + * + * @param useProcs Flag indicating whether to divide each node's CPU load + * by the number of processors on that node (default is {@code true}). + */ + public void setUseProcessors(boolean useProcs) { + this.useProcs = useProcs; + } + + /** + * Gets coefficient of every CPU processor. 
By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * <p> + * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. + * + * @return Coefficient of every CPU processor. + */ + public double getProcessorCoefficient() { + return procCoefficient; + } + + /** + * Sets coefficient of every CPU processor. By default it is {@code 1}, but + * in some environments every processor may not be adding 100% of processing + * power. For example, if you are using multi-core CPU's, then addition of + * every core would probably result in about 75% of extra CPU power, and hence + * you would set this coefficient to {@code 0.75} . + * <p> + * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. + * + * @param procCoefficient Coefficient of every CPU processor. + */ + public void setProcessorCoefficient(double procCoefficient) { + A.ensure(procCoefficient > 0, "procCoefficient > 0"); + + this.procCoefficient = procCoefficient; + } + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + double k = 1.0d; + + if (useProcs) { + int procs = metrics.getTotalCpus(); + + if (procs > 1) + k = procs * procCoefficient; + } + + double load = (useAvg ? metrics.getAverageCpuLoad() : metrics.getCurrentCpuLoad()) / k; + + return load < 0 ? 
0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveCpuLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java new file mode 100644 index 0000000..d5c99b7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java @@ -0,0 +1,96 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on active and waiting job count. + * Based on {@link #setUseAverage(boolean)} parameter, this implementation will + * either use average job count values or current (default is to use averages). + * <p> + * The load of a node is simply calculated by adding active and waiting job counts. 
+ * <p> + * Below is an example of how CPU load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveJobCountLoadProbe"> + * <property name="useAverage" value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public class AdaptiveJobCountLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average CPU load vs. current. */ + private boolean useAvg = true; + + /** + * Initializes active job probe. + */ + public AdaptiveJobCountLoadProbe() { + // No-op. + } + + /** + * Creates new active job prove specifying whether to use average + * job counts vs. current. + * + * @param useAvg Flag indicating whether to use average job counts vs. current. + */ + public AdaptiveJobCountLoadProbe(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average job counts vs. current. + * + * @return Flag indicating whether to use average job counts vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average job counts vs. current. + * + * @param useAvg Flag indicating whether to use average job counts vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + if (useAvg) { + double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + + if (load > 0) + return load; + } + + double load = metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + + return load < 0 ? 
0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveJobCountLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java new file mode 100644 index 0000000..5bb4501 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java @@ -0,0 +1,581 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.apache.ignite.spi.loadbalancing.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Load balancing SPI that adapts to overall node performance. 
It + * proportionally distributes more jobs to more performant nodes based + * on a pluggable and dynamic node load probing. + * <p> + * <h1 class="header">Adaptive Node Probe</h1> + * This SPI comes with pluggable algorithm to calculate a node load + * at any given point of time. The algorithm is defined by + * {@link AdaptiveLoadProbe} interface and user is + * free to provide custom implementations. By default + * {@link AdaptiveCpuLoadProbe} implementation is used + * which distributes jobs to nodes based on average CPU load + * on every node. + * <p> + * The following load probes are available with the product: + * <ul> + * <li>{@link AdaptiveCpuLoadProbe} - default</li> + * <li>{@link AdaptiveProcessingTimeLoadProbe}</li> + * <li>{@link AdaptiveJobCountLoadProbe}</li> + * </ul> + * Note that if {@link AdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, + * then implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * <p> + * When working with node metrics, take into account that all averages are + * calculated over metrics history size defined by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()} + * and {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} grid configuration parameters. + * Generally the larger these configuration parameter values are, the more precise the metrics are. + * You should tune these values based on the level of accuracy needed vs. the additional memory + * that would be required for storing metrics. + * <p> + * You should also keep in mind that metrics for remote nodes are delayed (usually by the + * heartbeat frequency). 
So if it is acceptable in your environment, set the heartbeat frequency + * to be more in line with job execution time. Generally, the more often heartbeats between nodes + * are exchanged, the more precise the metrics are. However, you should keep in mind that if + * heartbeats are exchanged too often then it may create unnecessary traffic in the network. + * Heartbeats (or metrics update frequency) can be configured via underlying + * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid. + * <p> + * Here is an example of how probing can be implemented to use + * number of active and waiting jobs as probing mechanism: + * <pre name="code" class="java"> + * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { + * // Flag indicating whether to use average value or current. + * private boolean useAvg = true; + * + * public FooBarLoadProbe(boolean useAvg) { + * this.useAvg = useAvg; + * } + * + * // Calculate load based on number of active and waiting jobs. + * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { + * GridNodeMetrics metrics = node.getMetrics(); + * + * if (useAvg) { + * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + * + * if (load > 0) { + * return load; + * } + * } + * + * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + * } + * } + * </pre> + * <h1 class="header">Which Node Probe To Use</h1> + * There is no correct answer here. Every single node probe will work better or worse in + * different environments. CPU load probe (default option) is the safest approach to start + * with as it simply attempts to utilize every CPU on the grid to the maximum. However, you should + * experiment with other probes by executing load tests in your environment and observing + * which probe gives you the best performance and load balancing. 
+ * <p> + * <h1 class="header">Task Coding Example</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task will look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { + * @Override + * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * and use affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. 
+ * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()); + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. + * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().toString()); + * } + * + * return buf.toString(); + * } + * } + * </pre> + * <p> + * <h1 class="header">Configuration</h1> + * In order to use this load balancer, you should configure your grid instance + * to use {@code GridJobsLoadBalancingSpi} either from Spring XML file or + * directly. The following configuration parameters are supported: + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * This SPI has the following optional configuration parameters: + * <ul> + * <li> + * Adaptive node load probing implementation (see {@link #setLoadProbe(AdaptiveLoadProbe)}). + * This configuration parameter supplies a custom algorithm for probing a node's load. + * By default, {@link AdaptiveCpuLoadProbe} implementation is used which + * takes every node's CPU load and tries to send proportionally more jobs to less loaded nodes. + * </li> + * </ul> + * <p> + * Below is a Java configuration example: + * <pre name="code" class="java"> + * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi(); + * + * // Configure probe to use latest job execution time vs. average. + * GridAdaptiveProcessingTimeLoadProbe probe = new GridAdaptiveProcessingTimeLoadProbe(false); + * + * spi.setLoadProbe(probe); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default load balancing SPI. 
+ * cfg.setLoadBalancingSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * Here is how you can configure {@code GridJobsLoadBalancingSpi} using Spring XML configuration: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> + * <constructor-arg value="false"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + */ +@IgniteSpiMultipleInstancesSupport(true) +public class AdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, + AdaptiveLoadBalancingSpiMBean { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private AdaptiveLoadProbe probe = new AdaptiveCpuLoadProbe(); + + /** Local event listener to listen to task completion events. */ + private GridLocalEventListener evtLsnr; + + /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ + private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = + new ConcurrentHashMap8<>(); + + /** */ + private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>(); + + /** */ + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** {@inheritDoc} */ + @Override public String getLoadProbeFormatted() { + return probe.toString(); + } + + /** + * Sets implementation of node load probe. 
By default {@link AdaptiveCpuLoadProbe} + * is used which proportionally distributes load based on the average CPU load + * on every node. + * + * @param probe Implementation of node load probe + */ + @IgniteSpiConfiguration(optional = true) + public void setLoadProbe(AdaptiveLoadProbe probe) { + A.ensure(probe != null, "probe != null"); + + this.probe = probe; + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + + assertParameter(probe != null, "loadProbe != null"); + + if (log.isDebugEnabled()) + log.debug(configInfo("loadProbe", probe)); + + registerMBean(gridName, this, AdaptiveLoadBalancingSpiMBean.class); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + rwLock.writeLock().lock(); + + try { + nodeJobs.clear(); + } + finally { + rwLock.writeLock().unlock(); + } + + unregisterMBean(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + switch (evt.type()) { + case EVT_TASK_FINISHED: + case EVT_TASK_FAILED: { + IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt; + + taskTops.remove(taskEvt.taskSessionId()); + + if (log.isDebugEnabled()) + log.debug("Removed task topology from topology cache for session: " + + taskEvt.taskSessionId()); + + break; + } + + case EVT_JOB_MAPPED: { + // We should keep topology and use cache in GridComputeTask#map() method to + // avoid O(n*n/2) complexity, after that we can drop caches. 
+ // Here we set mapped property and later cache will be ignored + IgniteJobEvent jobEvt = (IgniteJobEvent)evt; + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(jobEvt.taskSessionId()); + + if (weightedTop != null) + weightedTop.set1(true); + + if (log.isDebugEnabled()) + log.debug("Job has been mapped. Ignore cache for session: " + jobEvt.taskSessionId()); + + break; + } + + case EVT_NODE_METRICS_UPDATED: + case EVT_NODE_FAILED: + case EVT_NODE_JOINED: + case EVT_NODE_LEFT: { + IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; + + rwLock.writeLock().lock(); + + try { + switch (evt.type()) { + case EVT_NODE_JOINED: { + nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); + + break; + } + + case EVT_NODE_LEFT: + case EVT_NODE_FAILED: { + nodeJobs.remove(discoEvt.eventNode().id()); + + break; + } + + case EVT_NODE_METRICS_UPDATED: { + // Reset counter. + nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); + + break; + } + } + } + finally { + rwLock.writeLock().unlock(); + } + } + + } + } + }, + EVT_NODE_METRICS_UPDATED, + EVT_NODE_FAILED, + EVT_NODE_JOINED, + EVT_NODE_LEFT, + EVT_TASK_FINISHED, + EVT_TASK_FAILED, + EVT_JOB_MAPPED + ); + + // Put all known nodes. 
+ rwLock.writeLock().lock(); + + try { + for (ClusterNode node : getSpiContext().nodes()) + nodeJobs.put(node.id(), new AtomicInteger(0)); + } + finally { + rwLock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override protected void onContextDestroyed0() { + if (evtLsnr != null) { + IgniteSpiContext ctx = getSpiContext(); + + if (ctx != null) + ctx.removeLocalEventListener(evtLsnr); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) + throws GridException { + A.notNull(ses, "ses"); + A.notNull(top, "top"); + A.notNull(job, "job"); + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); + + // Create new cached topology if there is no one. Do not + // use cached topology after task has been mapped. + if (weightedTop == null) + // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. + taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); + // We have topology - check if task has been mapped. + else if (weightedTop.get1()) + // Do not use cache after GridComputeTask#map(). + return new WeightedTopology(top).pickWeightedNode(); + + return weightedTop.get2().pickWeightedNode(); + } + + /** + * Calculates node load based on set probe. + * + * @param top List of all nodes. + * @param node Node to get load for. + * @return Node load. + * @throws GridException If returned load is negative. + */ + @SuppressWarnings({"TooBroadScope"}) + private double getLoad(Collection<ClusterNode> top, ClusterNode node) throws GridException { + assert !F.isEmpty(top); + + int jobsSentSinceLastUpdate = 0; + + rwLock.readLock().lock(); + + try { + AtomicInteger cnt = nodeJobs.get(node.id()); + + jobsSentSinceLastUpdate = cnt == null ? 
0 : cnt.get(); + } + finally { + rwLock.readLock().unlock(); + } + + double load = probe.getLoad(node, jobsSentSinceLastUpdate); + + if (load < 0) + throw new GridException("Failed to obtain non-negative load from adaptive load probe: " + load); + + return load; + } + + /** + * Holder for weighted topology. + */ + private class WeightedTopology { + /** Topology sorted by weight. */ + private final SortedMap<Double, ClusterNode> circle = new TreeMap<>(); + + /** + * @param top Task topology. + * @throws GridException If any load was negative. + */ + WeightedTopology(List<ClusterNode> top) throws GridException { + assert !F.isEmpty(top); + + double totalLoad = 0; + + // We need to cache loads here to avoid calls later as load might be + // changed between the calls. + double[] nums = new double[top.size()]; + + int zeroCnt = 0; + + // Compute loads. + for (int i = 0; i < top.size(); i++) { + double load = getLoad(top, top.get(i)); + + nums[i] = load; + + if (load == 0) + zeroCnt++; + + totalLoad += load; + } + + // Take care of zero loads. + if (zeroCnt > 0) { + double newTotal = totalLoad; + + int nonZeroCnt = top.size() - zeroCnt; + + for (int i = 0; i < nums.length; i++) { + double load = nums[i]; + + if (load == 0) { + if (nonZeroCnt > 0) + load = totalLoad / nonZeroCnt; + + if (load == 0) + load = 1; + + nums[i] = load; + + newTotal += load; + } + } + + totalLoad = newTotal; + } + + double totalWeight = 0; + + // Calculate weights and total weight. + for (int i = 0; i < nums.length; i++) { + assert nums[i] > 0 : "Invalid load: " + nums[i]; + + double weight = totalLoad / nums[i]; + + // Convert to weight. + nums[i] = weight; + + totalWeight += weight; + } + + double weight = 0; + + // Enforce range from 0 to 1. + for (int i = 0; i < nums.length; i++) { + weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / totalWeight; + + assert weight < 2 : "Invalid weight: " + weight; + + // Complexity of this put is O(logN). 
+ circle.put(weight, top.get(i)); + } + } + + /** + * Gets weighted node in random fashion. + * + * @return Weighted node. + */ + ClusterNode pickWeightedNode() { + double weight = RAND.nextDouble(); + + SortedMap<Double, ClusterNode> pick = circle.tailMap(weight); + + ClusterNode node = pick.get(pick.firstKey()); + + rwLock.readLock().lock(); + + try { + AtomicInteger cnt = nodeJobs.get(node.id()); + + if (cnt != null) + cnt.incrementAndGet(); + } + finally { + rwLock.readLock().unlock(); + } + + return node; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveLoadBalancingSpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java new file mode 100644 index 0000000..14b4ed8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java @@ -0,0 +1,27 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management MBean for {@link AdaptiveLoadBalancingSpi} SPI. 
+ */ +@IgniteMBeanDescription("MBean that provides access to adaptive load balancing SPI configuration.") +public interface AdaptiveLoadBalancingSpiMBean extends IgniteSpiManagementMBean { + /** + * Gets text description of current load probing implementation used. + * + * @return Text description of current load probing implementation used. + */ + @IgniteMBeanDescription("Text description of current load probing implementation used.") + public String getLoadProbeFormatted(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java new file mode 100644 index 0000000..8027281 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java @@ -0,0 +1,90 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; + +/** + * Pluggable implementation of node load probing. Implementations + * of this can be configured to be used with {@link AdaptiveLoadBalancingSpi} + * by setting {@link AdaptiveLoadBalancingSpi#setLoadProbe(AdaptiveLoadProbe)} + * configuration parameter. 
+ * <p> + * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, + * then the implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * <p> + * By default, {@link AdaptiveCpuLoadProbe} probing implementation is used. + * <p> + * <h1 class="header">Example</h1> + * Here is an example of how probing can be implemented to use + * number of active and waiting jobs as probing mechanism: + * <pre name="code" class="java"> + * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { + * // Flag indicating whether to use average value or current. + * private boolean useAvg = true; + * + * public FooBarLoadProbe(boolean useAvg) { + * this.useAvg = useAvg; + * } + * + * // Calculate load based on number of active and waiting jobs. + * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { + * GridNodeMetrics metrics = node.getMetrics(); + * + * if (useAvg) { + * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); + * + * if (load > 0) { + * return load; + * } + * } + * + * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); + * } + * } + * </pre> + * Below is an example of how a probe shown above would be configured with {@link AdaptiveLoadBalancingSpi} + * SPI: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="foo.bar.FooBarLoadProbe"> + * <constructor-arg value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public interface AdaptiveLoadProbe { + /** + * Calculates load value for a given node. 
Specific implementations would + * usually take into account some of the values provided by + * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated + * based on job execution time or number of active jobs, or CPU/Heap utilization. + * <p> + * Note that if this method returns a value of {@code 0}, + * then implementation will assume that load value is simply not available and + * will try to calculate an average of load values for other nodes. If such + * average cannot be obtained (all node load values are {@code 0}), then a value + * of {@code 1} will be used. + * + * @param node Grid node to calculate load for. + * @param jobsSentSinceLastUpdate Number of jobs sent to this node since + * last metrics update. This parameter may be useful when + * implementation takes into account the current job count on a node. + * @return Non-negative load value for the node (zero and above). + */ + public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java new file mode 100644 index 0000000..30474ad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java @@ -0,0 +1,98 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ 
+ +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.gridgain.grid.util.typedef.internal.*; + +/** + * Implementation of node load probing based on total job processing time. + * Based on {@link #setUseAverage(boolean)} + * parameter, this implementation will either use average job execution + * time values or current (default is to use averages). The algorithm + * returns a sum of job wait time and job execution time. + * <p> + * Below is an example of how processing time load probe would be configured in GridGain + * Spring configuration file: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> + * <property name="loadProbe"> + * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> + * <property name="useAverage" value="true"/> + * </bean> + * </property> + * </bean> + * </property> + * </pre> + */ +public class AdaptiveProcessingTimeLoadProbe implements AdaptiveLoadProbe { + /** Flag indicating whether to use average execution time vs. current. */ + private boolean useAvg = true; + + /** + * Initializes execution time load probe to use + * execution time average by default. + */ + public AdaptiveProcessingTimeLoadProbe() { + // No-op. + } + + /** + * Specifies whether to use average execution time vs. current. + * + * @param useAvg Flag indicating whether to use average execution time vs. current. + */ + public AdaptiveProcessingTimeLoadProbe(boolean useAvg) { + this.useAvg = useAvg; + } + + /** + * Gets flag indicating whether to use average execution time vs. current. + * + * @return Flag indicating whether to use average execution time vs. current. + */ + public boolean isUseAverage() { + return useAvg; + } + + /** + * Sets flag indicating whether to use average execution time vs. current. 
+ * + * @param useAvg Flag indicating whether to use average execution time vs. current. + */ + public void setUseAverage(boolean useAvg) { + this.useAvg = useAvg; + } + + + /** {@inheritDoc} */ + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + ClusterNodeMetrics metrics = node.metrics(); + + if (useAvg) { + double load = metrics.getAverageJobExecuteTime() + metrics.getAverageJobWaitTime(); + + // If load is greater than 0, then we can use average times. + // Otherwise, we will proceed to using current times. + if (load > 0) + return load; + } + + double load = metrics.getCurrentJobExecuteTime() + metrics.getCurrentJobWaitTime(); + + return load < 0 ? 0 : load; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AdaptiveProcessingTimeLoadProbe.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html new file mode 100644 index 0000000..ee3a5eb --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains adaptive load balancing SPI. 
+</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html new file mode 100644 index 0000000..fd879b9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains APIs for load balancing SPI. 
/*
 * Patch context preserved from the commit e-mail this source was carried in.
 *
 * End of the previous file in the patch:
 *   +</body>
 *   +</html>
 *
 * http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
 * ----------------------------------------------------------------------
 * diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
 * new file mode 100644
 * index 0000000..e17231a
 * --- /dev/null
 * +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java
 * @@ -0,0 +1,305 @@
 */

/* @java.file.header */

/*  _________        _____ __________________        _____
 *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
 *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
 *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
 *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
 */

package org.apache.ignite.spi.loadbalancing.roundrobin;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.spi.*;
import org.gridgain.grid.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import static org.apache.ignite.events.IgniteEventType.*;

/**
 * Load balancer that works in global (not-per-task) mode.
 * <p>
 * The balancer keeps a snapshot of node ids together with a cyclic cursor. Topology updates are
 * serialized on {@link #mux} and published by replacing the volatile {@link #nodeList} reference;
 * every new snapshot starts with its cursor at {@code 0}. {@link #getBalancedNode(Collection)}
 * reads one snapshot per attempt and advances that snapshot's cursor with a compare-and-set loop.
 */
class RoundRobinGlobalLoadBalancer {
    /** SPI context. */
    private IgniteSpiContext ctx;

    /** Listener for node's events. */
    private GridLocalEventListener lsnr;

    /** Logger. */
    private final IgniteLogger log;

    /**
     * Current snapshot of nodes which participated in load balancing. Starts out empty (never
     * {@code null}) so that a discovery event delivered before the initial snapshot is built, or an
     * early call to {@link #getNodeIds()}, does not fail with {@link NullPointerException}.
     */
    private volatile GridNodeList nodeList = new GridNodeList(0, Collections.<UUID>emptyList());

    /** Mutex for updating current topology. */
    private final Object mux = new Object();

    /** Barrier for separating initialization callback and load balancing routine. */
    private final CountDownLatch initLatch = new CountDownLatch(1);

    /**
     * @param log Grid logger.
     */
    RoundRobinGlobalLoadBalancer(IgniteLogger log) {
        assert log != null;

        this.log = log;
    }

    /**
     * Registers the discovery listener, builds the initial node snapshot from the nodes reported by
     * the context and releases threads waiting in {@link #getBalancedNode(Collection)}.
     *
     * @param ctx Load balancing context.
     */
    void onContextInitialized(final IgniteSpiContext ctx) {
        this.ctx = ctx;

        ctx.addLocalEventListener(
            lsnr = new GridLocalEventListener() {
                @Override public void onEvent(IgniteEvent evt) {
                    assert evt instanceof IgniteDiscoveryEvent;

                    UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();

                    synchronized (mux) {
                        if (evt.type() == EVT_NODE_JOINED)
                            addNode(nodeId);
                        else {
                            assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;

                            removeNode(nodeId);
                        }
                    }
                }
            },
            EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT
        );

        synchronized (mux) {
            // The listener is already registered, so ids it has recorded meanwhile are merged
            // with the nodes currently reported by the context; the set removes duplicates.
            Collection<UUID> ids = new HashSet<>(nodeList.getNodes());

            for (ClusterNode node : ctx.nodes())
                ids.add(node.id());

            nodeList = new GridNodeList(0, new ArrayList<>(ids));
        }

        initLatch.countDown();
    }

    /**
     * Publishes a new snapshot with the given node id placed first, unless the id is already known.
     * Must be called while holding {@link #mux}.
     *
     * @param nodeId Id of the joined node.
     */
    private void addNode(UUID nodeId) {
        List<UUID> oldNodes = nodeList.getNodes();

        if (oldNodes.contains(nodeId))
            return;

        List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1);

        newNodes.add(nodeId);
        newNodes.addAll(oldNodes);

        nodeList = new GridNodeList(0, newNodes);
    }

    /**
     * Publishes a new snapshot without the given node id, if the id is currently known.
     * Must be called while holding {@link #mux}.
     *
     * @param nodeId Id of the node that left or failed.
     */
    private void removeNode(UUID nodeId) {
        List<UUID> oldNodes = nodeList.getNodes();

        if (!oldNodes.contains(nodeId))
            return;

        List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1);

        for (UUID node : oldNodes)
            if (!nodeId.equals(node))
                newNodes.add(node);

        nodeList = new GridNodeList(0, newNodes);
    }

    /**
     * Unregisters the discovery listener if the context was ever initialized.
     */
    void onContextDestroyed() {
        if (ctx != null)
            ctx.removeLocalEventListener(lsnr);
    }

    /**
     * Gets balanced node for given topology.
     * <p>
     * Blocks until {@link #onContextInitialized(IgniteSpiContext)} has completed, then walks the
     * node cycle until it hits a node that is present in {@code top}.
     *
     * @param top Topology to pick from.
     * @return Best balanced node.
     * @throws GridException If the balancer has no nodes, if none of its nodes is present in
     *      {@code top}, or if the calling thread is interrupted while waiting for initialization.
     */
    ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException {
        assert !F.isEmpty(top);

        awaitInitializationCompleted();

        Map<UUID, ClusterNode> topMap = null;

        ClusterNode found;

        int misses = 0;

        do {
            // Work on one snapshot per attempt; a concurrent topology change is picked up on the next attempt.
            GridNodeList nodeList = this.nodeList;

            List<UUID> nodes = nodeList.getNodes();

            int cycleSize = nodes.size();

            if (cycleSize == 0)
                throw new GridException("Task topology does not have any alive nodes.");

            AtomicInteger idx = nodeList.getCurrentIdx();

            int curIdx, nextIdx;

            do {
                curIdx = idx.get();

                // Derive the next position from the value that is handed to compareAndSet, not from
                // a second read of the counter, so a successful CAS always advances by exactly one.
                nextIdx = (curIdx + 1) % cycleSize;
            }
            while (!idx.compareAndSet(curIdx, nextIdx));

            found = findNodeById(top, nodes.get(nextIdx));

            if (found == null) {
                misses++;

                // For optimization purposes checks balancer can return at least one node with specified
                // request topology only after full cycle (approximately).
                if (misses >= cycleSize) {
                    if (topMap == null) {
                        topMap = U.newHashMap(top.size());

                        for (ClusterNode node : top)
                            topMap.put(node.id(), node);
                    }

                    checkBalancerNodes(top, topMap, nodes);

                    // Zero miss counter so next topology check will be performed once again after full cycle.
                    misses = 0;
                }
            }
        }
        while (found == null);

        if (log.isDebugEnabled())
            log.debug("Found round-robin node: " + found);

        return found;
    }

    /**
     * Finds node by id. Returns null in case of absence of specified id in request topology.
     *
     * @param top Topology for current request.
     * @param foundNodeId Node id.
     * @return Found node or null in case of absence of specified id in request topology.
     */
    private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) {
        for (ClusterNode node : top)
            if (foundNodeId.equals(node.id()))
                return node;

        return null;
    }

    /**
     * Checks if balancer can return at least one node,
     * throw exception otherwise.
     *
     * @param top Topology for current request (used only for the error message).
     * @param topMap Topology map.
     * @param nodes Current balanced nodes.
     * @throws GridException If balancer can not return any node.
     */
    private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap,
        Iterable<UUID> nodes) throws GridException {
        for (UUID nodeId : nodes)
            if (topMap.get(nodeId) != null)
                return;

        throw new GridException("Task topology does not have alive nodes: " + top);
    }

    /**
     * Awaits initialization of balancing nodes to be completed.
     *
     * @throws GridException Thrown in case of thread interruption.
     */
    private void awaitInitializationCompleted() throws GridException {
        try {
            if (initLatch.getCount() > 0)
                initLatch.await();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before converting to the checked grid exception.
            Thread.currentThread().interrupt();

            throw new GridException("Global balancer was interrupted.", e);
        }
    }

    /**
     * Snapshot of nodes which participated in load balancing.
     */
    private static final class GridNodeList {
        /** Cyclic pointer for selecting next node. */
        private final AtomicInteger curIdx;

        /** Node ids. */
        private final List<UUID> nodes;

        /**
         * @param curIdx Initial index of current node.
         * @param nodes Initial node ids.
         */
        private GridNodeList(int curIdx, List<UUID> nodes) {
            this.curIdx = new AtomicInteger(curIdx);
            this.nodes = nodes;
        }

        /**
         * @return Index of current node.
         */
        private AtomicInteger getCurrentIdx() {
            return curIdx;
        }

        /**
         * @return Node ids.
         */
        private List<UUID> getNodes() {
            return nodes;
        }
    }

    /**
     * THIS METHOD IS USED ONLY FOR TESTING.
     *
     * @return Unmodifiable view of the internal list of node ids (empty before initialization).
     */
    List<UUID> getNodeIds() {
        List<UUID> nodes = nodeList.getNodes();

        return Collections.unmodifiableList(nodes);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(RoundRobinGlobalLoadBalancer.class, this);
    }
}