#IGNITE-857 Added resource limit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ae8bcf83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ae8bcf83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ae8bcf83 Branch: refs/heads/ignite-471-2 Commit: ae8bcf83b5e14efeb21dbd541ef56c629c8e214d Parents: e320873 Author: nikolay tikhonov <ntikho...@gridgain.com> Authored: Wed May 20 11:50:39 2015 +0300 Committer: nikolay tikhonov <ntikho...@gridgain.com> Committed: Wed May 20 11:50:39 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/mesos/ClusterResources.java | 38 ++++++++- .../apache/ignite/mesos/IgniteFramework.java | 4 +- .../apache/ignite/mesos/IgniteScheduler.java | 87 +++++++++++++++----- .../ignite/mesos/IgniteSchedulerSelfTest.java | 2 +- 4 files changed, 103 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java index 0a2193f..1887530 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterResources.java @@ -52,6 +52,18 @@ public class ClusterResources { private double nodeCnt = DEFAULT_VALUE; /** */ + public static final String IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE = "IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE"; + + /** Min memory per node. */ + private int minCpu = 2; + + /** */ + public static final String IGNITE_RESOURCE_MIN_MEMORY_PER_NODE = "IGNITE_RESOURCE_MIN_MEMORY_PER_NODE"; + + /** Min memory per node. */ + private int minMemoryCnt = 256; + + /** */ public ClusterResources() { // No-op. } @@ -85,14 +97,32 @@ public class ClusterResources { } /** + * @return min memory per node. + */ + public int minMemoryPerNode() { + return minMemoryCnt; + } + + /** + * @return min cpu count per node. + */ + public int minCpuPerNode() { + return minCpu; + } + + /** * @param config path to config file. * @return Cluster configuration. */ public static ClusterResources from(String config) { try { - Properties props = new Properties(); + Properties props = null; + + if (config != null) { + props = new Properties(); - props.load(new FileInputStream(config)); + props.load(new FileInputStream(config)); + } ClusterResources resources = new ClusterResources(); @@ -114,13 +144,13 @@ public class ClusterResources { * @return Property value. */ private static double getProperty(String name, Properties fileProps) { - if (fileProps.containsKey(name)) + if (fileProps != null && fileProps.containsKey(name)) return Double.valueOf(fileProps.getProperty(name)); String property = System.getProperty(name); if (property == null) - System.getenv(name); + property = System.getenv(name); if (property == null) return DEFAULT_VALUE; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java index 3d309f3..2d74f71 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java @@ -42,8 +42,8 @@ public class IgniteFramework { frameworkBuilder.setCheckpoint(true); } - // create the scheduler - final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(args[1])); + // Create the scheduler. + final Scheduler scheduler = new IgniteScheduler(ClusterResources.from(null)); // create the driver MesosSchedulerDriver driver; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java index fcbab87..9d10860 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java @@ -21,7 +21,6 @@ import org.apache.mesos.*; import org.slf4j.*; import java.util.*; -import java.util.concurrent.*; import java.util.concurrent.atomic.*; /** @@ -46,9 +45,6 @@ public class IgniteScheduler implements Scheduler { /** Default port range. */ public static final String DEFAULT_PORT = ":47500..47510"; - /** Min of memory required. */ - public static final int MIN_MEMORY = 256; - /** Delimiter to use in IP names. */ public static final String DELIM = ","; @@ -62,7 +58,7 @@ public class IgniteScheduler implements Scheduler { private AtomicInteger taskIdGenerator = new AtomicInteger(); /** Task on host. */ - private ConcurrentMap<String, IgniteTask> tasks = new ConcurrentHashMap<>(); + private Map<String, IgniteTask> tasks = new HashMap<>(); /** Cluster resources. */ private ClusterResources clusterLimit; @@ -82,7 +78,7 @@ public class IgniteScheduler implements Scheduler { /** {@inheritDoc} */ @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { - log.info("reregistered"); + log.info("reregistered()"); } /** {@inheritDoc} */ @@ -138,7 +134,7 @@ public class IgniteScheduler implements Scheduler { cont.setDocker(docker.build()); return Protos.TaskInfo.newBuilder() - .setName("task " + taskId.getValue()) + .setName("Ignite node " + taskId.getValue()) .setTaskId(taskId) .setSlaveId(offer.getSlaveId()) .addResources(Protos.Resource.newBuilder() @@ -153,7 +149,7 @@ public class IgniteScheduler implements Scheduler { .setCommand(Protos.CommandInfo.newBuilder() .setShell(false) .addArguments(STARTUP_SCRIPT) - .addArguments(String.valueOf(igniteTask.mem())) + .addArguments(String.valueOf((int) igniteTask.mem())) .addArguments(getAddress())) .build(); } @@ -180,13 +176,15 @@ public class IgniteScheduler implements Scheduler { * @return Ignite task description. */ private IgniteTask checkOffer(Protos.Offer offer) { - if (checkLimit(clusterLimit.instances(), tasks.size())) + // Check that limit on running nodes. + if (!checkLimit(clusterLimit.instances(), tasks.size())) return null; - double cpus = -2; - double mem = -2; - double disk = -2; + double cpus = -1; + double mem = -1; + double disk = -1; + // Collect resource on slave. for (Protos.Resource resource : offer.getResourcesList()) { if (resource.getName().equals(CPUS)) { if (resource.getType().equals(Protos.Value.Type.SCALAR)) @@ -200,17 +198,43 @@ public class IgniteScheduler implements Scheduler { else log.debug("Mem resource was not a scalar: " + resource.getType().toString()); } - else if (resource.getType().equals(Protos.Value.Type.SCALAR)) - disk = resource.getScalar().getValue(); - else - log.debug("Disk resource was not a scalar: " + resource.getType().toString()); + else if (resource.getName().equals(DISK)) + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + disk = resource.getScalar().getValue(); + else + log.debug("Disk resource was not a scalar: " + resource.getType().toString()); + } + + // Check that slave satisfies min requirements. + if (cpus < clusterLimit.minCpuPerNode() && mem < clusterLimit.minMemoryPerNode() ) { + log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() + + "\n" + offer.getAttributesList().toString() + + "\nRequested for slave:\n" + + " cpus: " + cpus + "\n" + + " mem: " + mem); + + return null; } - if (checkLimit(clusterLimit.memory(), mem) && - checkLimit(clusterLimit.cpus(), cpus) && - checkLimit(clusterLimit.disk(), disk) && - MIN_MEMORY <= mem) + double totalCpus = 0; + double totalMem = 0; + double totalDisk = 0; + // Collect occupied resources. + for (IgniteTask task : tasks.values()) { + totalCpus += task.cpuCores(); + totalMem += task.mem(); + totalDisk += task.disk(); + } + + cpus = clusterLimit.cpus() == ClusterResources.DEFAULT_VALUE ? cpus : + Math.min(clusterLimit.cpus() - totalCpus, cpus); + mem = clusterLimit.memory() == ClusterResources.DEFAULT_VALUE ? mem : + Math.min(clusterLimit.memory() - totalMem, mem); + disk = clusterLimit.disk() == ClusterResources.DEFAULT_VALUE ? disk : + Math.min(clusterLimit.disk() - totalDisk, disk); + + if (cpus > 0 && mem > 0) return new IgniteTask(offer.getHostname(), cpus, mem, disk); else { log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() + @@ -246,7 +270,28 @@ public class IgniteScheduler implements Scheduler { switch (taskStatus.getState()) { case TASK_FAILED: case TASK_FINISHED: - tasks.remove(taskId); + synchronized (mux) { + IgniteTask failedTask = tasks.remove(taskId); + + if (failedTask != null) { + List<Protos.Request> requests = new ArrayList<>(); + + Protos.Request request = Protos.Request.newBuilder() + .addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(MEM) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem()))) + .addResources(Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setName(CPUS) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores()))) + .build(); + + requests.add(request); + + schedulerDriver.requestResources(requests); + } + } break; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ae8bcf83/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java index 2c4b6ee..8f8ca8b 100644 --- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java +++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java @@ -33,7 +33,7 @@ public class IgniteSchedulerSelfTest extends TestCase { @Override public void setUp() throws Exception { super.setUp(); - scheduler = new IgniteScheduler(); + //scheduler = new IgniteScheduler(); } /**