http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java deleted file mode 100644 index 512536f..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveLoadProbe.java +++ /dev/null @@ -1,90 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; - -/** - * Pluggable implementation of node load probing. Implementations - * of this can be configured to be used with {@link AdaptiveLoadBalancingSpi} - * by setting {@link AdaptiveLoadBalancingSpi#setLoadProbe(AdaptiveLoadProbe)} - * configuration parameter. - * <p> - * Note that if {@link #getLoad(org.apache.ignite.cluster.ClusterNode, int)} returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * <p> - * By default, {@link AdaptiveCpuLoadProbe} probing implementation is used. - * <p> - * <h1 class="header">Example</h1> - * Here is an example of how probing can be implemented to use - * number of active and waiting jobs as probing mechanism: - * <pre name="code" class="java"> - * public class FooBarLoadProbe implements GridAdaptiveLoadProbe { - * // Flag indicating whether to use average value or current. - * private int useAvg = true; - * - * public FooBarLoadProbe(boolean useAvg) { - * this.useAvg = useAvg; - * } - * - * // Calculate load based on number of active and waiting jobs. - * public double getLoad(GridNode node, int jobsSentSinceLastUpdate) { - * GridNodeMetrics metrics = node.getMetrics(); - * - * if (useAvg) { - * double load = metrics.getAverageActiveJobs() + metrics.getAverageWaitingJobs(); - * - * if (load > 0) { - * return load; - * } - * } - * - * return metrics.getCurrentActiveJobs() + metrics.getCurrentWaitingJobs(); - * } - * } - * </pre> - * Below is an example of how a probe shown above would be configured with {@link AdaptiveLoadBalancingSpi} - * SPI: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="foo.bar.FooBarLoadProbe"> - * <constructor-arg value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public interface AdaptiveLoadProbe { - /** - * Calculates load value for a given node. Specific implementations would - * usually take into account some of the values provided by - * {@link org.apache.ignite.cluster.ClusterNode#metrics()} method. For example, load can be calculated - * based on job execution time or number of active jobs, or CPU/Heap utilization. - * <p> - * Note that if this method returns a value of {@code 0}, - * then implementation will assume that load value is simply not available and - * will try to calculate an average of load values for other nodes. If such - * average cannot be obtained (all node load values are {@code 0}), then a value - * of {@code 1} will be used. - * - * @param node Grid node to calculate load for. - * @param jobsSentSinceLastUpdate Number of jobs sent to this node since - * last metrics update. This parameter may be useful when - * implementation takes into account the current job count on a node. - * @return Non-negative load value for the node (zero and above). - */ - public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate); -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java deleted file mode 100644 index dc9e250..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/AdaptiveProcessingTimeLoadProbe.java +++ /dev/null @@ -1,98 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.adaptive; - -import org.apache.ignite.cluster.*; -import org.gridgain.grid.util.typedef.internal.*; - -/** - * Implementation of node load probing based on total job processing time. - * Based on {@link #setUseAverage(boolean)} - * parameter, this implementation will either use average job execution - * time values or current (default is to use averages). The algorithm - * returns a sum of job wait time and job execution time. - * <p> - * Below is an example of how CPU load probe would be configured in GridGain - * Spring configuration file: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveLoadBalancingSpi"> - * <property name="loadProbe"> - * <bean class="org.gridgain.grid.spi.loadBalancing.adaptive.GridAdaptiveProcessingTimeLoadProbe"> - * <property name="useAverage" value="true"/> - * </bean> - * </property> - * </bean> - * </property> - * </pre> - */ -public class AdaptiveProcessingTimeLoadProbe implements AdaptiveLoadProbe { - /** Flag indicating whether to use average execution time vs. current. */ - private boolean useAvg = true; - - /** - * Initializes execution time load probe to use - * execution time average by default. - */ - public AdaptiveProcessingTimeLoadProbe() { - // No-op. - } - - /** - * Specifies whether to use average execution time vs. current. - * - * @param useAvg Flag indicating whether to use average execution time vs. current. - */ - public AdaptiveProcessingTimeLoadProbe(boolean useAvg) { - this.useAvg = useAvg; - } - - /** - * Gets flag indicating whether to use average execution time vs. current. - * - * @return Flag indicating whether to use average execution time vs. current. - */ - public boolean isUseAverage() { - return useAvg; - } - - /** - * Sets flag indicating whether to use average execution time vs. current. - * - * @param useAvg Flag indicating whether to use average execution time vs. current. - */ - public void setUseAverage(boolean useAvg) { - this.useAvg = useAvg; - } - - - /** {@inheritDoc} */ - @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { - ClusterNodeMetrics metrics = node.metrics(); - - if (useAvg) { - double load = metrics.getAverageJobExecuteTime() + metrics.getAverageJobWaitTime(); - - // If load is greater than 0, then we can use average times. - // Otherwise, we will proceed to using current times. - if (load > 0) - return load; - } - - double load = metrics.getCurrentJobExecuteTime() + metrics.getCurrentJobWaitTime(); - - return load < 0 ? 0 : load; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(AdaptiveProcessingTimeLoadProbe.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/package.html deleted file mode 100644 index ee3a5eb..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/adaptive/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains adaptive load balancing SPI. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/package.html deleted file mode 100644 index fd879b9..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains APIs for load balancing SPI. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java deleted file mode 100644 index e467bac..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinGlobalLoadBalancer.java +++ /dev/null @@ -1,305 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.events.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Load balancer that works in global (not-per-task) mode. - */ -class RoundRobinGlobalLoadBalancer { - /** SPI context. */ - private IgniteSpiContext ctx; - - /** Listener for node's events. */ - private GridLocalEventListener lsnr; - - /** Logger. */ - private final IgniteLogger log; - - /** Current snapshot of nodes which participated in load balancing. */ - private volatile GridNodeList nodeList = new GridNodeList(0, null); - - /** Mutex for updating current topology. */ - private final Object mux = new Object(); - - /** Barrier for separating initialization callback and load balancing routine. */ - private final CountDownLatch initLatch = new CountDownLatch(1); - - /** - * @param log Grid logger. - */ - RoundRobinGlobalLoadBalancer(IgniteLogger log) { - assert log != null; - - this.log = log; - } - - /** - * @param ctx Load balancing context. - */ - void onContextInitialized(final IgniteSpiContext ctx) { - this.ctx = ctx; - - ctx.addLocalEventListener( - lsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - assert evt instanceof IgniteDiscoveryEvent; - - UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id(); - - synchronized (mux) { - if (evt.type() == EVT_NODE_JOINED) { - List<UUID> oldNodes = nodeList.getNodes(); - - if (!oldNodes.contains(nodeId)) { - List<UUID> newNodes = new ArrayList<>(oldNodes.size() + 1); - - newNodes.add(nodeId); - - for (UUID node : oldNodes) - newNodes.add(node); - - nodeList = new GridNodeList(0, newNodes); - } - } - else { - assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED; - - List<UUID> oldNodes = nodeList.getNodes(); - - if (oldNodes.contains(nodeId)) { - List<UUID> newNodes = new ArrayList<>(oldNodes.size() - 1); - - for (UUID node : oldNodes) - if (!nodeId.equals(node)) - newNodes.add(node); - - nodeList = new GridNodeList(0, newNodes); - } - } - } - } - }, - EVT_NODE_FAILED, EVT_NODE_JOINED, EVT_NODE_LEFT - ); - - synchronized (mux) { - List<UUID> oldNodes = nodeList.getNodes(); - - Collection<UUID> set = oldNodes == null ? new HashSet<UUID>() : new HashSet<>(oldNodes); - - for (ClusterNode node : ctx.nodes()) - set.add(node.id()); - - nodeList = new GridNodeList(0, new ArrayList<>(set)); - } - - initLatch.countDown(); - } - - /** */ - void onContextDestroyed() { - if (ctx != null) - ctx.removeLocalEventListener(lsnr); - } - - /** - * Gets balanced node for given topology. - * - * @param top Topology to pick from. - * @return Best balanced node. - * @throws GridException Thrown in case of any error. - */ - ClusterNode getBalancedNode(Collection<ClusterNode> top) throws GridException { - assert !F.isEmpty(top); - - awaitInitializationCompleted(); - - Map<UUID, ClusterNode> topMap = null; - - ClusterNode found; - - int misses = 0; - - do { - GridNodeList nodeList = this.nodeList; - - List<UUID> nodes = nodeList.getNodes(); - - int cycleSize = nodes.size(); - - if (cycleSize == 0) - throw new GridException("Task topology does not have any alive nodes."); - - AtomicInteger idx; - - int curIdx, nextIdx; - - do { - idx = nodeList.getCurrentIdx(); - - curIdx = idx.get(); - - nextIdx = (idx.get() + 1) % cycleSize; - } - while (!idx.compareAndSet(curIdx, nextIdx)); - - found = findNodeById(top, nodes.get(nextIdx)); - - if (found == null) { - misses++; - - // For optimization purposes checks balancer can return at least one node with specified - // request topology only after full cycle (approximately). - if (misses >= cycleSize) { - if (topMap == null) { - topMap = U.newHashMap(top.size()); - - for (ClusterNode node : top) - topMap.put(node.id(), node); - } - - checkBalancerNodes(top, topMap, nodes); - - // Zero miss counter so next topology check will be performed once again after full cycle. - misses = 0; - } - } - } - while (found == null); - - if (log.isDebugEnabled()) - log.debug("Found round-robin node: " + found); - - return found; - } - - /** - * Finds node by id. Returns null in case of absence of specified id in request topology. - * - * @param top Topology for current request. - * @param foundNodeId Node id. - * @return Found node or null in case of absence of specified id in request topology. - */ - private static ClusterNode findNodeById(Iterable<ClusterNode> top, UUID foundNodeId) { - for (ClusterNode node : top) - if (foundNodeId.equals(node.id())) - return node; - - return null; - } - - /** - * Checks if balancer can return at least one node, - * throw exception otherwise. - * - * @param top Topology for current request. - * @param topMap Topology map. - * @param nodes Current balanced nodes. - * @throws GridException If balancer can not return any node. - */ - private static void checkBalancerNodes(Collection<ClusterNode> top, Map<UUID, ClusterNode> topMap, Iterable<UUID> nodes) - throws GridException { - - boolean contains = false; - - for (UUID nodeId : nodes) { - if (topMap.get(nodeId) != null) { - contains = true; - - break; - } - } - - if (!contains) - throw new GridException("Task topology does not have alive nodes: " + top); - } - - /** - * Awaits initialization of balancing nodes to be completed. - * - * @throws GridException Thrown in case of thread interruption. - */ - private void awaitInitializationCompleted() throws GridException { - try { - if (initLatch.getCount() > 0) - initLatch.await(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - throw new GridException("Global balancer was interrupted.", e); - } - } - - /** - * Snapshot of nodes which participated in load balancing. - */ - private static final class GridNodeList { - /** Cyclic pointer for selecting next node. */ - private final AtomicInteger curIdx; - - /** Node ids. */ - private final List<UUID> nodes; - - /** - * @param curIdx Initial index of current node. - * @param nodes Initial node ids. - */ - private GridNodeList(int curIdx, List<UUID> nodes) { - this.curIdx = new AtomicInteger(curIdx); - this.nodes = nodes; - } - - /** - * @return Index of current node. - */ - private AtomicInteger getCurrentIdx() { - return curIdx; - } - - /** - * @return Node ids. - */ - private List<UUID> getNodes() { - return nodes; - } - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @return Internal list of nodes. - */ - List<UUID> getNodeIds() { - List<UUID> nodes = nodeList.getNodes(); - - return Collections.unmodifiableList(nodes); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RoundRobinGlobalLoadBalancer.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java deleted file mode 100644 index 243e20a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpi.java +++ /dev/null @@ -1,319 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * This SPI iterates through nodes in round-robin fashion and pick the next - * sequential node. Two modes of operation are supported: per-task and global - * (see {@link #setPerTask(boolean)} configuration). - * <p> - * When configured in per-task mode, implementation will pick a random starting - * node at the beginning of every task execution and then sequentially iterate through all - * nodes in topology starting from the picked node. This is the default configuration - * and should fit most of the use cases as it provides a fairly well-distributed - * split and also ensures that jobs within a single task are spread out across - * nodes to the maximum. For cases when split size is equal to the number of nodes, - * this mode guarantees that all nodes will participate in the split. - * <p> - * When configured in global mode, a single sequential queue of nodes is maintained for - * all tasks and the next node in the queue is picked every time. In this mode (unlike in - * {@code per-task} mode) it is possible that even if split size may be equal to the - * number of nodes, some jobs within the same task will be assigned to the same node if - * multiple tasks are executing concurrently. - * <h1 class="header">Coding Example</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task will look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * and use affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().string()); - * } - * - * return buf.string(); - * } - * } - * </pre> - * <p> - * <h1 class="header">Configuration</h1> - * In order to use this load balancer, you should configure your grid instance - * to use {@code GridRoundRobinLoadBalancingSpi} either from Spring XML file or - * directly. The following configuration parameters are supported: - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li> - * Flag that indicates whether to use {@code per-task} or global - * round-robin modes described above (see {@link #setPerTask(boolean)}). - * </li> - * </ul> - * Below is Java configuration example: - * <pre name="code" class="java"> - * GridRandomLoadBalancingSpi = new GridRandomLoadBalancingSpi(); - * - * // Configure SPI to use global round-robin mode. - * spi.setPerTask(false); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default load balancing SPI. - * cfg.setLoadBalancingSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * Here is how you can configure {@code GridRandomLoadBalancingSpi} using Spring XML configuration: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.roundrobin.GridRoundRobinLoadBalancingSpi"> - * <!-- Set to global round-robin mode. --> - * <property name="perTask" value="false"/> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -@IgniteSpiMultipleInstancesSupport(true) -public class RoundRobinLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, - RoundRobinLoadBalancingSpiMBean { - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private RoundRobinGlobalLoadBalancer balancer; - - /** */ - private boolean isPerTask; - - /** */ - private final Map<IgniteUuid, RoundRobinPerTaskLoadBalancer> perTaskBalancers = - new ConcurrentHashMap8<>(); - - /** Event listener. */ - private final GridLocalEventListener lsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - if (evt.type() == EVT_TASK_FAILED || - evt.type() == EVT_TASK_FINISHED) - perTaskBalancers.remove(((IgniteTaskEvent)evt).taskSessionId()); - else if (evt.type() == EVT_JOB_MAPPED) { - RoundRobinPerTaskLoadBalancer balancer = - perTaskBalancers.get(((IgniteJobEvent)evt).taskSessionId()); - - if (balancer != null) - balancer.onMapped(); - } - } - }; - - /** {@inheritDoc} */ - @Override public boolean isPerTask() { - return isPerTask; - } - - /** - * Configuration parameter indicating whether a new round robin order should be - * created for every task. If {@code true} then load balancer is guaranteed - * to iterate through nodes sequentially for every task - so as long as number - * of jobs is less than or equal to the number of nodes, jobs are guaranteed to - * be assigned to unique nodes. If {@code false} then one round-robin order - * will be maintained for all tasks, so when tasks execute concurrently, it - * is possible for more than one job within task to be assigned to the same - * node. - * <p> - * Default is {@code false}. - * - * @param isPerTask Configuration parameter indicating whether a new round robin order should - * be created for every task. Default is {@code false}. - */ - @IgniteSpiConfiguration(optional = true) - public void setPerTask(boolean isPerTask) { - this.isPerTask = isPerTask; - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - if (log.isDebugEnabled()) - log.debug(configInfo("isPerTask", isPerTask)); - - registerMBean(gridName, this, RoundRobinLoadBalancingSpiMBean.class); - - balancer = new RoundRobinGlobalLoadBalancer(log); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - balancer = null; - - perTaskBalancers.clear(); - - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - if (!isPerTask) - balancer.onContextInitialized(spiCtx); - else { - if (!getSpiContext().isEventRecordable(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED)) - throw new IgniteSpiException("Required event types are disabled: " + - U.gridEventName(EVT_TASK_FAILED) + ", " + - U.gridEventName(EVT_TASK_FINISHED) + ", " + - U.gridEventName(EVT_JOB_MAPPED)); - - getSpiContext().addLocalEventListener(lsnr, EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - } - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (!isPerTask) { - if (balancer != null) - balancer.onContextDestroyed(); - } - else { - IgniteSpiContext spiCtx = getSpiContext(); - - if (spiCtx != null) - spiCtx.removeLocalEventListener(lsnr); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) - throws GridException { - A.notNull(ses, "ses", top, "top"); - - if (isPerTask) { - // Note that every session operates from single thread which - // allows us to use concurrent map and avoid synchronization. - RoundRobinPerTaskLoadBalancer taskBalancer = perTaskBalancers.get(ses.getId()); - - if (taskBalancer == null) - perTaskBalancers.put(ses.getId(), taskBalancer = new RoundRobinPerTaskLoadBalancer()); - - return taskBalancer.getBalancedNode(top); - } - - return balancer.getBalancedNode(top); - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @param ses Task session. - * @return Internal list of nodes. - */ - List<UUID> getNodeIds(ComputeTaskSession ses) { - if (isPerTask) { - RoundRobinPerTaskLoadBalancer balancer = perTaskBalancers.get(ses.getId()); - - if (balancer == null) - return Collections.emptyList(); - - List<UUID> ids = new ArrayList<>(); - - for (ClusterNode node : balancer.getNodes()) { - ids.add(node.id()); - } - - return ids; - } - - return balancer.getNodeIds(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RoundRobinLoadBalancingSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java deleted file mode 100644 index 1189677..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinLoadBalancingSpiMBean.java +++ /dev/null @@ -1,37 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management bean for {@link RoundRobinLoadBalancingSpi} SPI. - */ -@IgniteMBeanDescription("MBean that provides access to round robin load balancing SPI configuration.") -public interface RoundRobinLoadBalancingSpiMBean extends IgniteSpiManagementMBean { - /** - * Configuration parameter indicating whether a new round robin order should be - * created for every task. If {@code true} then load balancer is guaranteed - * to iterate through nodes sequentially for every task - so as long as number - * of jobs is less than or equal to the number of nodes, jobs are guaranteed to - * be assigned to unique nodes. If {@code false} then one round-robin order - * will be maintained for all tasks, so when tasks execute concurrently, it - * is possible for more than one job within task to be assigned to the same - * node. - * <p> - * Default is {@code true}. - * - * @return Configuration parameter indicating whether a new round robin order should - * be created for every task. Default is {@code true}. - */ - @IgniteMBeanDescription("Configuration parameter indicating whether a new round robin order should be created for every task.") - public boolean isPerTask(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java deleted file mode 100644 index bf626b4..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/RoundRobinPerTaskLoadBalancer.java +++ /dev/null @@ -1,96 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.roundrobin; - -import org.apache.ignite.cluster.*; - -import java.util.*; - -/** - * Load balancer for per-task configuration. - */ -class RoundRobinPerTaskLoadBalancer { - /** Balancing nodes. */ - private ArrayDeque<ClusterNode> nodeQueue; - - /** Jobs mapped flag. */ - private volatile boolean isMapped; - - /** Mutex. */ - private final Object mux = new Object(); - - /** - * Call back for job mapped event. - */ - void onMapped() { - isMapped = true; - } - - /** - * Gets balanced node for given topology. This implementation - * is to be used only from {@link org.apache.ignite.compute.ComputeTask#map(List, Object)} method - * and, therefore, does not need to be thread-safe. - * - * @param top Topology to pick from. - * @return Best balanced node. - */ - ClusterNode getBalancedNode(List<ClusterNode> top) { - assert top != null; - assert !top.isEmpty(); - - boolean readjust = isMapped; - - synchronized (mux) { - // Populate first time. - if (nodeQueue == null) - nodeQueue = new ArrayDeque<>(top); - - // If job has been mapped, then it means - // that it is most likely being failed over. - // In this case topology might have changed - // and we need to readjust with every apply. - if (readjust) - // Add missing nodes. - for (ClusterNode node : top) - if (!nodeQueue.contains(node)) - nodeQueue.offer(node); - - ClusterNode next = nodeQueue.poll(); - - // If jobs have been mapped, we need to make sure - // that queued node is still in topology. - if (readjust && next != null) { - while (!top.contains(next) && !nodeQueue.isEmpty()) - next = nodeQueue.poll(); - - // No nodes found and queue is empty. - if (next != null && !top.contains(next)) - return null; - } - - if (next != null) - // Add to the end. - nodeQueue.offer(next); - - return next; - } - } - - /** - * THIS METHOD IS USED ONLY FOR TESTING. - * - * @return Internal list of nodes. - */ - List<ClusterNode> getNodes() { - synchronized (mux) { - return Collections.unmodifiableList(new ArrayList<>(nodeQueue)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/package.html deleted file mode 100644 index 9909144..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/roundrobin/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains <b>default</b> round-robin implementation for load balancing SPI. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java deleted file mode 100644 index 0590d60..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpi.java +++ /dev/null @@ -1,394 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.weightedrandom; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.compute.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.resources.*; -import org.apache.ignite.spi.*; -import org.gridgain.grid.kernal.managers.eventstorage.*; -import org.gridgain.grid.spi.loadbalancing.*; -import org.gridgain.grid.util.typedef.*; -import org.gridgain.grid.util.typedef.internal.*; -import org.jdk8.backport.*; -import org.jetbrains.annotations.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Load balancing SPI that picks a random node for job execution. Note that you can - * optionally assign weights to nodes, so nodes with larger weights will end up getting - * proportionally more jobs routed to them (see {@link #setNodeWeight(int)} - * configuration property). By default all nodes get equal weight defined by - * {@link #DFLT_NODE_WEIGHT} (value is {@code 10}). - * <h1 class="header">Coding Example</h1> - * If you are using {@link org.apache.ignite.compute.ComputeTaskSplitAdapter} then load balancing logic - * is transparent to your code and is handled automatically by the adapter. - * Here is an example of how your task could look: - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskSplitAdapter<Object, Object> { - * @Override - * protected Collection<? extends GridComputeJob> split(int gridSize, Object arg) throws GridException { - * List<MyFooBarJob> jobs = new ArrayList<MyFooBarJob>(gridSize); - * - * for (int i = 0; i < gridSize; i++) { - * jobs.add(new MyFooBarJob(arg)); - * } - * - * // Node assignment via load balancer - * // happens automatically. - * return jobs; - * } - * ... - * } - * </pre> - * If you need more fine-grained control over how some jobs within task get mapped to a node - * and use affinity load balancing for some other jobs within task, then you should use - * {@link org.apache.ignite.compute.ComputeTaskAdapter}. Here is an example of how your task will look. Note that in this - * case we manually inject load balancer and use it to pick the best node. Doing it in - * such way would allow user to map some jobs manually and for others use load balancer. - * <pre name="code" class="java"> - * public class MyFooBarTask extends GridComputeTaskAdapter<String, String> { - * // Inject load balancer. - * @GridLoadBalancerResource - * GridComputeLoadBalancer balancer; - * - * // Map jobs to grid nodes. - * public Map<? extends GridComputeJob, GridNode> map(List<GridNode> subgrid, String arg) throws GridException { - * Map<MyFooBarJob, GridNode> jobs = new HashMap<MyFooBarJob, GridNode>(subgrid.size()); - * - * // In more complex cases, you can actually do - * // more complicated assignments of jobs to nodes. - * for (int i = 0; i < subgrid.size(); i++) { - * // Pick the next best balanced node for the job. - * jobs.put(new MyFooBarJob(arg), balancer.getBalancedNode()) - * } - * - * return jobs; - * } - * - * // Aggregate results into one compound result. - * public String reduce(List<GridComputeJobResult> results) throws GridException { - * // For the purpose of this example we simply - * // concatenate string representation of every - * // job result - * StringBuilder buf = new StringBuilder(); - * - * for (GridComputeJobResult res : results) { - * // Append string representation of result - * // returned by every job. - * buf.append(res.getData().string()); - * } - * - * return buf.string(); - * } - * } - * </pre> - * <p> - * <h1 class="header">Configuration</h1> - * In order to use this load balancer, you should configure your grid instance - * to use {@code GridRandomLoadBalancingSpi} either from Spring XML file or - * directly. The following configuration parameters are supported: - * <h2 class="header">Mandatory</h2> - * This SPI has no mandatory configuration parameters. - * <h2 class="header">Optional</h2> - * The following configuration parameters are optional: - * <ul> - * <li> - * Flag that indicates whether to use weight policy or simple random policy - * (see {@link #setUseWeights(boolean)}) - * </li> - * <li> - * Weight of this node (see {@link #setNodeWeight(int)}). This parameter is ignored - * if {@link #setUseWeights(boolean)} is set to {@code false}. - * </li> - * </ul> - * Below is Java configuration example: - * <pre name="code" class="java"> - * GridWeightedRandomLoadBalancingSpi = new GridWeightedLoadBalancingSpi(); - * - * // Configure SPI to used weighted - * // random load balancing. - * spi.setUseWeights(true); - * - * // Set weight for the local node. - * spi.setWeight( *); - * - * GridConfiguration cfg = new GridConfiguration(); - * - * // Override default load balancing SPI. - * cfg.setLoadBalancingSpi(spi); - * - * // Starts grid. - * G.start(cfg); - * </pre> - * Here is how you can configure {@code GridRandomLoadBalancingSpi} using Spring XML configuration: - * <pre name="code" class="xml"> - * <property name="loadBalancingSpi"> - * <bean class="org.gridgain.grid.spi.loadBalancing.weightedrandom.GridWeightedRandomLoadBalancingSpi"> - * <property name="useWeights" value="true"/> - * <property name="nodeWeight" value="10"/> - * </bean> - * </property> - * </pre> - * <p> - * <img src="http://www.gridgain.com/images/spring-small.png"> - * <br> - * For information about Spring framework visit <a href="http://www.springframework.org/">www.springframework.org</a> - */ -@IgniteSpiMultipleInstancesSupport(true) -@IgniteSpiConsistencyChecked(optional = true) -public class WeightedRandomLoadBalancingSpi extends IgniteSpiAdapter implements LoadBalancingSpi, - WeightedRandomLoadBalancingSpiMBean { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** - * Name of node attribute used to indicate load weight of a node - * (value is {@code "gridgain.node.weight.attr.name"}). - * - * @see org.apache.ignite.cluster.ClusterNode#attributes() - */ - public static final String NODE_WEIGHT_ATTR_NAME = "gridgain.node.weight.attr.name"; - - /** Default weight assigned to every node if explicit one is not provided (value is {@code 10}). */ - public static final int DFLT_NODE_WEIGHT = 10; - - /** Grid logger. */ - @IgniteLoggerResource - private IgniteLogger log; - - /** */ - private boolean isUseWeights; - - /** Local event listener to listen to task completion events. */ - private GridLocalEventListener evtLsnr; - - /** Weight of this node. */ - private int nodeWeight = DFLT_NODE_WEIGHT; - - /** Task topologies. First pair value indicates whether or not jobs have been mapped. */ - private ConcurrentMap<IgniteUuid, IgniteBiTuple<Boolean, WeightedTopology>> taskTops = - new ConcurrentHashMap8<>(); - - /** - * Sets a flag to indicate whether node weights should be checked when - * doing random load balancing. Default value is {@code false} which - * means that node weights are disregarded for load balancing logic. - * - * @param isUseWeights If {@code true} then random load is distributed according - * to node weights. - */ - @IgniteSpiConfiguration(optional = true) - public void setUseWeights(boolean isUseWeights) { - this.isUseWeights = isUseWeights; - } - - /** {@inheritDoc} */ - @Override public boolean isUseWeights() { - return isUseWeights; - } - - /** - * Sets weight of this node. Nodes with more processing capacity - * should be assigned proportionally larger weight. Default value - * is {@link #DFLT_NODE_WEIGHT} and is equal for all nodes. - * - * @param nodeWeight Weight of this node. - */ - @IgniteSpiConfiguration(optional = true) - public void setNodeWeight(int nodeWeight) { - this.nodeWeight = nodeWeight; - } - - /** {@inheritDoc} */ - @Override public int getNodeWeight() { - return nodeWeight; - } - - /** {@inheritDoc} */ - @Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException { - return F.<String, Object>asMap(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME), nodeWeight); - } - - /** {@inheritDoc} */ - @Override public void spiStart(@Nullable String gridName) throws IgniteSpiException { - startStopwatch(); - - assertParameter(nodeWeight > 0, "nodeWeight > 0"); - - if (log.isDebugEnabled()) { - log.debug(configInfo("isUseWeights", isUseWeights)); - log.debug(configInfo("nodeWeight", nodeWeight)); - } - - registerMBean(gridName, this, WeightedRandomLoadBalancingSpiMBean.class); - - // Ack ok start. - if (log.isDebugEnabled()) - log.debug(startInfo()); - } - - /** {@inheritDoc} */ - @Override public void spiStop() throws IgniteSpiException { - unregisterMBean(); - - // Ack ok stop. - if (log.isDebugEnabled()) - log.debug(stopInfo()); - } - - /** {@inheritDoc} */ - @Override protected void onContextInitialized0(IgniteSpiContext spiCtx) throws IgniteSpiException { - getSpiContext().addLocalEventListener(evtLsnr = new GridLocalEventListener() { - @Override public void onEvent(IgniteEvent evt) { - assert evt instanceof IgniteTaskEvent || evt instanceof IgniteJobEvent; - - if (evt.type() == EVT_TASK_FINISHED || - evt.type() == EVT_TASK_FAILED) { - IgniteUuid sesId = ((IgniteTaskEvent)evt).taskSessionId(); - - taskTops.remove(sesId); - - if (log.isDebugEnabled()) - log.debug("Removed task topology from topology cache for session: " + sesId); - } - // We should keep topology and use cache in GridComputeTask#map() method to - // avoid O(n*n/2) complexity, after that we can drop caches. - // Here we set mapped property and later cache will be ignored - else if (evt.type() == EVT_JOB_MAPPED) { - IgniteUuid sesId = ((IgniteJobEvent)evt).taskSessionId(); - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(sesId); - - if (weightedTop != null) - weightedTop.set1(true); - - if (log.isDebugEnabled()) - log.debug("Job has been mapped. Ignore cache for session: " + sesId); - } - } - }, - EVT_TASK_FAILED, - EVT_TASK_FINISHED, - EVT_JOB_MAPPED - ); - } - - /** {@inheritDoc} */ - @Override protected void onContextDestroyed0() { - if (evtLsnr != null) { - IgniteSpiContext ctx = getSpiContext(); - - if (ctx != null) - ctx.removeLocalEventListener(evtLsnr); - } - } - - /** {@inheritDoc} */ - @Override public ClusterNode getBalancedNode(ComputeTaskSession ses, List<ClusterNode> top, ComputeJob job) { - A.notNull(ses, "ses"); - A.notNull(top, "top"); - A.notNull(job, "job"); - - // Optimization for non-weighted randomization. - if (!isUseWeights) - return top.get(RAND.nextInt(top.size())); - - IgniteBiTuple<Boolean, WeightedTopology> weightedTop = taskTops.get(ses.getId()); - - // Create new cached topology if there is no one. Do not - // use cached topology after task has been mapped. - if (weightedTop == null) { - // Called from GridComputeTask#map(). Put new topology and false as not mapped yet. - taskTops.put(ses.getId(), weightedTop = F.t(false, new WeightedTopology(top))); - } - // We have topology - check if task has been mapped. - else if (weightedTop.get1()) { - // Do not use cache after GridComputeTask#map(). - return new WeightedTopology(top).pickWeightedNode(); - } - - return weightedTop.get2().pickWeightedNode(); - } - - /** - * @param node Node to get weight for. - * @return Node weight - */ - private int getWeight(ClusterNode node) { - Integer weight = (Integer)node.attribute(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME)); - - if (weight != null && weight == 0) - throw new IllegalStateException("Node weight cannot be zero: " + node); - - return weight == null ? DFLT_NODE_WEIGHT : weight; - } - - /** - * Holder for weighted topology. - */ - private class WeightedTopology { - /** Total topology weight. */ - private final int totalWeight; - - /** Topology sorted by weight. */ - private final SortedMap<Integer, ClusterNode> circle = new TreeMap<>(); - - /** - * @param top Topology. - */ - WeightedTopology(Collection<ClusterNode> top) { - assert !F.isEmpty(top); - - int totalWeight = 0; - - for (ClusterNode node : top) { - totalWeight += getWeight(node); - - // Complexity of this put is O(logN). - circle.put(totalWeight, node); - } - - this.totalWeight = totalWeight; - } - - /** - * Gets weighted node in random fashion. - * - * @return Weighted node. - */ - ClusterNode pickWeightedNode() { - int weight = RAND.nextInt(totalWeight) + 1; - - SortedMap<Integer, ClusterNode> pick = circle.tailMap(weight); - - assert !pick.isEmpty(); - - return pick.get(pick.firstKey()); - } - } - - /** {@inheritDoc} */ - @Override protected List<String> getConsistentAttributeNames() { - return Collections.singletonList(createSpiAttributeName(NODE_WEIGHT_ATTR_NAME)); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(WeightedRandomLoadBalancingSpi.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java deleted file mode 100644 index c4daa9e..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/WeightedRandomLoadBalancingSpiMBean.java +++ /dev/null @@ -1,37 +0,0 @@ -/* @java.file.header */ - -/* _________ _____ __________________ _____ - * __ ____/___________(_)______ /__ ____/______ ____(_)_______ - * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ - */ - -package org.gridgain.grid.spi.loadbalancing.weightedrandom; - -import org.apache.ignite.mbean.*; -import org.apache.ignite.spi.*; - -/** - * Management MBean for {@link WeightedRandomLoadBalancingSpi} SPI. - */ -@IgniteMBeanDescription("MBean that provides access to weighted random load balancing SPI configuration.") -public interface WeightedRandomLoadBalancingSpiMBean extends IgniteSpiManagementMBean { - /** - * Checks whether node weights are considered when doing - * random load balancing. - * - * @return If {@code true} then random load is distributed according - * to node weights. - */ - @IgniteMBeanDescription("Whether node weights are considered when doing random load balancing.") - public boolean isUseWeights(); - - /** - * Gets weight of this node. - * - * @return Weight of this node. - */ - @IgniteMBeanDescription("Weight of this node.") - public int getNodeWeight(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/package.html b/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/package.html deleted file mode 100644 index 2da3d3a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/spi/loadbalancing/weightedrandom/package.html +++ /dev/null @@ -1,15 +0,0 @@ -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<!-- - @html.file.header - _________ _____ __________________ _____ - __ ____/___________(_)______ /__ ____/______ ____(_)_______ - _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ - / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / - \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ ---> -<html> -<body> - <!-- Package description. --> - Contains weighted random-base implementation for load balancing SPI. -</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/config/io-manager-benchmark.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/io-manager-benchmark.xml b/modules/core/src/test/config/io-manager-benchmark.xml index 01dc6e2..a58a89c 100644 --- a/modules/core/src/test/config/io-manager-benchmark.xml +++ b/modules/core/src/test/config/io-manager-benchmark.xml @@ -50,7 +50,7 @@ <!-- Configure load balancing SPI in the way that do not require extra event subscription. --> <property name="loadBalancingSpi"> - <bean class="org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> + <bean class="org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> <property name="perTask" value="false"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/config/jobs-load-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/jobs-load-base.xml b/modules/core/src/test/config/jobs-load-base.xml index 97325a2..4ee2a39 100644 --- a/modules/core/src/test/config/jobs-load-base.xml +++ b/modules/core/src/test/config/jobs-load-base.xml @@ -97,7 +97,7 @@ </property> <property name="loadBalancingSpi"> - <bean class="org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> + <bean class="org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> <property name="perTask" value="false"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/config/load/merge-sort-base.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/load/merge-sort-base.xml b/modules/core/src/test/config/load/merge-sort-base.xml index e2293f6..3613dae 100644 --- a/modules/core/src/test/config/load/merge-sort-base.xml +++ b/modules/core/src/test/config/load/merge-sort-base.xml @@ -128,7 +128,7 @@ </property> <property name="loadBalancingSpi"> - <bean class="org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> + <bean class="org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> <property name="perTask" value="false"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/config/spring-cache-put-remove-load.xml ---------------------------------------------------------------------- diff --git a/modules/core/src/test/config/spring-cache-put-remove-load.xml b/modules/core/src/test/config/spring-cache-put-remove-load.xml index 052da54..a2eb339 100644 --- a/modules/core/src/test/config/spring-cache-put-remove-load.xml +++ b/modules/core/src/test/config/spring-cache-put-remove-load.xml @@ -42,7 +42,7 @@ </property> <property name="loadBalancingSpi"> - <bean class="org.gridgain.grid.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> + <bean class="org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi"> <property name="perTask" value="false"/> </bean> </property> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiConfigSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiConfigSelfTest.java new file mode 100644 index 0000000..54479e9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiConfigSelfTest.java @@ -0,0 +1,26 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.gridgain.testframework.junits.spi.*; + +/** + * + */ +@GridSpiTest(spi = AdaptiveLoadBalancingSpi.class, group = "LoadBalancing SPI") +public class GridAdaptiveLoadBalancingSpiConfigSelfTest + extends GridSpiAbstractConfigTest<AdaptiveLoadBalancingSpi> { + /** + * @throws Exception If failed. + */ + public void testNegativeConfig() throws Exception { + checkNegativeSpiProperty(new AdaptiveLoadBalancingSpi(), "loadProbe", null); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMultipleNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMultipleNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMultipleNodeSelfTest.java new file mode 100644 index 0000000..efef13d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiMultipleNodeSelfTest.java @@ -0,0 +1,87 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.spi.*; +import java.util.*; + +/** + * Tests adaptive load balancing SPI. + */ +@GridSpiTest(spi = AdaptiveLoadBalancingSpi.class, group = "Load Balancing SPI") +public class GridAdaptiveLoadBalancingSpiMultipleNodeSelfTest extends GridSpiAbstractTest<AdaptiveLoadBalancingSpi> { + /** */ + private static final int RMT_NODE_CNT = 10; + + /** {@inheritDoc} */ + @Override protected GridSpiTestContext initSpiContext() throws Exception { + GridSpiTestContext ctx = super.initSpiContext(); + + for (int i = 0; i < RMT_NODE_CNT; i++) { + GridTestNode node = new GridTestNode(UUID.randomUUID()); + + node.setAttribute("load", (double)(i + 1)); + + ctx.addNode(node); + } + + return ctx; + } + + /** + * @return {@code True} if node weights should be considered. + */ + @GridSpiTestConfig + public AdaptiveLoadProbe getLoadProbe() { + return new AdaptiveLoadProbe() { + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + boolean isFirstTime = node.attribute("used") == null; + + assert isFirstTime ? jobsSentSinceLastUpdate == 0 : jobsSentSinceLastUpdate > 0; + + return (Double)node.attribute("load"); + } + }; + } + /** + * @throws Exception If failed. + */ + public void testWeights() throws Exception { + // Seal it. + List<ClusterNode> nodes = new ArrayList<>(getSpiContext().remoteNodes()); + + int[] cnts = new int[RMT_NODE_CNT]; + + // Invoke load balancer a large number of times, so statistics won't lie. + for (int i = 0; i < 50000; i++) { + GridTestNode node = (GridTestNode)getSpi().getBalancedNode(new GridTestTaskSession(IgniteUuid.randomUuid()), + nodes, new GridTestJob()); + + int idx = ((Double)node.attribute("load")).intValue() - 1; + + if (cnts[idx] == 0) + node.setAttribute("used", true); + + // Increment number of times a node was picked. + cnts[idx]++; + } + + info("Node counts: " + Arrays.toString(cnts)); + + for (int i = 0; i < cnts.length - 1; i++) { + assert cnts[i] > cnts[i + 1] : "Invalid node counts for index [idx=" + i + ", cnts[i]=" + cnts[i] + + ", cnts[i+1]=" + cnts[i + 1] + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiSelfTest.java new file mode 100644 index 0000000..2cc4555 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiSelfTest.java @@ -0,0 +1,125 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.spi.*; + +import java.util.*; + +/** + * Tests adaptive load balancing SPI. + */ +@GridSpiTest(spi = AdaptiveLoadBalancingSpi.class, group = "Load Balancing SPI") +public class GridAdaptiveLoadBalancingSpiSelfTest extends GridSpiAbstractTest<AdaptiveLoadBalancingSpi> { + /** {@inheritDoc} */ + @Override protected GridSpiTestContext initSpiContext() throws Exception { + GridSpiTestContext ctx = super.initSpiContext(); + + ctx.setLocalNode(new GridTestNode(UUID.randomUUID())); + + return ctx; + } + + /** + * @return {@code True} if node weights should be considered. + */ + @GridSpiTestConfig + public AdaptiveLoadProbe getLoadProbe() { + return new AdaptiveLoadProbe() { + @Override public double getLoad(ClusterNode node, int jobsSentSinceLastUpdate) { + boolean isFirstTime = node.attribute("used") == null; + + assert isFirstTime ? jobsSentSinceLastUpdate == 0 : jobsSentSinceLastUpdate > 0; + + return (Double)node.attribute("load"); + } + }; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ObjectEquality"}) + public void testSingleNodeZeroWeight() throws Exception { + GridTestNode node = (GridTestNode)getSpiContext().nodes().iterator().next(); + + node.addAttribute("load", 0d); + + List<ClusterNode> nodes = Collections.singletonList((ClusterNode)node); + + ComputeTaskSession ses = new GridTestTaskSession(IgniteUuid.randomUuid()); + + GridTestNode pick1 = (GridTestNode)getSpi().getBalancedNode(ses, nodes, new GridTestJob()); + + pick1.setAttribute("used", true); + + assert nodes.contains(pick1); + + // Verify that same instance is returned every time. + ClusterNode pick2 = getSpi().getBalancedNode(ses, nodes, new GridTestJob()); + + assert pick1 == pick2; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ObjectEquality"}) + public void testSingleNodeSameSession() throws Exception { + GridTestNode node = (GridTestNode)getSpiContext().nodes().iterator().next(); + + node.addAttribute("load", 1d); + + List<ClusterNode> nodes = Collections.singletonList((ClusterNode)node); + + ComputeTaskSession ses = new GridTestTaskSession(IgniteUuid.randomUuid()); + + GridTestNode pick1 = (GridTestNode)getSpi().getBalancedNode(ses, nodes, new GridTestJob()); + + pick1.setAttribute("used", true); + + assert nodes.contains(pick1); + + // Verify that same instance is returned every time. + ClusterNode pick2 = getSpi().getBalancedNode(ses, nodes, new GridTestJob()); + + assert pick1 == pick2; + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ObjectEquality"}) + public void testSingleNodeDifferentSession() throws Exception { + GridTestNode node = (GridTestNode)getSpiContext().nodes().iterator().next(); + + node.addAttribute("load", 2d); + + List<ClusterNode> nodes = Collections.singletonList((ClusterNode)node); + + GridTestNode pick1 = (GridTestNode)getSpi().getBalancedNode(new GridTestTaskSession(IgniteUuid.randomUuid()), + nodes, new GridTestJob()); + + pick1.setAttribute("used", true); + + assert nodes.contains(pick1); + + // Verify that same instance is returned every time. + ClusterNode pick2 = getSpi().getBalancedNode(new GridTestTaskSession(IgniteUuid.randomUuid()), nodes, + new GridTestJob()); + + assert pick1 == pick2; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiStartStopSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiStartStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiStartStopSelfTest.java new file mode 100644 index 0000000..69c70fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/GridAdaptiveLoadBalancingSpiStartStopSelfTest.java @@ -0,0 +1,23 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.adaptive; + +import org.gridgain.grid.spi.*; +import org.gridgain.testframework.junits.spi.*; + +/** + * Adaptive load balancing SPI start-stop test. + */ +@SuppressWarnings({"JUnitTestCaseWithNoTests"}) +@GridSpiTest(spi = AdaptiveLoadBalancingSpi.class, group = "LoadBalancing SPI") +public class GridAdaptiveLoadBalancingSpiStartStopSelfTest extends + GridSpiStartStopAbstractTest<AdaptiveLoadBalancingSpi> { + // No configs. +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html new file mode 100644 index 0000000..5cad80a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/adaptive/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/package.html b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/package.html new file mode 100644 index 0000000..5cad80a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/package.html @@ -0,0 +1,15 @@ +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<!-- + @html.file.header + _________ _____ __________________ _____ + __ ____/___________(_)______ /__ ____/______ ____(_)_______ + _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ +--> +<html> +<body> + <!-- Package description. --> + Contains internal tests or test related classes and interfaces. +</body> +</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingNotPerTaskMultithreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingNotPerTaskMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingNotPerTaskMultithreadedSelfTest.java new file mode 100644 index 0000000..544da32 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingNotPerTaskMultithreadedSelfTest.java @@ -0,0 +1,115 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.roundrobin; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.compute.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.testframework.*; +import org.gridgain.testframework.junits.spi.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * Multithreaded tests for global load balancer. + */ +@GridSpiTest(spi = RoundRobinLoadBalancingSpi.class, group = "Load Balancing SPI") +public class GridRoundRobinLoadBalancingNotPerTaskMultithreadedSelfTest + extends GridSpiAbstractTest<RoundRobinLoadBalancingSpi> { + /** Thread count. */ + public static final int THREAD_CNT = 8; + + /** Per-thread iteration count. */ + public static final int ITER_CNT = 4_000_000; + + /** + * @return Per-task configuration parameter. + */ + @GridSpiTestConfig + public boolean getPerTask() { + return false; + } + + /** {@inheritDoc} */ + @Override protected GridSpiTestContext initSpiContext() throws Exception { + GridSpiTestContext spiCtx = super.initSpiContext(); + + spiCtx.createLocalNode(); + spiCtx.createRemoteNodes(10); + + return spiCtx; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + assert !getSpi().isPerTask() : "Invalid SPI configuration."; + } + + /** + * + * @throws Exception If failed. + */ + public void testMultipleTaskSessionsMultithreaded() throws Exception { + final RoundRobinLoadBalancingSpi spi = getSpi(); + + final List<ClusterNode> allNodes = (List<ClusterNode>)getSpiContext().nodes(); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { + ComputeTaskSession ses = new GridTestTaskSession(IgniteUuid.randomUuid()); + + Map<UUID, AtomicInteger> nodeCnts = new HashMap<>(); + + for (int i = 1; i <= ITER_CNT; i++) { + ClusterNode node = spi.getBalancedNode(ses, allNodes, new GridTestJob()); + + if (!nodeCnts.containsKey(node.id())) + nodeCnts.put(node.id(), new AtomicInteger(1)); + else + nodeCnts.get(node.id()).incrementAndGet(); + } + + int predictCnt = ITER_CNT / allNodes.size(); + + // Consider +-20% is permissible spread for single node measure. + int floor = (int)(predictCnt * 0.8); + + double avgSpread = 0; + + for (ClusterNode n : allNodes) { + int curCnt = nodeCnts.get(n.id()).intValue(); + + avgSpread += Math.abs(predictCnt - curCnt); + + String msg = "Node stats [id=" + n.id() + ", cnt=" + curCnt + ", floor=" + floor + + ", predictCnt=" + predictCnt + ']'; + + info(msg); + + assertTrue(msg, curCnt >= floor); + } + + avgSpread /= allNodes.size(); + + avgSpread = 100.0 * avgSpread / predictCnt; + + info("Average spread for " + allNodes.size() + " nodes is " + avgSpread + " percents"); + + // Consider +-10% is permissible average spread for all nodes. + assertTrue("Average spread is too big: " + avgSpread, avgSpread <= 10); + + return null; + } + }, THREAD_CNT, "balancer-test-worker"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b69a23cd/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiLocalNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiLocalNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiLocalNodeSelfTest.java new file mode 100644 index 0000000..c546c1f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/loadbalancing/roundrobin/GridRoundRobinLoadBalancingSpiLocalNodeSelfTest.java @@ -0,0 +1,44 @@ +/* @java.file.header */ + +/* _________ _____ __________________ _____ + * __ ____/___________(_)______ /__ ____/______ ____(_)_______ + * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \ + * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / / + * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/ + */ + +package org.apache.ignite.spi.loadbalancing.roundrobin; + +import org.apache.ignite.cluster.*; +import org.apache.ignite.lang.*; +import org.gridgain.grid.*; +import org.gridgain.testframework.junits.spi.*; +import java.util.*; + +/** + * Tests Round Robin load balancing for single node. + */ +@GridSpiTest(spi = RoundRobinLoadBalancingSpi.class, group = "Load Balancing SPI", triggerDiscovery = true) +public class GridRoundRobinLoadBalancingSpiLocalNodeSelfTest extends + GridSpiAbstractTest<RoundRobinLoadBalancingSpi> { + /** + * @throws Exception If failed. + */ + @SuppressWarnings({"ObjectEquality"}) + public void testLocalNode() throws Exception { + assert getDiscoverySpi().getRemoteNodes().isEmpty(); + + ClusterNode locNode = getDiscoverySpi().getLocalNode(); + + ClusterNode node = getSpi().getBalancedNode(new GridTestTaskSession(IgniteUuid.randomUuid()), + Collections.singletonList(locNode), new GridTestJob()); + + assert node == locNode; + + // Double check. + node = getSpi().getBalancedNode(new GridTestTaskSession(IgniteUuid.randomUuid()), + Collections.singletonList(locNode), new GridTestJob()); + + assert node == locNode; + } +}