http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java deleted file mode 100644 index b98108e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveJobCountLoadProbe.java +++ /dev/null @@ -1,96 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on active and waiting job count. - * Based on {@link #setUseAverage(boolean)} parameter, this implementation will - * either use average job count values or current (default is to use averages). - * <p> - * The load of a node is simply calculated by adding active and waiting job counts.
- * <p> - * Below is an example of how job count load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveJobCountLoadProbe"> - * <property name="useAverage" value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public class GridAdaptiveJobCountLoadProbe implements GridAdaptiveLoadProbe { - /** Flag indicating whether to use average job counts vs. current. */ - private boolean useAvg = true; - - /** - * Initializes active job probe. - */ - public GridAdaptiveJobCountLoadProbe() { - // No-op. - } - - /** - * Creates new active job probe specifying whether to use average - * job counts vs. current. - * - * @param useAvg Flag indicating whether to use average job counts vs. current. - */ - public GridAdaptiveJobCountLoadProbe(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average job counts vs. current. - * - * @return Flag indicating whether to use average job counts vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average job counts vs. current. - * - * @param useAvg Flag indicating whether to use average job counts vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - - /** - * {@inheritDoc} - * <p> - * When averages are enabled and the sum of average active and average waiting - * job counts is positive, that sum is returned. Otherwise the sum of current - * active and current waiting job counts is returned, clamped to a minimum of {@code 0}. - */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - if (useAvg) { - double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - - if (load > 0) - return load; - } - - double load = metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - - return load < 0 ?
0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAdaptiveJobCountLoadProbe.class, this); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java deleted file mode 100644 index d02529b..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpi.java +++ /dev/null @@ -1,581 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Load balancing SPI that adapts to overall node performance. It - * proportionally distributes more jobs to more performant nodes based - * on a pluggable and dynamic node load probing. 
- * <p> - * <h1 class="header">Adaptive Node Probe</h1> - * This SPI comes with pluggable algorithm to calculate a node load - * at any given point of time. The algorithm is defined by - * {@link GridAdaptiveLoadProbe} interface and user is - * free to provide custom implementations. By default - * {@link GridAdaptiveCpuLoadProbe} implementation is used - * which distributes jobs to nodes based on average CPU load - * on every node. - * <p> - * The following load probes are available with the product: - * <ul> - * <li>{@link GridAdaptiveCpuLoadProbe} - default</li> - * <li>{@link GridAdaptiveProcessingTimeLoadProbe}</li> - * <li>{@link GridAdaptiveJobCountLoadProbe}</li> - * </ul> - * Note that if {@link GridAdaptiveLoadProbe#getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * <p> - * When working with node metrics, take into account that all averages are - * calculated over metrics history size defined by {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsExpireTime()} - * and {@link org.apache.ignite.configuration.IgniteConfiguration#getMetricsHistorySize()} grid configuration parameters. - * Generally the larger these configuration parameter values are, the more precise the metrics are. - * You should tune these values based on the level of accuracy needed vs. the additional memory - * that would be required for storing metrics. - * <p> - * You should also keep in mind that metrics for remote nodes are delayed (usually by the - * heartbeat frequency). So if it is acceptable in your environment, set the heartbeat frequency - * to be more inline with job execution time. 
Generally, the more often heartbeats between nodes - * are exchanged, the more precise the metrics are. However, you should keep in mind that if - * heartbeats are exchanged too often then it may create unnecessary traffic in the network. - * Heartbeats (or metrics update frequency) can be configured via underlying - * {@link org.apache.ignite.spi.discovery.DiscoverySpi} used in your grid. - * <p> - * Here is an example of how probing can be implemented to use - * number of active and waiting jobs as probing mechanism: - * <pre name="code" class="java"> - * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { - * // Flag indicating whether to use average value or current. - * private boolean useAvg = true; - * - * public FooBarLoadProbe(boolean useAvg) { - * this.useAvg = useAvg; - * } - * - * // Calculate load based on number of active and waiting jobs. - * public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - * ClusterNodeMetrics metrics = node.metrics(); - * - * if (useAvg) { - * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - * - * if (load > 0) { - * return load; - * } - * } - * - * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - * } - * } - * </pre> - * <h1 class="header">Which Node Probe To Use</h1> - * There is no correct answer here. Every single node probe will work better or worse in - * different environments. CPU load probe (default option) is the safest approach to start - * with as it simply attempts to utilize every CPU on the grid to the maximum. However, you should - * experiment with other probes by executing load tests in your environment and observing - * which probe gives you best performance and load balancing. - * <p> - * <h1 class="header">Task Coding Example</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter.
- * Here is an example of how your task will look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * and use affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it this - * way allows the user to map some jobs manually and use the load balancer for the others. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()); - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result.
- * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().toString()); - * } - * - * return buf.toString(); - * } - * } - * </pre> - * <p> - * <h1 class="header">Configuration</h1> - * In order to use this load balancer, you should configure your grid instance - * to use {@code GridAdaptiveLoadBalancingSpi} either from Spring XML file or - * directly. The following configuration parameters are supported: - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * This SPI has the following optional configuration parameters: - * <ul> - * <li> - * Adaptive node load probing implementation (see {@link #setLoadProbe(GridAdaptiveLoadProbe)}). - * This configuration parameter supplies a custom algorithm for probing a node's load. - * By default, {@link GridAdaptiveCpuLoadProbe} implementation is used which - * takes every node's CPU load and tries to send proportionally more jobs to less loaded nodes. - * </li> - * </ul> - * <p> - * Below is a Java configuration example: - * <pre name="code" class="java"> - * GridAdaptiveLoadBalancingSpi spi = new GridAdaptiveLoadBalancingSpi(); - * - * // Configure probe to use latest job execution time vs. average. - * GridAdaptiveProcessingTimeLoadProbe probe = new GridAdaptiveProcessingTimeLoadProbe(false); - * - * spi.setLoadProbe(probe); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default load balancing SPI. - * cfg.setLoadBalancingSpi(spi); - * - * // Starts grid.
- * G.start(cfg); - * </pre> - * Here is how you can configure {@code GridAdaptiveLoadBalancingSpi} using Spring XML configuration: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> - * <constructor-arg value="false"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -@IgniteSpiMultipleInstancesSupport(true) -public class GridAdaptiveLoadBalancingSpi extends IgniteSpiAdapter implements GridLoadBalancingSpi, - GridAdaptiveLoadBalancingSpiMBean { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** Node load probe; the CPU load probe is used unless another one is set. */ - private GridAdaptiveLoadProbe probe = new GridAdaptiveCpuLoadProbe(); - - /** Local event listener to listen to task completion events. */ - private GridLocalEventListener evtLsnr; - - /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ - private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = - new ConcurrentHashMap8<>(); - - /** Per-node counters of jobs sent to a node since its last metrics update. */ - private final Map<UUID, AtomicInteger> nodeJobs = new HashMap<>(); - - /** Lock guarding {@link #nodeJobs}. */ - private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); - - /** {@inheritDoc} */ - @Override public String getLoadProbeFormatted() { - return probe.toString(); - } - - /** - * Sets implementation of node load probe. By default {@link GridAdaptiveCpuLoadProbe} - * is used which distributes jobs to nodes based on average CPU load - * on every node.
- * - * @param probe Implementation of node load probe - */ - @IgniteSpiConfiguration(optional = true) - public void setLoadProbe(GridAdaptiveLoadProbe probe) { - A.ensure(probe != null, "probe != null"); - - this.probe = probe; - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - assertParameter(probe != null, "loadProbe != null"); - - if (log.isDebugEnabled()) - log.debug(configInfo("loadProbe", probe)); - - registerMBean(gridName, this, GridAdaptiveLoadBalancingSpiMBean.class); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - rwLock.writeLock().lock(); - - try { - nodeJobs.clear(); - } - finally { - rwLock.writeLock().unlock(); - } - - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - switch (evt.type()) { - case EVT_TASK_FINISHED: - case EVT_TASK_FAILED: { - IgniteTaskEvent taskEvt = (IgniteTaskEvent)evt; - - taskTops.remove(taskEvt.taskSessionId()); - - if (log.isDebugEnabled()) - log.debug("Removed task topology from topology cache for session: " + - taskEvt.taskSessionId()); - - break; - } - - case EVT_JOB_MAPPED: { - // We should keep topology and use cache in GridComputeTask#map() method to - // avoid O(n*n/2) complexity, after that we can drop caches. 
- // Here we set mapped property and later cache will be ignored - IgniteJobEvent jobEvt = (IgniteJobEvent)evt; - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(jobEvt.taskSessionId()); - - if (weightedTop != null) - weightedTop.set1(true); - - if (log.isDebugEnabled()) - log.debug("Job has been mapped. Ignore cache for session: " + jobEvt.taskSessionId()); - - break; - } - - case EVT_NODE_METRICS_UPDATED: - case EVT_NODE_FAILED: - case EVT_NODE_JOINED: - case EVT_NODE_LEFT: { - IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt; - - rwLock.writeLock().lock(); - - try { - switch (evt.type()) { - case EVT_NODE_JOINED: { - nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); - - break; - } - - case EVT_NODE_LEFT: - case EVT_NODE_FAILED: { - nodeJobs.remove(discoEvt.eventNode().id()); - - break; - } - - case EVT_NODE_METRICS_UPDATED: { - // Reset counter. - nodeJobs.put(discoEvt.eventNode().id(), new AtomicInteger(0)); - - break; - } - } - } - finally { - rwLock.writeLock().unlock(); - } - } - - } - } - }, - EVT_NODE_METRICS_UPDATED, - EVT_NODE_FAILED, - EVT_NODE_JOINED, - EVT_NODE_LEFT, - EVT_TASK_FINISHED, - EVT_TASK_FAILED, - EVT_JOB_MAPPED - ); - - // Put all known nodes. 
- rwLock.writeLock().lock(); - - try { - for (ClusterNode node : getSpiContext().nodes()) - nodeJobs.put(node.id(), new AtomicInteger(0)); - } - finally { - rwLock.writeLock().unlock(); - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (evtLsnr != null) { - IgniteSpiContext ctx = getSpiContext(); - - if (ctx != null) - ctx.removeLocalEventListener(evtLsnr); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) - throws GridException { - A.notNull(ses, "ses"); - A.notNull(top, "top"); - A.notNull(job, "job"); - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); - - // Create new cached topology if there is no one. Do not - // use cached topology after task has been mapped. - if (weightedTop == null) - // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. - taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); - // We have topology - check if task has been mapped. - else if (weightedTop.get1()) - // Do not use cache after GridComputeTask#map(). - return new WeightedTopology(top).pickWeightedNode(); - - return weightedTop.get2().pickWeightedNode(); - } - - /** - * Calculates node load based on set probe. - * - * @param top List of all nodes. - * @param node Node to get load for. - * @return Node load. - * @throws GridException If returned load is negative. - */ - @SuppressWarnings({"TooBroadScope"}) - private double getLoad(Collection<ClusterNode> top, ClusterNode node) throws GridException { - assert !F.isEmpty(top); - - int jobsSentSinceLastUpdate = 0; - - rwLock.readLock().lock(); - - try { - AtomicInteger cnt = nodeJobs.get(node.id()); - - jobsSentSinceLastUpdate = cnt == null ? 
0 : cnt.get(); - } - finally { - rwLock.readLock().unlock(); - } - - double load = probe.getLoad(node, jobsSentSinceLastUpdate); - - if (load < 0) - throw new GridException("Failed to obtain non-negative load from adaptive load probe: " + load); - - return load; - } - - /** - * Holder for weighted topology. - */ - private class WeightedTopology { - /** Topology sorted by weight. */ - private final SortedMap<Double, ClusterNode> circle = new TreeMap<>(); - - /** - * @param top Task topology. - * @throws GridException If any load was negative. - */ - WeightedTopology(List<ClusterNode> top) throws GridException { - assert !F.isEmpty(top); - - double totalLoad = 0; - - // We need to cache loads here to avoid calls later as load might be - // changed between the calls. - double[] nums = new double[top.size()]; - - int zeroCnt = 0; - - // Compute loads. - for (int i = 0; i < top.size(); i++) { - double load = getLoad(top, top.get(i)); - - nums[i] = load; - - if (load == 0) - zeroCnt++; - - totalLoad += load; - } - - // Take care of zero loads. - if (zeroCnt > 0) { - double newTotal = totalLoad; - - int nonZeroCnt = top.size() - zeroCnt; - - for (int i = 0; i < nums.length; i++) { - double load = nums[i]; - - if (load == 0) { - if (nonZeroCnt > 0) - load = totalLoad / nonZeroCnt; - - if (load == 0) - load = 1; - - nums[i] = load; - - newTotal += load; - } - } - - totalLoad = newTotal; - } - - double totalWeight = 0; - - // Calculate weights and total weight. - for (int i = 0; i < nums.length; i++) { - assert nums[i] > 0 : "Invalid load: " + nums[i]; - - double weight = totalLoad / nums[i]; - - // Convert to weight. - nums[i] = weight; - - totalWeight += weight; - } - - double weight = 0; - - // Enforce range from 0 to 1. - for (int i = 0; i < nums.length; i++) { - weight = i == nums.length - 1 ? 1.0d : weight + nums[i] / totalWeight; - - assert weight < 2 : "Invalid weight: " + weight; - - // Complexity of this put is O(logN). 
- circle.put(weight, top.get(i)); - } - } - - /** - * Gets weighted node in random fashion. - * - * @return Weighted node. - */ - ClusterNode pickWeightedNode() { - double weight = RAND.nextDouble(); - - SortedMap<Double, ClusterNode> pick = circle.tailMap(weight); - - ClusterNode node = pick.get(pick.firstKey()); - - rwLock.readLock().lock(); - - try { - AtomicInteger cnt = nodeJobs.get(node.id()); - - if (cnt != null) - cnt.incrementAndGet(); - } - finally { - rwLock.readLock().unlock(); - } - - return node; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAdaptiveLoadBalancingSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java deleted file mode 100644 index a473d59..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMBean.java +++ /dev/null @@ -1,27 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management MBean for {@link GridAdaptiveLoadBalancingSpi} SPI. 
- */ -@IgniteMBeanDescription("MBean that provides access to adaptive load balancing SPI configuration.") -public interface GridAdaptiveLoadBalancingSpiMBean extends IgniteSpiManagementMBean { - /** - * Gets text description of current load probing implementation used. - * - * @return Text description of current load probing implementation used. - */ - @IgniteMBeanDescription("Text description of current load probing implementation used.") - public String getLoadProbeFormatted(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java deleted file mode 100644 index 88226cc..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveLoadProbe.java +++ /dev/null @@ -1,90 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; - -/** - * Pluggable implementation of node load probing. Implementations - * of this can be configured to be used with {@link GridAdaptiveLoadBalancingSpi} - * by setting {@link GridAdaptiveLoadBalancingSpi#setLoadProbe(GridAdaptiveLoadProbe)} - * configuration parameter. 
- * <p> - * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * <p> - * By default, {@link GridAdaptiveCpuLoadProbe} probing implementation is used. - * <p> - * <h1 class="header">Example</h1> - * Here is an example of how probing can be implemented to use - * number of active and waiting jobs as probing mechanism: - * <pre name="code" class="java"> - * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { - * // Flag indicating whether to use average value or current. - * private boolean useAvg = true; - * - * public FooBarLoadProbe(boolean useAvg) { - * this.useAvg = useAvg; - * } - * - * // Calculate load based on number of active and waiting jobs. - * public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - * ClusterNodeMetrics metrics = node.metrics(); - * - * if (useAvg) { - * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - * - * if (load > 0) { - * return load; - * } - * } - * - * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - * } - * } - * </pre> - * Below is an example of how a probe shown above would be configured with {@link GridAdaptiveLoadBalancingSpi} - * SPI: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="foo.bar.FooBarLoadProbe"> - * <constructor-arg value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public interface GridAdaptiveLoadProbe { - /** - * Calculates load value for a given node.
Specific implementations would - * usually take into account some of the values provided by - * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated - * based on job execution time or number of active jobs, or CPU/Heap utilization. - * <p> - * Note that if this method returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * - * @param node Grid node to calculate load for. - * @param jobsSentSinceLastUpdate Number of jobs sent to this node since - * last metrics update. This parameter may be useful when - * implementation takes into account the current job count on a node. - * @return Non-negative load value for the node (zero and above). - */ - public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java deleted file mode 100644 index bc6f745..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/GridAdaptiveProcessingTimeLoadProbe.java +++ /dev/null @@ -1,98 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ 
\__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on total job processing time. - * Based on {@link #setUseAverage(boolean)} - * parameter, this implementation will either use average job execution - * time values or current (default is to use averages). The algorithm - * returns a sum of job wait time and job execution time. - * <p> - * Below is an example of how processing time load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadbalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> - * <property name="useAverage" value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public class GridAdaptiveProcessingTimeLoadProbe implements GridAdaptiveLoadProbe { - /** Flag indicating whether to use average execution time vs. current. */ - private boolean useAvg = true; - - /** - * Initializes execution time load probe to use - * execution time average by default. - */ - public GridAdaptiveProcessingTimeLoadProbe() { - // No-op. - } - - /** - * Specifies whether to use average execution time vs. current. - * - * @param useAvg Flag indicating whether to use average execution time vs. current. - */ - public GridAdaptiveProcessingTimeLoadProbe(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average execution time vs. current. - * - * @return Flag indicating whether to use average execution time vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average execution time vs. current.
- * - * @param useAvg Flag indicating whether to use average execution time vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - - /** {@inheritDoc} */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - if (useAvg) { - double load = metrics.getAverageJobExecuteTime() + metrics.getAverageJobWaitTime(); - - // If load is greater than 0, then we can use average times. - // Otherwise, we will proceed to using current times. - if (load > 0) - return load; - } - - double load = metrics.getCurrentJobExecuteTime() + metrics.getCurrentJobWaitTime(); - - return load < 0 ? 0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridAdaptiveProcessingTimeLoadProbe.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java deleted file mode 100644 index 9234e3c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinGlobalLoadBalancer.java +++ /dev/null @@ -1,305 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import 
org.apache.ignite.events.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Load balancer that works in global (not-per-task) mode. - */ -class GridRoundRobinGlobalLoadBalancer { - /** SPI context. */ - private IgniteSpiContext ctx; - - /** Listener for node's events. */ - private GridLocalEventListener lsnr; - - /** Logger. */ - private final IgniteLogger log; - - /** Current snapshot of nodes which participated in load balancing. */ - private volatile GridNodeList nodeList = new GridNodeList(0, null); - - /** Mutex for updating current topology. */ - private final Object mux = new Object(); - - /** Barrier for separating initialization callback and load balancing routine. */ - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** - * @param log Grid logger. - */ - GridRoundRobinGlobalLoadBalancer(IgniteLogger log) { - assert log != null; - - this.log = log; - } - - /** - * @param ctx Load balancing context. 
- */ - void onContextInitialized(final IgniteSpiContext ctx) { - this.ctx = ctx; - - ctx.addLocalEventListener( - lsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - assert evt instanceof IgniteDiscoveryEvent; - - UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id(); - - synchronized (mux) { - if (evt.type() == EVT_NODE_JOINED) { - List<UUID> oldNodes = nodeList.getNodes(); - - if (!oldNodes.contains(nodeId)) { - List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1); - - newNodes.add(nodeId); - - for (UUID node : oldNodes) - newNodes.add(node); - - nodeList = new GridNodeList(0, newNodes); - } - } - else { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - - List<UUID> oldNodes = nodeList.getNodes(); - - if (oldNodes.contains(nodeId)) { - List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1); - - for (UUID node : oldNodes) - if (!nodeId.equals(node)) - newNodes.add(node); - - nodeList = new GridNodeList(0, newNodes); - } - } - } - } - }, - EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT - ); - - synchronized (mux) { - List<UUID> oldNodes = nodeList.getNodes(); - - Collection<UUID> set = oldNodes == null ? new HashSet<UUID>() : new HashSet<>(oldNodes); - - for (ClusterNode node : ctx.nodes()) - set.add(node.id()); - - nodeList = new GridNodeList(0, new ArrayList<>(set)); - } - - initLatch.countDown(); - } - - /** */ - void onContextDestroyed() { - if (ctx != null) - ctx.removeLocalEventListener(lsnr); - } - - /** - * Gets balanced node for given topology. - * - * @param top Topology to pick from. - * @return Best balanced node. - * @throws GridException Thrown in case of any error. 
- */ - ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException { - assert !F.isEmpty(top); - - awaitInitializationCompleted(); - - Map<UUID, ClusterNode> topMap = null; - - ClusterNode found; - - int misses = 0; - - do { - GridNodeList nodeList = this.nodeList; - - List<UUID> nodes = nodeList.getNodes(); - - int cycleSize = nodes.size(); - - if (cycleSize == 0) - throw new GridException("Task topology does not have any alive nodes."); - - AtomicInteger idx; - - int curIdx, nextIdx; - - do { - idx = nodeList.getCurrentIdx(); - - curIdx = idx.get(); - - nextIdx = (idx.get() + 1) % cycleSize; - } - while (!idx.compareAndSet(curIdx, nextIdx)); - - found = findNodeById(top, nodes.get(nextIdx)); - - if (found == null) { - misses++; - - // For optimization purposes checks balancer can return at least one node with specified - // request topology only after full cycle (approximately). - if (misses >= cycleSize) { - if (topMap == null) { - topMap = U.newHashMap(top.size()); - - for (ClusterNode node : top) - topMap.put(node.id(), node); - } - - checkBalancerNodes(top, topMap, nodes); - - // Zero miss counter so next topology check will be performed once again after full cycle. - misses = 0; - } - } - } - while (found == null); - - if (log.isDebugEnabled()) - log.debug("Found round-robin node: " + found); - - return found; - } - - /** - * Finds node by id. Returns null in case of absence of specified id in request topology. - * - * @param top Topology for current request. - * @param foundNodeId Node id. - * @return Found node or null in case of absence of specified id in request topology. - */ - private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) { - for (ClusterNode node : top) - if (foundNodeId.equals(node.id())) - return node; - - return null; - } - - /** - * Checks if balancer can return at least one node, - * throw exception otherwise. - * - * @param top Topology for current request. - * @param topMap Topology map. 
- * @param nodes Current balanced nodes. - * @throws GridException If balancer can not return any node. - */ - private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap, Iterable<UUID> nodes) - throws GridException { - - boolean contains = false; - - for (UUID nodeId : nodes) { - if (topMap.get(nodeId) != null) { - contains = true; - - break; - } - } - - if (!contains) - throw new GridException("Task topology does not have alive nodes: " + top); - } - - /** - * Awaits initialization of balancing nodes to be completed. - * - * @throws GridException Thrown in case of thread interruption. - */ - private void awaitInitializationCompleted() throws GridException { - try { - if (initLatch.getCount() > 0) - initLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new GridException("Global balancer was interrupted.", e); - } - } - - /** - * Snapshot of nodes which participated in load balancing. - */ - private static final class GridNodeList { - /** Cyclic pointer for selecting next node. */ - private final AtomicInteger curIdx; - - /** Node ids. */ - private final List<UUID> nodes; - - /** - * @param curIdx Initial index of current node. - * @param nodes Initial node ids. - */ - private GridNodeList(int curIdx, List<UUID> nodes) { - this.curIdx = new AtomicInteger(curIdx); - this.nodes = nodes; - } - - /** - * @return Index of current node. - */ - private AtomicInteger getCurrentIdx() { - return curIdx; - } - - /** - * @return Node ids. - */ - private List<UUID> getNodes() { - return nodes; - } - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @return Internal list of nodes. 
- */ - List<UUID> getNodeIds() { - List<UUID> nodes = nodeList.getNodes(); - - return Collections.unmodifiableList(nodes); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRoundRobinGlobalLoadBalancer.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java deleted file mode 100644 index 7a85098..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpi.java +++ /dev/null @@ -1,319 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * This SPI iterates through nodes in round-robin fashion and pick the next - * sequential node. 
Two modes of operation are supported: per-task and global - * (see {@link #setPerTask(boolean)} configuration). - * <p> - * When configured in per-task mode, implementation will pick a random starting - * node at the beginning of every task execution and then sequentially iterate through all - * nodes in topology starting from the picked node. This is the default configuration - * and should fit most of the use cases as it provides a fairly well-distributed - * split and also ensures that jobs within a single task are spread out across - * nodes to the maximum. For cases when split size is equal to the number of nodes, - * this mode guarantees that all nodes will participate in the split. - * <p> - * When configured in global mode, a single sequential queue of nodes is maintained for - * all tasks and the next node in the queue is picked every time. In this mode (unlike in - * {@code per-task} mode) it is possible that even if split size may be equal to the - * number of nodes, some jobs within the same task will be assigned to the same node if - * multiple tasks are executing concurrently. - * <h1 class="header">Coding Example</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task will look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... 
- * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * and use affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. 
- * buf.append(res.getData().string()); - * } - * - * return buf.string(); - * } - * } - * </pre> - * <p> - * <h1 class="header">Configuration</h1> - * In order to use this load balancer, you should configure your grid instance - * to use {@code GridRoundRobinLoadBalancingSpi} either from Spring XML file or - * directly. The following configuration parameters are supported: - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li> - * Flag that indicates whether to use {@code per-task} or global - * round-robin modes described above (see {@link #setPerTask(boolean)}). - * </li> - * </ul> - * Below is Java configuration example: - * <pre name="code" class="java"> - * GridRandomLoadBalancingSpi = new GridRandomLoadBalancingSpi(); - * - * // Configure SPI to use global round-robin mode. - * spi.setPerTask(false); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default load balancing SPI. - * cfg.setLoadBalancingSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * Here is how you can configure {@code GridRandomLoadBalancingSpi} using Spring XML configuration: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.roundrobin.GridRoundRobinLoadBalancingSpi"> - * <!-- Set to global round-robin mode. --> - * <property name="perTask" value="false"/> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -@IgniteSpiMultipleInstancesSupport(true) -public class GridRoundRobinLoadBalancingSpi extends IgniteSpiAdapter implements GridLoadBalancingSpi, - GridRoundRobinLoadBalancingSpiMBean { - /** Grid logger. 
*/ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private GridRoundRobinGlobalLoadBalancer balancer; - - /** */ - private boolean isPerTask; - - /** */ - private final Map<IgniteUuid, GridRoundRobinPerTaskLoadBalancer> perTaskBalancers = - new ConcurrentHashMap8<>(); - - /** Event listener. */ - private final GridLocalEventListener lsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - if (evt.type() == EVT_TASK_FAILED || - evt.type() == EVT_TASK_FINISHED) - perTaskBalancers.remove(((IgniteTaskEvent)evt).taskSessionId()); - else if (evt.type() == EVT_JOB_MAPPED) { - GridRoundRobinPerTaskLoadBalancer balancer = - perTaskBalancers.get(((IgniteJobEvent)evt).taskSessionId()); - - if (balancer != null) - balancer.onMapped(); - } - } - }; - - /** {@inheritDoc} */ - @Override public boolean isPerTask() { - return isPerTask; - } - - /** - * Configuration parameter indicating whether a new round robin order should be - * created for every task. If {@code true} then load balancer is guaranteed - * to iterate through nodes sequentially for every task - so as long as number - * of jobs is less than or equal to the number of nodes, jobs are guaranteed to - * be assigned to unique nodes. If {@code false} then one round-robin order - * will be maintained for all tasks, so when tasks execute concurrently, it - * is possible for more than one job within task to be assigned to the same - * node. - * <p> - * Default is {@code false}. - * - * @param isPerTask Configuration parameter indicating whether a new round robin order should - * be created for every task. Default is {@code false}. 
- */ - @IgniteSpiConfiguration(optional = true) - public void setPerTask(boolean isPerTask) { - this.isPerTask = isPerTask; - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - if (log.isDebugEnabled()) - log.debug(configInfo("isPerTask", isPerTask)); - - registerMBean(gridName, this, GridRoundRobinLoadBalancingSpiMBean.class); - - balancer = new GridRoundRobinGlobalLoadBalancer(log); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - balancer = null; - - perTaskBalancers.clear(); - - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - if (!isPerTask) - balancer.onContextInitialized(spiCtx); - else { - if (!getSpiContext().isEventRecordable(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED)) - throw new IgniteSpiException("Required event types are disabled: " + - U.gridEventName(EVT_TASK_FAILED) + ", " + - U.gridEventName(EVT_TASK_FINISHED) + ", " + - U.gridEventName(EVT_JOB_MAPPED)); - - getSpiContext().addLocalEventListener(lsnr, EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (!isPerTask) { - if (balancer != null) - balancer.onContextDestroyed(); - } - else { - IgniteSpiContext spiCtx = getSpiContext(); - - if (spiCtx != null) - spiCtx.removeLocalEventListener(lsnr); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) - throws GridException { - A.notNull(ses, "ses", top, "top"); - - if (isPerTask) { - // Note that every session operates from single thread which - // allows us to use concurrent map and avoid 
synchronization. - GridRoundRobinPerTaskLoadBalancer taskBalancer = perTaskBalancers.get(ses.getId()); - - if (taskBalancer == null) - perTaskBalancers.put(ses.getId(), taskBalancer = new GridRoundRobinPerTaskLoadBalancer()); - - return taskBalancer.getBalancedNode(top); - } - - return balancer.getBalancedNode(top); - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @param ses Task session. - * @return Internal list of nodes. - */ - List<UUID> getNodeIds(ComputeTaskSession ses) { - if (isPerTask) { - GridRoundRobinPerTaskLoadBalancer balancer = perTaskBalancers.get(ses.getId()); - - if (balancer == null) - return Collections.emptyList(); - - List<UUID> ids = new ArrayList<>(); - - for (ClusterNode node : balancer.getNodes()) { - ids.add(node.id()); - } - - return ids; - } - - return balancer.getNodeIds(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRoundRobinLoadBalancingSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java deleted file mode 100644 index 2f561ae..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiMBean.java +++ /dev/null @@ -1,37 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package 
org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management bean for {@link GridRoundRobinLoadBalancingSpi} SPI. - */ -@IgniteMBeanDescription("MBean that provides access to round robin load balancing SPI configuration.") -public interface GridRoundRobinLoadBalancingSpiMBean extends IgniteSpiManagementMBean { - /** - * Configuration parameter indicating whether a new round robin order should be - * created for every task. If {@code true} then load balancer is guaranteed - * to iterate through nodes sequentially for every task - so as long as number - * of jobs is less than or equal to the number of nodes, jobs are guaranteed to - * be assigned to unique nodes. If {@code false} then one round-robin order - * will be maintained for all tasks, so when tasks execute concurrently, it - * is possible for more than one job within task to be assigned to the same - * node. - * <p> - * Default is {@code true}. - * - * @return Configuration parameter indicating whether a new round robin order should - * be created for every task. Default is {@code true}. 
- */ - @IgniteMBeanDescription("Configuration parameter indicating whether a new round robin order should be created for every task.") - public boolean isPerTask(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java deleted file mode 100644 index 4544955..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/GridRoundRobinPerTaskLoadBalancer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.cluster.*; - -import java.util.*; - -/** - * Load balancer for per-task configuration. - */ -class GridRoundRobinPerTaskLoadBalancer { - /** Balancing nodes. */ - private ArrayDeque<ClusterNode> nodeQueue; - - /** Jobs mapped flag. */ - private volatile boolean isMapped; - - /** Mutex. */ - private final Object mux = new Object(); - - /** - * Call back for job mapped event. - */ - void onMapped() { - isMapped = true; - } - - /** - * Gets balanced node for given topology. This implementation - * is to be used only from {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method - * and, therefore, does not need to be thread-safe. - * - * @param top Topology to pick from. - * @return Best balanced node. 
- */ - ClusterNode getBalancedNode(List<ClusterNode> top) { - assert top != null; - assert !top.isEmpty(); - - boolean readjust = isMapped; - - synchronized (mux) { - // Populate first time. - if (nodeQueue == null) - nodeQueue = new ArrayDeque<>(top); - - // If job has been mapped, then it means - // that it is most likely being failed over. - // In this case topology might have changed - // and we need to readjust with every apply. - if (readjust) - // Add missing nodes. - for (ClusterNode node : top) - if (!nodeQueue.contains(node)) - nodeQueue.offer(node); - - ClusterNode next = nodeQueue.poll(); - - // If jobs have been mapped, we need to make sure - // that queued node is still in topology. - if (readjust && next != null) { - while (!top.contains(next) && !nodeQueue.isEmpty()) - next = nodeQueue.poll(); - - // No nodes found and queue is empty. - if (next != null && !top.contains(next)) - return null; - } - - if (next != null) - // Add to the end. - nodeQueue.offer(next); - - return next; - } - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @return Internal list of nodes. 
- */ - List<ClusterNode> getNodes() { - synchronized (mux) { - return Collections.unmodifiableList(new ArrayList<>(nodeQueue)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a62862fe/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java new file mode 100644 index 0000000..e467bac --- /dev/null +++ b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java @@ -0,0 +1,305 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.gridgain.grid.spi.loadbalancing.roundrobin; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.spi.*; +import org.gridgain.grid.*; +import org.gridgain.grid.kernal.managers.eventstorage.*; +import org.gridgain.grid.util.typedef.*; +import org.gridgain.grid.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.events.IgniteEventType.*; + +/** + * Load balancer that works in global (not-per-task) mode. + */ +class RoundRobinGlobalLoadBalancer { + /** SPI context. */ + private IgniteSpiContext ctx; + + /** Listener for node's events. */ + private GridLocalEventListener lsnr; + + /** Logger. 
*/ + private final IgniteLogger log; + + /** Current snapshot of nodes which participated in load balancing. */ + private volatile GridNodeList nodeList = new GridNodeList(0, null); + + /** Mutex for updating current topology. */ + private final Object mux = new Object(); + + /** Barrier for separating initialization callback and load balancing routine. */ + private final CountDownLatch initLatch = new CountDownLatch(1); + + /** + * @param log Grid logger. + */ + RoundRobinGlobalLoadBalancer(IgniteLogger log) { + assert log != null; + + this.log = log; + } + + /** + * @param ctx Load balancing context. + */ + void onContextInitialized(final IgniteSpiContext ctx) { + this.ctx = ctx; + + ctx.addLocalEventListener( + lsnr = new GridLocalEventListener() { + @Override public void onEvent(IgniteEvent evt) { + assert evt instanceof IgniteDiscoveryEvent; + + UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id(); + + synchronized (mux) { + if (evt.type() == EVT_NODE_JOINED) { + List<UUID> oldNodes = nodeList.getNodes(); + + if (!oldNodes.contains(nodeId)) { + List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1); + + newNodes.add(nodeId); + + for (UUID node : oldNodes) + newNodes.add(node); + + nodeList = new GridNodeList(0, newNodes); + } + } + else { + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; + + List<UUID> oldNodes = nodeList.getNodes(); + + if (oldNodes.contains(nodeId)) { + List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1); + + for (UUID node : oldNodes) + if (!nodeId.equals(node)) + newNodes.add(node); + + nodeList = new GridNodeList(0, newNodes); + } + } + } + } + }, + EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT + ); + + synchronized (mux) { + List<UUID> oldNodes = nodeList.getNodes(); + + Collection<UUID> set = oldNodes == null ? 
new HashSet<UUID>() : new HashSet<>(oldNodes); + + for (ClusterNode node : ctx.nodes()) + set.add(node.id()); + + nodeList = new GridNodeList(0, new ArrayList<>(set)); + } + + initLatch.countDown(); + } + + /** */ + void onContextDestroyed() { + if (ctx != null) + ctx.removeLocalEventListener(lsnr); + } + + /** + * Gets balanced node for given topology. + * + * @param top Topology to pick from. + * @return Best balanced node. + * @throws GridException Thrown in case of any error. + */ + ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException { + assert !F.isEmpty(top); + + awaitInitializationCompleted(); + + Map<UUID, ClusterNode> topMap = null; + + ClusterNode found; + + int misses = 0; + + do { + GridNodeList nodeList = this.nodeList; + + List<UUID> nodes = nodeList.getNodes(); + + int cycleSize = nodes.size(); + + if (cycleSize == 0) + throw new GridException("Task topology does not have any alive nodes."); + + AtomicInteger idx; + + int curIdx, nextIdx; + + do { + idx = nodeList.getCurrentIdx(); + + curIdx = idx.get(); + + nextIdx = (idx.get() + 1) % cycleSize; + } + while (!idx.compareAndSet(curIdx, nextIdx)); + + found = findNodeById(top, nodes.get(nextIdx)); + + if (found == null) { + misses++; + + // For optimization purposes checks balancer can return at least one node with specified + // request topology only after full cycle (approximately). + if (misses >= cycleSize) { + if (topMap == null) { + topMap = U.newHashMap(top.size()); + + for (ClusterNode node : top) + topMap.put(node.id(), node); + } + + checkBalancerNodes(top, topMap, nodes); + + // Zero miss counter so next topology check will be performed once again after full cycle. + misses = 0; + } + } + } + while (found == null); + + if (log.isDebugEnabled()) + log.debug("Found round-robin node: " + found); + + return found; + } + + /** + * Finds node by id. Returns null in case of absence of specified id in request topology. + * + * @param top Topology for current request. 
+ * @param foundNodeId Node id. + * @return Found node or null in case of absence of specified id in request topology. + */ + private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) { + for (ClusterNode node : top) + if (foundNodeId.equals(node.id())) + return node; + + return null; + } + + /** + * Checks if balancer can return at least one node, + * throw exception otherwise. + * + * @param top Topology for current request. + * @param topMap Topology map. + * @param nodes Current balanced nodes. + * @throws GridException If balancer can not return any node. + */ + private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap, Iterable<UUID> nodes) + throws GridException { + + boolean contains = false; + + for (UUID nodeId : nodes) { + if (topMap.get(nodeId) != null) { + contains = true; + + break; + } + } + + if (!contains) + throw new GridException("Task topology does not have alive nodes: " + top); + } + + /** + * Awaits initialization of balancing nodes to be completed. + * + * @throws GridException Thrown in case of thread interruption. + */ + private void awaitInitializationCompleted() throws GridException { + try { + if (initLatch.getCount() > 0) + initLatch.await(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new GridException("Global balancer was interrupted.", e); + } + } + + /** + * Snapshot of nodes which participated in load balancing. + */ + private static final class GridNodeList { + /** Cyclic pointer for selecting next node. */ + private final AtomicInteger curIdx; + + /** Node ids. */ + private final List<UUID> nodes; + + /** + * @param curIdx Initial index of current node. + * @param nodes Initial node ids. + */ + private GridNodeList(int curIdx, List<UUID> nodes) { + this.curIdx = new AtomicInteger(curIdx); + this.nodes = nodes; + } + + /** + * @return Index of current node. 
+ */ + private AtomicInteger getCurrentIdx() { + return curIdx; + } + + /** + * @return Node ids. + */ + private List<UUID> getNodes() { + return nodes; + } + } + + /** + * THIS METHOD IS USED ONLY FOR TESTING. + * + * @return Internal list of nodes. + */ + List<UUID> getNodeIds() { + List<UUID> nodes = nodeList.getNodes(); + + return Collections.unmodifiableList(nodes); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RoundRobinGlobalLoadBalancer.class, this); + } +}