#IGNITE-857 Added comma separate limit.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/07a10952 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/07a10952 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/07a10952 Branch: refs/heads/ignite-868 Commit: 07a10952093f8f4c7ce432413bb582c6ab96dc26 Parents: 8deb577 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Thu May 28 11:15:55 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Thu May 28 11:15:55 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/mesos/ClusterProperties.java | 18 +++++++ .../apache/ignite/mesos/IgniteScheduler.java | 34 +++++++----- .../ignite/mesos/IgniteSchedulerSelfTest.java | 56 ++++++++++++++++++-- 3 files changed, 91 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java index 9f0b304..944735e 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java @@ -215,6 +215,13 @@ public class ClusterProperties { } /** + * Set CPU count limit. + */ + public void cpusPerNode(double cpu) { + this.cpuPerNode = cpu; + } + + /** * @return mem limit. */ public double memory() { @@ -223,6 +230,8 @@ public class ClusterProperties { /** * Set mem limit. + * + * @param mem Memory. */ public void memory(double mem) { this.mem = mem; @@ -236,6 +245,15 @@ public class ClusterProperties { } /** + * Set mem limit. + * + * @param mem Memory. + */ + public void memoryPerNode(double mem) { + this.memPerNode = mem; + } + + /** * @return disk limit. */ public double disk() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java index 17daf45..e833025 100644 --- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.*; */ public class IgniteScheduler implements Scheduler { /** Cpus. */ - public static final String CPUS = "cpus"; + public static final String CPU = "cpus"; /** Mem. */ public static final String MEM = "mem"; @@ -126,14 +126,17 @@ public class IgniteScheduler implements Scheduler { .addUris(Protos.CommandInfo.URI.newBuilder() .setValue(cfgUrl)); - if (clusterProps.usersLibsUrl() != null) - builder.addUris(Protos.CommandInfo.URI.newBuilder() - .setValue(clusterProps.usersLibsUrl()) - .setExtract(true)); - else if (resourceProvider.resourceUrl() != null) { - for (String url : resourceProvider.resourceUrl()) - builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url)); - } + // Collection user's libs. + Collection<String> usersLibs = new ArrayList<>(); + + if (clusterProps.usersLibsUrl() != null && !clusterProps.usersLibsUrl().isEmpty()) + Collections.addAll(usersLibs, clusterProps.usersLibsUrl().split(DELIM)); + + if (resourceProvider.resourceUrl() != null && !resourceProvider.resourceUrl().isEmpty()) + usersLibs.addAll(resourceProvider.resourceUrl()); + + for (String url : usersLibs) + builder.addUris(Protos.CommandInfo.URI.newBuilder().setValue(url)); String cfgName = resourceProvider.configName(); @@ -155,7 +158,7 @@ public class IgniteScheduler implements Scheduler { .setSlaveId(offer.getSlaveId()) .setCommand(builder) .addResources(Protos.Resource.newBuilder() - .setName(CPUS) + .setName(CPU) .setType(Protos.Value.Type.SCALAR) .setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.cpuCores()))) .addResources(Protos.Resource.newBuilder() @@ -210,7 +213,7 @@ public class IgniteScheduler implements Scheduler { // Collect resource on slave. for (Protos.Resource resource : offer.getResourcesList()) { - if (resource.getName().equals(CPUS)) { + if (resource.getName().equals(CPU)) { if (resource.getType().equals(Protos.Value.Type.SCALAR)) cpus = resource.getScalar().getValue(); else @@ -251,6 +254,13 @@ public class IgniteScheduler implements Scheduler { mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, clusterProps.memoryPerNode())); disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, clusterProps.diskPerNode())); + if ((clusterProps.cpusPerNode() != ClusterProperties.UNLIMITED && clusterProps.cpusPerNode() != cpus) + || (clusterProps.memoryPerNode() != ClusterProperties.UNLIMITED && clusterProps.memoryPerNode() != mem)) { + log.debug("Offer not sufficient for slave request: {}", offer.getResourcesList()); + + return null; + } + if (cpus > 0 && mem > 0) return new IgniteTask(offer.getHostname(), cpus, mem, disk); else { @@ -284,7 +294,7 @@ public class IgniteScheduler implements Scheduler { .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem()))) .addResources(Protos.Resource.newBuilder() .setType(Protos.Value.Type.SCALAR) - .setName(CPUS) + .setName(CPU) .setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores()))) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/07a10952/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java index 337b47c..13855b5 100644 --- a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java +++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java @@ -71,7 +71,7 @@ public class IgniteSchedulerSelfTest extends TestCase { Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); - assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS)); + assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU)); assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM)); } @@ -95,7 +95,7 @@ public class IgniteSchedulerSelfTest extends TestCase { Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); - assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS)); + assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU)); assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM)); mock.clear(); @@ -130,7 +130,7 @@ public class IgniteSchedulerSelfTest extends TestCase { Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); - assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS)); + assertEquals(4.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU)); assertEquals(512.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM)); mock.clear(); @@ -168,7 +168,7 @@ public class IgniteSchedulerSelfTest extends TestCase { Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); - totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPUS); + totalCpu += resources(taskInfo.getResourcesList(), IgniteScheduler.CPU); totalMem += resources(taskInfo.getResourcesList(), IgniteScheduler.MEM); mock.clear(); @@ -254,6 +254,52 @@ public class IgniteSchedulerSelfTest extends TestCase { } /** + * @throws Exception If failed. + */ + public void testPerNode() throws Exception { + Protos.Offer offer = createOffer("hostname", 8, 1024); + + DriverMock mock = new DriverMock(); + + ClusterProperties clustProp = new ClusterProperties(); + clustProp.memoryPerNode(1024); + clustProp.cpusPerNode(2); + + scheduler.setClusterProps(clustProp); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNotNull(mock.launchedTask); + + Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next(); + + assertEquals(2.0, resources(taskInfo.getResourcesList(), IgniteScheduler.CPU)); + assertEquals(1024.0, resources(taskInfo.getResourcesList(), IgniteScheduler.MEM)); + + mock.clear(); + + offer = createOffer("hostname", 1, 2048); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + assertNotNull(mock.declinedOffer); + assertEquals(offer.getId(), mock.declinedOffer); + + mock.clear(); + + offer = createOffer("hostname", 4, 512); + + scheduler.resourceOffers(mock, Collections.singletonList(offer)); + + assertNull(mock.launchedTask); + + assertNotNull(mock.declinedOffer); + assertEquals(offer.getId(), mock.declinedOffer); + } + + /** * @param resourceType Resource type. * @return Value. */ @@ -280,7 +326,7 @@ public class IgniteSchedulerSelfTest extends TestCase { .setHostname(hostname) .addResources(Protos.Resource.newBuilder() .setType(Protos.Value.Type.SCALAR) - .setName(IgniteScheduler.CPUS) + .setName(IgniteScheduler.CPU) .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build()) .build()) .addResources(Protos.Resource.newBuilder()