http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java new file mode 100644 index 0000000..bb49cc0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java @@ -0,0 +1,319 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.roundrobin; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.apache.ignite.spi.loadbalancing.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * This SPI iterates through nodes in round-robin fashion and pick the next + * sequential node. Two modes of operation are supported: per-task and global + * (see {@link #setPerTask(boolean)} configuration). + * <p> + * When configured in per-task mode, implementation will pick a random starting + * node at the beginning of every task execution and then sequentially iterate through all + * nodes in topology starting from the picked node. This is the default configuration + * and should fit most of the use cases as it provides a fairly well-distributed + * split and also ensures that jobs within a single task are spread out across + * nodes to the maximum. For cases when split size is equal to the number of nodes, + * this mode guarantees that all nodes will participate in the split. + * <p> + * When configured in global mode, a single sequential queue of nodes is maintained for + * all tasks and the next node in the queue is picked every time. In this mode (unlike in + * {@code per-task} mode) it is possible that even if split size may be equal to the + * number of nodes, some jobs within the same task will be assigned to the same node if + * multiple tasks are executing concurrently. + * <h1 class="header">Coding Example</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task will look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { + * @Override + * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * and use affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. + * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. + * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().string()); + * } + * + * return buf.string(); + * } + * } + * </pre> + * <p> + * <h1 class="header">Configuration</h1> + * In order to use this load balancer, you should configure your grid instance + * to use {@code GridRoundRobinLoadBalancingSpi} either from Spring XML file or + * directly. The following configuration parameters are supported: + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li> + * Flag that indicates whether to use {@code per-task} or global + * round-robin modes described above (see {@link #setPerTask(boolean)}). + * </li> + * </ul> + * Below is Java configuration example: + * <pre name="code" class="java"> + * GridRandomLoadBalancingSpi = new GridRandomLoadBalancingSpi(); + * + * // Configure SPI to use global round-robin mode. + * spi.setPerTask(false); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default load balancing SPI. + * cfg.setLoadBalancingSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * Here is how you can configure {@code GridRandomLoadBalancingSpi} using Spring XML configuration: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.roundrobin.GridRoundRobinLoadBalancingSpi"> + * <!-- Set to global round-robin mode. --> + * <property name="perTask" value="false"/> + * </bean> + * </property> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + */ +@IgniteSpiMultipleInstancesSupport(true) +public class RoundRobinLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, + RoundRobinLoadBalancingSpiMBean { + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private RoundRobinGlobalLoadBalancer balancer; + + /** */ + private boolean isPerTask; + + /** */ + private final Map<IgniteUuid, RoundRobinPerTaskLoadBalancer> perTaskBalancers = + new ConcurrentHashMap8<>(); + + /** Event listener. */ + private final GridLocalEventListener lsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + if (evt.type() == EVT_TASK_FAILED || + evt.type() == EVT_TASK_FINISHED) + perTaskBalancers.remove(((IgniteTaskEvent)evt).taskSessionId()); + else if (evt.type() == EVT_JOB_MAPPED) { + RoundRobinPerTaskLoadBalancer balancer = + perTaskBalancers.get(((IgniteJobEvent)evt).taskSessionId()); + + if (balancer != null) + balancer.onMapped(); + } + } + }; + + /** {@inheritDoc} */ + @Override public boolean isPerTask() { + return isPerTask; + } + + /** + * Configuration parameter indicating whether a new round robin order should be + * created for every task. If {@code true} then load balancer is guaranteed + * to iterate through nodes sequentially for every task - so as long as number + * of jobs is less than or equal to the number of nodes, jobs are guaranteed to + * be assigned to unique nodes. If {@code false} then one round-robin order + * will be maintained for all tasks, so when tasks execute concurrently, it + * is possible for more than one job within task to be assigned to the same + * node. + * <p> + * Default is {@code false}. + * + * @param isPerTask Configuration parameter indicating whether a new round robin order should + * be created for every task. Default is {@code false}. + */ + @IgniteSpiConfiguration(optional = true) + public void setPerTask(boolean isPerTask) { + this.isPerTask = isPerTask; + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + + if (log.isDebugEnabled()) + log.debug(configInfo("isPerTask", isPerTask)); + + registerMBean(gridName, this, RoundRobinLoadBalancingSpiMBean.class); + + balancer = new RoundRobinGlobalLoadBalancer(log); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + balancer = null; + + perTaskBalancers.clear(); + + unregisterMBean(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + if (!isPerTask) + balancer.onContextInitialized(spiCtx); + else { + if (!getSpiContext().isEventRecordable(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED)) + throw new IgniteSpiException("Required event types are disabled: " + + U.gridEventName(EVT_TASK_FAILED) + ", " + + U.gridEventName(EVT_TASK_FINISHED) + ", " + + U.gridEventName(EVT_JOB_MAPPED)); + + getSpiContext().addLocalEventListener(lsnr, EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); + } + } + + /** {@inheritDoc} */ + @Override protected void onContextDestroyed0() { + if (!isPerTask) { + if (balancer != null) + balancer.onContextDestroyed(); + } + else { + IgniteSpiContext spiCtx = getSpiContext(); + + if (spiCtx != null) + spiCtx.removeLocalEventListener(lsnr); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) + throws GridException { + A.notNull(ses, "ses", top, "top"); + + if (isPerTask) { + // Note that every session operates from single thread which + // allows us to use concurrent map and avoid synchronization. + RoundRobinPerTaskLoadBalancer taskBalancer = perTaskBalancers.get(ses.getId()); + + if (taskBalancer == null) + perTaskBalancers.put(ses.getId(), taskBalancer = new RoundRobinPerTaskLoadBalancer()); + + return taskBalancer.getBalancedNode(top); + } + + return balancer.getBalancedNode(top); + } + + /** + * THIS METHOD IS USED ONLY FOR TESTING. + * + * @param ses Task session. + * @return Internal list of nodes. + */ + List<UUID> getNodeIds(ComputeTaskSession ses) { + if (isPerTask) { + RoundRobinPerTaskLoadBalancer balancer = perTaskBalancers.get(ses.getId()); + + if (balancer == null) + return Collections.emptyList(); + + List<UUID> ids = new ArrayList<>(); + + for (ClusterNode node : balancer.getNodes()) { + ids.add(node.id()); + } + + return ids; + } + + return balancer.getNodeIds(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RoundRobinLoadBalancingSpi.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java new file mode 100644 index 0000000..987f0b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java @@ -0,0 +1,37 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.roundrobin; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management bean for {@link RoundRobinLoadBalancingSpi} SPI. + */ +@IgniteMBeanDescription("MBean that provides access to round robin load balancing SPI configuration.") +public interface RoundRobinLoadBalancingSpiMBean extends IgniteSpiManagementMBean { + /** + * Configuration parameter indicating whether a new round robin order should be + * created for every task. If {@code true} then load balancer is guaranteed + * to iterate through nodes sequentially for every task - so as long as number + * of jobs is less than or equal to the number of nodes, jobs are guaranteed to + * be assigned to unique nodes. If {@code false} then one round-robin order + * will be maintained for all tasks, so when tasks execute concurrently, it + * is possible for more than one job within task to be assigned to the same + * node. + * <p> + * Default is {@code true}. + * + * @return Configuration parameter indicating whether a new round robin order should + * be created for every task. Default is {@code true}. + */ + @IgniteMBeanDescription("Configuration parameter indicating whether a new round robin order should be created for every task.") + public boolean isPerTask(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java new file mode 100644 index 0000000..308256e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java @@ -0,0 +1,96 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.roundrobin; + +import org.apache.ignite.cluster.*; + +import java.util.*; + +/** + * Load balancer for per-task configuration. + */ +class RoundRobinPerTaskLoadBalancer { + /** Balancing nodes. */ + private ArrayDeque<ClusterNode> nodeQueue; + + /** Jobs mapped flag. */ + private volatile boolean isMapped; + + /** Mutex. */ + private final Object mux = new Object(); + + /** + * Call back for job mapped event. + */ + void onMapped() { + isMapped = true; + } + + /** + * Gets balanced node for given topology. This implementation + * is to be used only from {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method + * and, therefore, does not need to be thread-safe. + * + * @param top Topology to pick from. + * @return Best balanced node. + */ + ClusterNode getBalancedNode(List<ClusterNode> top) { + assert top != null; + assert !top.isEmpty(); + + boolean readjust = isMapped; + + synchronized (mux) { + // Populate first time. + if (nodeQueue == null) + nodeQueue = new ArrayDeque<>(top); + + // If job has been mapped, then it means + // that it is most likely being failed over. + // In this case topology might have changed + // and we need to readjust with every apply. + if (readjust) + // Add missing nodes. + for (ClusterNode node : top) + if (!nodeQueue.contains(node)) + nodeQueue.offer(node); + + ClusterNode next = nodeQueue.poll(); + + // If jobs have been mapped, we need to make sure + // that queued node is still in topology. + if (readjust && next != null) { + while (!top.contains(next) && !nodeQueue.isEmpty()) + next = nodeQueue.poll(); + + // No nodes found and queue is empty. + if (next != null && !top.contains(next)) + return null; + } + + if (next != null) + // Add to the end. + nodeQueue.offer(next); + + return next; + } + } + + /** + * THIS METHOD IS USED ONLY FOR TESTING. + * + * @return Internal list of nodes. + */ + List<ClusterNode> getNodes() { + synchronized (mux) { + return Collections.unmodifiableList(new ArrayList<>(nodeQueue)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/package.html new file mode 100644 index 0000000..9909144 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/roundrobin/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains <b>default</b> round-robin implementation for load balancing SPI. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java new file mode 100644 index 0000000..6c4de36 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java @@ -0,0 +1,394 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.weightedrandom; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.apache.ignite.spi.loadbalancing.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Load balancing SPI that picks a random node for job execution. Note that you can + * optionally assign weights to nodes, so nodes with larger weights will end up getting + * proportionally more jobs routed to them (see {@link #setNodeWeight(int)} + * configuration property). By default all nodes get equal weight defined by + * {@link #DFLT_NODE_WEIGHT} (value is {@code 10}). + * <h1 class="header">Coding Example</h1> + * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic + * is transparent to your code and is handled automatically by the adapter. + * Here is an example of how your task could look: + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { + * @Override + * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { + * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); + * + * for (int i = 0; i < gridSize; i++) { + * jobs.add(new MyFooBarJob(arg)); + * } + * + * // Node assignment via load balancer + * // happens automatically. + * return jobs; + * } + * ... + * } + * </pre> + * If you need more fine-grained control over how some jobs within task get mapped to a node + * and use affinity load balancing for some other jobs within task, then you should use + * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this + * case we manually inject load balancer and use it to pick the best node. Doing it in + * such way would allow user to map some jobs manually and for others use load balancer. + * <pre name="code" class="java"> + * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { + * // Inject load balancer. + * @GridLoadBalancerResource + * GridComputeLoadBalancer balancer; + * + * // Map jobs to grid nodes. + * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { + * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); + * + * // In more complex cases, you can actually do + * // more complicated assignments of jobs to nodes. + * for (int i = 0; i < subgrid.size(); i++) { + * // Pick the next best balanced node for the job. + * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) + * } + * + * return jobs; + * } + * + * // Aggregate results into one compound result. + * public String reduce(List<GridComputeJobResult> results) throws GridException { + * // For the purpose of this example we simply + * // concatenate string representation of every + * // job result + * StringBuilder buf = new StringBuilder(); + * + * for (GridComputeJobResult res : results) { + * // Append string representation of result + * // returned by every job. + * buf.append(res.getData().string()); + * } + * + * return buf.string(); + * } + * } + * </pre> + * <p> + * <h1 class="header">Configuration</h1> + * In order to use this load balancer, you should configure your grid instance + * to use {@code GridRandomLoadBalancingSpi} either from Spring XML file or + * directly. The following configuration parameters are supported: + * <h2 class="header">Mandatory</h2> + * This SPI has no mandatory configuration parameters. + * <h2 class="header">Optional</h2> + * The following configuration parameters are optional: + * <ul> + * <li> + * Flag that indicates whether to use weight policy or simple random policy + * (see {@link #setUseWeights(boolean)}) + * </li> + * <li> + * Weight of this node (see {@link #setNodeWeight(int)}). This parameter is ignored + * if {@link #setUseWeights(boolean)} is set to {@code false}. + * </li> + * </ul> + * Below is Java configuration example: + * <pre name="code" class="java"> + * GridWeightedRandomLoadBalancingSpi = new GridWeightedLoadBalancingSpi(); + * + * // Configure SPI to used weighted + * // random load balancing. + * spi.setUseWeights(true); + * + * // Set weight for the local node. + * spi.setWeight( *); + * + * GridConfiguration cfg = new GridConfiguration(); + * + * // Override default load balancing SPI. + * cfg.setLoadBalancingSpi(spi); + * + * // Starts grid. + * G.start(cfg); + * </pre> + * Here is how you can configure {@code GridRandomLoadBalancingSpi} using Spring XML configuration: + * <pre name="code" class="xml"> + * <property name="loadBalancingSpi"> + * <bean class="org.gridgain.grid.spi.loadBalancing.weightedrandom.GridWeightedRandomLoadBalancingSpi"> + * <property name="useWeights" value="true"/> + * <property name="nodeWeight" value="10"/> + * </bean> + * </property> + * </pre> + * <p> + * <img src="http://www.gridgain.com/images/spring-small.png"> + * <br> + * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> + */ +@IgniteSpiMultipleInstancesSupport(true) +@IgniteSpiConsistencyChecked(optional = true) +public class WeightedRandomLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, + WeightedRandomLoadBalancingSpiMBean { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** + * Name of node attribute used to indicate load weight of a node + * (value is {@code "gridgain.node.weight.attr.name"}). + * + * @see org.apache.ignite.cluster.ClusterNode#attributes() + */ + public static final String NODE_WEIGHT_ATTR_NAME = "gridgain.node.weight.attr.name"; + + /** Default weight assigned to every node if explicit one is not provided (value is {@code 10}). */ + public static final int DFLT_NODE_WEIGHT = 10; + + /** Grid logger. */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private boolean isUseWeights; + + /** Local event listener to listen to task completion events. */ + private GridLocalEventListener evtLsnr; + + /** Weight of this node. */ + private int nodeWeight = DFLT_NODE_WEIGHT; + + /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ + private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = + new ConcurrentHashMap8<>(); + + /** + * Sets a flag to indicate whether node weights should be checked when + * doing random load balancing. Default value is {@code false} which + * means that node weights are disregarded for load balancing logic. + * + * @param isUseWeights If {@code true} then random load is distributed according + * to node weights. + */ + @IgniteSpiConfiguration(optional = true) + public void setUseWeights(boolean isUseWeights) { + this.isUseWeights = isUseWeights; + } + + /** {@inheritDoc} */ + @Override public boolean isUseWeights() { + return isUseWeights; + } + + /** + * Sets weight of this node. Nodes with more processing capacity + * should be assigned proportionally larger weight. Default value + * is {@link #DFLT_NODE_WEIGHT} and is equal for all nodes. + * + * @param nodeWeight Weight of this node. + */ + @IgniteSpiConfiguration(optional = true) + public void setNodeWeight(int nodeWeight) { + this.nodeWeight = nodeWeight; + } + + /** {@inheritDoc} */ + @Override public int getNodeWeight() { + return nodeWeight; + } + + /** {@inheritDoc} */ + @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { + return F.<String, Object>asMap(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME), nodeWeight); + } + + /** {@inheritDoc} */ + @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { + startStopwatch(); + + assertParameter(nodeWeight > 0, "nodeWeight > 0"); + + if (log.isDebugEnabled()) { + log.debug(configInfo("isUseWeights", isUseWeights)); + log.debug(configInfo("nodeWeight", nodeWeight)); + } + + registerMBean(gridName, this, WeightedRandomLoadBalancingSpiMBean.class); + + // Ack ok start. + if (log.isDebugEnabled()) + log.debug(startInfo()); + } + + /** {@inheritDoc} */ + @Override public void spiStop() throws IgniteSpiException { + unregisterMBean(); + + // Ack ok stop. + if (log.isDebugEnabled()) + log.debug(stopInfo()); + } + + /** {@inheritDoc} */ + @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { + getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + assert evt instanceof IgniteTaskEvent || evt instanceof IgniteJobEvent; + + if (evt.type() == EVT_TASK_FINISHED || + evt.type() == EVT_TASK_FAILED) { + IgniteUuid sesId = ((IgniteTaskEvent)evt).taskSessionId(); + + taskTops.remove(sesId); + + if (log.isDebugEnabled()) + log.debug("Removed task topology from topology cache for session: " + sesId); + } + // We should keep topology and use cache in GridComputeTask#map() method to + // avoid O(n*n/2) complexity, after that we can drop caches. + // Here we set mapped property and later cache will be ignored + else if (evt.type() == EVT_JOB_MAPPED) { + IgniteUuid sesId = ((IgniteJobEvent)evt).taskSessionId(); + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(sesId); + + if (weightedTop != null) + weightedTop.set1(true); + + if (log.isDebugEnabled()) + log.debug("Job has been mapped. Ignore cache for session: " + sesId); + } + } + }, + EVT_TASK_FAILED, + EVT_TASK_FINISHED, + EVT_JOB_MAPPED + ); + } + + /** {@inheritDoc} */ + @Override protected void onContextDestroyed0() { + if (evtLsnr != null) { + IgniteSpiContext ctx = getSpiContext(); + + if (ctx != null) + ctx.removeLocalEventListener(evtLsnr); + } + } + + /** {@inheritDoc} */ + @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) { + A.notNull(ses, "ses"); + A.notNull(top, "top"); + A.notNull(job, "job"); + + // Optimization for non-weighted randomization. + if (!isUseWeights) + return top.get(RAND.nextInt(top.size())); + + IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); + + // Create new cached topology if there is no one. Do not + // use cached topology after task has been mapped. + if (weightedTop == null) { + // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. + taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); + } + // We have topology - check if task has been mapped. + else if (weightedTop.get1()) { + // Do not use cache after GridComputeTask#map(). + return new WeightedTopology(top).pickWeightedNode(); + } + + return weightedTop.get2().pickWeightedNode(); + } + + /** + * @param node Node to get weight for. + * @return Node weight + */ + private int getWeight(ClusterNode node) { + Integer weight = (Integer)node.attribute(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME)); + + if (weight != null && weight == 0) + throw new IllegalStateException("Node weight cannot be zero: " + node); + + return weight == null ? DFLT_NODE_WEIGHT : weight; + } + + /** + * Holder for weighted topology. + */ + private class WeightedTopology { + /** Total topology weight. */ + private final int totalWeight; + + /** Topology sorted by weight. */ + private final SortedMap<Integer, ClusterNode> circle = new TreeMap<>(); + + /** + * @param top Topology. + */ + WeightedTopology(Collection<ClusterNode> top) { + assert !F.isEmpty(top); + + int totalWeight = 0; + + for (ClusterNode node : top) { + totalWeight += getWeight(node); + + // Complexity of this put is O(logN). + circle.put(totalWeight, node); + } + + this.totalWeight = totalWeight; + } + + /** + * Gets weighted node in random fashion. + * + * @return Weighted node. + */ + ClusterNode pickWeightedNode() { + int weight = RAND.nextInt(totalWeight) + 1; + + SortedMap<Integer, ClusterNode> pick = circle.tailMap(weight); + + assert !pick.isEmpty(); + + return pick.get(pick.firstKey()); + } + } + + /** {@inheritDoc} */ + @Override protected List<String> getConsistentAttributeNames() { + return Collections.singletonList(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(WeightedRandomLoadBalancingSpi.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java new file mode 100644 index 0000000..28ca25a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java @@ -0,0 +1,37 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.weightedrandom; + +import org.apache.ignite.mbean.*; +import org.apache.ignite.spi.*; + +/** + * Management MBean for {@link WeightedRandomLoadBalancingSpi} SPI. + */ +@IgniteMBeanDescription("MBean that provides access to weighted random load balancing SPI configuration.") +public interface WeightedRandomLoadBalancingSpiMBean extends IgniteSpiManagementMBean { + /** + * Checks whether node weights are considered when doing + * random load balancing. + * + * @return If {@code true} then random load is distributed according + * to node weights. + */ + @IgniteMBeanDescription("Whether node weights are considered when doing random load balancing.") + public boolean isUseWeights(); + + /** + * Gets weight of this node. + * + * @return Weight of this node. + */ + @IgniteMBeanDescription("Weight of this node.") + public int getNodeWeight(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/package.html b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/package.html new file mode 100644 index 0000000..2da3d3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/loadbalancing/weightedrandom/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains weighted random-base implementation for load balancing SPI. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java index 245d39d..505a8f0 100644 --- a/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java +++ b/modules/core/src/main/java/org/gridgain/grid/cache/affinity/GridCacheAffinityKeyMapped.java @@ -83,7 +83,7 @@ import java.util.concurrent.*; * {@link org.apache.ignite.compute.ComputeJob} or any other grid computation, such as {@link Runnable}, {@link Callable}, or * {@link org.apache.ignite.lang.IgniteClosure}. It should be attached to a method or field that provides affinity key * for the computation. Only one annotation per class is allowed. Whenever such annotation is detected, - * then {@link org.gridgain.grid.spi.loadbalancing.LoadBalancingSpi} will be bypassed, and computation will be routed to the grid node + * then {@link org.apache.ignite.spi.loadbalancing.LoadBalancingSpi} will be bypassed, and computation will be routed to the grid node * where the specified affinity key is cached. You can also use optional {@link GridCacheName @GridCacheName} * annotation whenever non-default cache name needs to be specified. * <p> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java index fbfd071..454b9c7 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridGainEx.java @@ -48,8 +48,8 @@ import org.apache.ignite.spi.eventstorage.*; import org.apache.ignite.spi.eventstorage.memory.*; import org.apache.ignite.spi.failover.*; import org.apache.ignite.spi.failover.always.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.spi.loadbalancing.roundrobin.*; +import org.apache.ignite.spi.loadbalancing.*; +import org.apache.ignite.spi.loadbalancing.roundrobin.*; import org.gridgain.grid.spi.securesession.*; import org.gridgain.grid.spi.securesession.noop.*; import org.gridgain.grid.spi.swapspace.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java index 3a89361..c4f5fed 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/managers/loadbalancer/GridLoadBalancerManager.java @@ -17,7 +17,7 @@ import org.gridgain.grid.cache.affinity.*; import org.gridgain.grid.kernal.*; import org.gridgain.grid.kernal.managers.*; import org.gridgain.grid.kernal.managers.deployment.*; -import org.gridgain.grid.spi.loadbalancing.*; +import org.apache.ignite.spi.loadbalancing.*; import org.gridgain.grid.util.typedef.*; import org.gridgain.grid.util.typedef.internal.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java deleted file mode 100644 index 91095fa..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/LoadBalancingSpi.java +++ /dev/null @@ -1,114 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing; - -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; - -import java.util.*; - -/** - * Load balancing SPI provides the next best balanced node for job - * execution. This SPI is used either implicitly or explicitly whenever - * a job gets mapped to a node during {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} - * invocation. - * <h1 class="header">Coding Examples</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task could look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object,Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * <i>and</i> use, for example, affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task could look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String,String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * GridComputeJob myJob = new MyFooBarJob(arg); - * - * jobs.put(myJob, balancer.getBalancedNode(myJob, null)); - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().toString()); - * } - * - * return buf.toString(); - * } - * } - * </pre> - * <p> - * GridGain comes with the following load balancing SPI implementations out of the box: - * <ul> - * <li>{@link org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi} - default</li> - * <li>{@link org.gridgain.grid.spi.loadbalancing.adaptive.AdaptiveLoadBalancingSpi}</li> - * <li>{@link org.gridgain.grid.spi.loadbalancing.weightedrandom.WeightedRandomLoadBalancingSpi}</li> - * </ul> - * <b>NOTE:</b> this SPI (i.e. methods in this interface) should never be used directly. SPIs provide - * internal view on the subsystem and is used internally by GridGain kernal. In rare use cases when - * access to a specific implementation of this SPI is required - an instance of this SPI can be obtained - * via {@link org.apache.ignite.Ignite#configuration()} method to check its configuration properties or call other non-SPI - * methods. Note again that calling methods from this interface on the obtained instance can lead - * to undefined behavior and explicitly not supported. - */ -public interface LoadBalancingSpi extends IgniteSpi { - /** - * Gets balanced node for specified job within given task session. - * - * @param ses Grid task session for currently executing task. - * @param top Topology of task nodes from which to pick the best balanced node for given job. - * @param job Job for which to pick the best balanced node. - * @throws GridException If failed to get next balanced node. - * @return Best balanced node for the given job within given task session. - */ - public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) throws GridException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java deleted file mode 100644 index c438e9c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveCpuLoadProbe.java +++ /dev/null @@ -1,229 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on CPU load. - * <p> - * Based on {@link #setUseAverage(boolean)} - * parameter, this implementation will either use average CPU load - * values or current (default is to use averages). - * <p> - * Based on {@link #setUseProcessors(boolean)} parameter, this implementation - * will either take number of processors on the node into account or not. - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. This configuration parameter indicates - * whether to divide each node's CPU load by the number of processors on that node - * (default is {@code true}). - * <p> - * Also note that in some environments every processor may not be adding 100% of - * processing power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power. To account - * for that, you should set {@link #setProcessorCoefficient(double)} parameter to - * {@code 0.75} . - * <p> - * Below is an example of how CPU load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveCpuLoadProbe"> - * <property name="useAverage" value="true"/> - * <property name="useProcessors" value="true"/> - * <property name="processorCoefficient" value="0.9"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - * <p> - * This implementation is used by default by {@link AdaptiveLoadBalancingSpi} SPI. - */ -public class AdaptiveCpuLoadProbe implements AdaptiveLoadProbe { - /** Flag indicating whether to use average CPU load vs. current. */ - private boolean useAvg = true; - - /** - * Flag indicating whether to divide each node's CPU load - * by the number of processors on that node. - */ - private boolean useProcs = true; - - /** - * Coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - */ - private double procCoefficient = 1; - - /** - * Initializes CPU load probe to use CPU load average by default. - */ - public AdaptiveCpuLoadProbe() { - // No-op. - } - - /** - * Specifies whether to use average CPU load vs. current and whether or - * not to take number of processors into account. - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * - * @param useAvg Flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs) { - this.useAvg = useAvg; - this.useProcs = useProcs; - } - - /** - * Specifies whether to use average CPU load vs. current and whether or - * not to take number of processors into account. It also allows to - * specify the coefficient of addition power every CPU adds. - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * <p> - * Also, in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * - * @param useAvg Flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - * @param procCoefficient Coefficient of every CPU processor (default value is {@code 1}). - */ - public AdaptiveCpuLoadProbe(boolean useAvg, boolean useProcs, double procCoefficient) { - this.useAvg = useAvg; - this.useProcs = useProcs; - this.procCoefficient = procCoefficient; - } - - /** - * Gets flag indicating whether to use average CPU load vs. current. - * - * @return Flag indicating whether to use average CPU load vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average CPU load vs. current. - * If not explicitly set, then default value is {@code true}. - * - * @param useAvg Flag indicating whether to use average CPU load vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * - * @return Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public boolean isUseProcessors() { - return useProcs; - } - - /** - * Sets flag indicating whether to use average CPU load vs. current - * (default is {@code true}). - * <p> - * Since CPU load on multi-processor boxes shows medium load of multiple CPU's it - * usually means that the remaining capacity is proportional to the number of - * CPU's (or cores) on the node. - * <p> - * If not explicitly set, then default value is {@code true}. - * - * @param useProcs Flag indicating whether to divide each node's CPU load - * by the number of processors on that node (default is {@code true}). - */ - public void setUseProcessors(boolean useProcs) { - this.useProcs = useProcs; - } - - /** - * Gets coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * <p> - * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. - * - * @return Coefficient of every CPU processor. - */ - public double getProcessorCoefficient() { - return procCoefficient; - } - - /** - * Sets coefficient of every CPU processor. By default it is {@code 1}, but - * in some environments every processor may not be adding 100% of processing - * power. For example, if you are using multi-core CPU's, then addition of - * every core would probably result in about 75% of extra CPU power, and hence - * you would set this coefficient to {@code 0.75} . - * <p> - * This value is ignored if {@link #isUseProcessors()} is set to {@code false}. - * - * @param procCoefficient Coefficient of every CPU processor. - */ - public void setProcessorCoefficient(double procCoefficient) { - A.ensure(procCoefficient > 0, "procCoefficient > 0"); - - this.procCoefficient = procCoefficient; - } - - /** {@inheritDoc} */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - double k = 1.0d; - - if (useProcs) { - int procs = metrics.getTotalCpus(); - - if (procs > 1) - k = procs * procCoefficient; - } - - double load = (useAvg ? metrics.getAverageCpuLoad() : metrics.getCurrentCpuLoad()) / k; - - return load < 0 ? 0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AdaptiveCpuLoadProbe.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java deleted file mode 100644 index 95ef91d..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveJobCountLoadProbe.java +++ /dev/null @@ -1,96 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on active and waiting job count. - * Based on {@link #setUseAverage(boolean)} parameter, this implementation will - * either use average job count values or current (default is to use averages). - * <p> - * The load of a node is simply calculated by adding active and waiting job counts. - * <p> - * Below is an example of how CPU load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveJobCountLoadProbe"> - * <property name="useAverage" value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public class AdaptiveJobCountLoadProbe implements AdaptiveLoadProbe { - /** Flag indicating whether to use average CPU load vs. current. */ - private boolean useAvg = true; - - /** - * Initializes active job probe. - */ - public AdaptiveJobCountLoadProbe() { - // No-op. - } - - /** - * Creates new active job prove specifying whether to use average - * job counts vs. current. - * - * @param useAvg Flag indicating whether to use average job counts vs. current. - */ - public AdaptiveJobCountLoadProbe(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average job counts vs. current. - * - * @return Flag indicating whether to use average job counts vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average job counts vs. current. - * - * @param useAvg Flag indicating whether to use average job counts vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - - /** {@inheritDoc} */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - if (useAvg) { - double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - - if (load > 0) - return load; - } - - double load = metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - - return load < 0 ? 0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AdaptiveJobCountLoadProbe.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java deleted file mode 100644 index 069d269..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpi.java +++ /dev/null @@ -1,581 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Load balancing SPI that adapts to overall node performance. It - * proportionally distributes more jobs to more performant nodes based - * on a pluggable and dynamic node load probing. - * <p> - * <h1 class="header">Adaptive Node Probe</h1> - * This SPI comes with pluggable algorithm to calculate a node load - * at any given point of time. The algorithm is defined by - * {@link AdaptiveLoadProbe} interface and user is - * free to provide custom implementations. By default - * {@link AdaptiveCpuLoadProbe} implementation is used - * which distributes jobs to nodes based on average CPU load - * on every node. - * <p> - * The following load probes are available with the product: - * <ul> - * <li>{@link AdaptiveCpuLoadProbe} - default</li> - * <li>{@link AdaptiveProcessingTimeLoadProbe}</li> - * <li>{@link AdaptiveJobCountLoadProbe}</li> - * </ul> - * Note that if {@link AdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * <p> - * When working with node metrics, take into account that all averages are - * calculated over metrics history size defined by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()} - * and {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} grid configuration parameters. - * Generally the larger these configuration parameter values are, the more precise the metrics are. - * You should tune these values based on the level of accuracy needed vs. the additional memory - * that would be required for storing metrics. - * <p> - * You should also keep in mind that metrics for remote nodes are delayed (usually by the - * heartbeat frequency). So if it is acceptable in your environment, set the heartbeat frequency - * to be more inline with job execution time. Generally, the more often heartbeats between nodes - * are exchanged, the more precise the metrics are. However, you should keep in mind that if - * heartbeats are exchanged too often then it may create unnecessary traffic in the network. - * Heartbeats (or metrics update frequency) can be configured via underlying - * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid. - * <p> - * Here is an example of how probing can be implemented to use - * number of active and waiting jobs as probing mechanism: - * <pre name="code" class="java"> - * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { - * // Flag indicating whether to use average value or current. - * private int useAvg = true; - * - * public FooBarLoadProbe(boolean useAvg) { - * this.useAvg = useAvg; - * } - * - * // Calculate load based on number of active and waiting jobs. - * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { - * GridNodeMetrics metrics = node.getMetrics(); - * - * if (useAvg) { - * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - * - * if (load > 0) { - * return load; - * } - * } - * - * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - * } - * } - * </pre> - * <h1 class="header">Which Node Probe To Use</h1> - * There is no correct answer here. Every single node probe will work better or worse in - * different environments. CPU load probe (default option) is the safest approach to start - * with as it simply attempts to utilize every CPU on the grid to the maximum. However, you should - * experiment with other probes by executing load tests in your environment and observing - * which probe gives you best performance and load balancing. - * <p> - * <h1 class="header">Task Coding Example</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task will look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * and use affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().string()); - * } - * - * return buf.string(); - * } - * } - * </pre> - * <p> - * <h1 class="header">Configuration</h1> - * In order to use this load balancer, you should configure your grid instance - * to use {@code GridJobsLoadBalancingSpi} either from Spring XML file or - * directly. The following configuration parameters are supported: - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * This SPI has the following optional configuration parameters: - * <ul> - * <li> - * Adaptive node load probing implementation (see {@link #setLoadProbe(AdaptiveLoadProbe)}). - * This configuration parameter supplies a custom algorithm for probing a node's load. - * By default, {@link AdaptiveCpuLoadProbe} implementation is used which - * takes every node's CPU load and tries to send proportionally more jobs to less loaded nodes. - * </li> - * </ul> - * <p> - * Below is Java configuration example: - * <pre name="code" class="java"> - * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi(); - * - * // Configure probe to use latest job execution time vs. average. - * GridAdaptiveProcessingTimeLoadProbe probe = new GridAdaptiveProcessingTimeLoadProbe(false); - * - * spi.setLoadProbe(probe); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default load balancing SPI. - * cfg.setLoadBalancingSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * Here is how you can configure {@code GridJobsLoadBalancingSpi} using Spring XML configuration: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> - * <constructor-arg value="false"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -@IgniteSpiMultipleInstancesSupport(true) -public class AdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, - AdaptiveLoadBalancingSpiMBean { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private AdaptiveLoadProbe probe = new AdaptiveCpuLoadProbe(); - - /** Local event listener to listen to task completion events. */ - private GridLocalEventListener evtLsnr; - - /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ - private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = - new ConcurrentHashMap8<>(); - - /** */ - private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>(); - - /** */ - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - - /** {@inheritDoc} */ - @Override public String getLoadProbeFormatted() { - return probe.toString(); - } - - /** - * Sets implementation of node load probe. By default {@link AdaptiveProcessingTimeLoadProbe} - * is used which proportionally distributes load based on the average job execution - * time on every node. - * - * @param probe Implementation of node load probe - */ - @IgniteSpiConfiguration(optional = true) - public void setLoadProbe(AdaptiveLoadProbe probe) { - A.ensure(probe != null, "probe != null"); - - this.probe = probe; - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - assertParameter(probe != null, "loadProbe != null"); - - if (log.isDebugEnabled()) - log.debug(configInfo("loadProbe", probe)); - - registerMBean(gridName, this, AdaptiveLoadBalancingSpiMBean.class); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - rwLock.writeLock().lock(); - - try { - nodeJobs.clear(); - } - finally { - rwLock.writeLock().unlock(); - } - - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - switch (evt.type()) { - case EVT_TASK_FINISHED: - case EVT_TASK_FAILED: { - IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt; - - taskTops.remove(taskEvt.taskSessionId()); - - if (log.isDebugEnabled()) - log.debug("Removed task topology from topology cache for session: " + - taskEvt.taskSessionId()); - - break; - } - - case EVT_JOB_MAPPED: { - // We should keep topology and use cache in GridComputeTask#map() method to - // avoid O(n*n/2) complexity, after that we can drop caches. - // Here we set mapped property and later cache will be ignored - IgniteJobEvent jobEvt = (IgniteJobEvent)evt; - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(jobEvt.taskSessionId()); - - if (weightedTop != null) - weightedTop.set1(true); - - if (log.isDebugEnabled()) - log.debug("Job has been mapped. Ignore cache for session: " + jobEvt.taskSessionId()); - - break; - } - - case EVT_NODE_METRICS_UPDATED: - case EVT_NODE_FAILED: - case EVT_NODE_JOINED: - case EVT_NODE_LEFT: { - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - - rwLock.writeLock().lock(); - - try { - switch (evt.type()) { - case EVT_NODE_JOINED: { - nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); - - break; - } - - case EVT_NODE_LEFT: - case EVT_NODE_FAILED: { - nodeJobs.remove(discoEvt.eventNode().id()); - - break; - } - - case EVT_NODE_METRICS_UPDATED: { - // Reset counter. - nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); - - break; - } - } - } - finally { - rwLock.writeLock().unlock(); - } - } - - } - } - }, - EVT_NODE_METRICS_UPDATED, - EVT_NODE_FAILED, - EVT_NODE_JOINED, - EVT_NODE_LEFT, - EVT_TASK_FINISHED, - EVT_TASK_FAILED, - EVT_JOB_MAPPED - ); - - // Put all known nodes. - rwLock.writeLock().lock(); - - try { - for (ClusterNode node : getSpiContext().nodes()) - nodeJobs.put(node.id(), new AtomicInteger(0)); - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (evtLsnr != null) { - IgniteSpiContext ctx = getSpiContext(); - - if (ctx != null) - ctx.removeLocalEventListener(evtLsnr); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) - throws GridException { - A.notNull(ses, "ses"); - A.notNull(top, "top"); - A.notNull(job, "job"); - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); - - // Create new cached topology if there is no one. Do not - // use cached topology after task has been mapped. - if (weightedTop == null) - // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. - taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); - // We have topology - check if task has been mapped. - else if (weightedTop.get1()) - // Do not use cache after GridComputeTask#map(). - return new WeightedTopology(top).pickWeightedNode(); - - return weightedTop.get2().pickWeightedNode(); - } - - /** - * Calculates node load based on set probe. - * - * @param top List of all nodes. - * @param node Node to get load for. - * @return Node load. - * @throws GridException If returned load is negative. - */ - @SuppressWarnings({"TooBroadScope"}) - private double getLoad(Collection<ClusterNode> top, ClusterNode node) throws GridException { - assert !F.isEmpty(top); - - int jobsSentSinceLastUpdate = 0; - - rwLock.readLock().lock(); - - try { - AtomicInteger cnt = nodeJobs.get(node.id()); - - jobsSentSinceLastUpdate = cnt == null ? 0 : cnt.get(); - } - finally { - rwLock.readLock().unlock(); - } - - double load = probe.getLoad(node, jobsSentSinceLastUpdate); - - if (load < 0) - throw new GridException("Failed to obtain non-negative load from adaptive load probe: " + load); - - return load; - } - - /** - * Holder for weighted topology. - */ - private class WeightedTopology { - /** Topology sorted by weight. */ - private final SortedMap<Double, ClusterNode> circle = new TreeMap<>(); - - /** - * @param top Task topology. - * @throws GridException If any load was negative. - */ - WeightedTopology(List<ClusterNode> top) throws GridException { - assert !F.isEmpty(top); - - double totalLoad = 0; - - // We need to cache loads here to avoid calls later as load might be - // changed between the calls. - double[] nums = new double[top.size()]; - - int zeroCnt = 0; - - // Compute loads. - for (int i = 0; i < top.size(); i++) { - double load = getLoad(top, top.get(i)); - - nums[i] = load; - - if (load == 0) - zeroCnt++; - - totalLoad += load; - } - - // Take care of zero loads. - if (zeroCnt > 0) { - double newTotal = totalLoad; - - int nonZeroCnt = top.size() - zeroCnt; - - for (int i = 0; i < nums.length; i++) { - double load = nums[i]; - - if (load == 0) { - if (nonZeroCnt > 0) - load = totalLoad / nonZeroCnt; - - if (load == 0) - load = 1; - - nums[i] = load; - - newTotal += load; - } - } - - totalLoad = newTotal; - } - - double totalWeight = 0; - - // Calculate weights and total weight. - for (int i = 0; i < nums.length; i++) { - assert nums[i] > 0 : "Invalid load: " + nums[i]; - - double weight = totalLoad / nums[i]; - - // Convert to weight. - nums[i] = weight; - - totalWeight += weight; - } - - double weight = 0; - - // Enforce range from 0 to 1. - for (int i = 0; i < nums.length; i++) { - weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / totalWeight; - - assert weight < 2 : "Invalid weight: " + weight; - - // Complexity of this put is O(logN). - circle.put(weight, top.get(i)); - } - } - - /** - * Gets weighted node in random fashion. - * - * @return Weighted node. - */ - ClusterNode pickWeightedNode() { - double weight = RAND.nextDouble(); - - SortedMap<Double, ClusterNode> pick = circle.tailMap(weight); - - ClusterNode node = pick.get(pick.firstKey()); - - rwLock.readLock().lock(); - - try { - AtomicInteger cnt = nodeJobs.get(node.id()); - - if (cnt != null) - cnt.incrementAndGet(); - } - finally { - rwLock.readLock().unlock(); - } - - return node; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AdaptiveLoadBalancingSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java deleted file mode 100644 index 5553635..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadBalancingSpiMBean.java +++ /dev/null @@ -1,27 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management MBean for {@link AdaptiveLoadBalancingSpi} SPI. - */ -@IgniteMBeanDescription("MBean that provides access to adaptive load balancing SPI configuration.") -public interface AdaptiveLoadBalancingSpiMBean extends IgniteSpiManagementMBean { - /** - * Gets text description of current load probing implementation used. - * - * @return Text description of current load probing implementation used. - */ - @IgniteMBeanDescription("Text description of current load probing implementation used.") - public String getLoadProbeFormatted(); -}