#IGNITE-857 Updated discovery logic.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/55c166a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/55c166a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/55c166a6 Branch: refs/heads/ignite-sprint-5 Commit: 55c166a64a7f64cccf81ef9ec64cd572c11d94af Parents: 21a1514 Author: nikolay tikhonov <ntikho...@gridgain.com> Authored: Mon May 18 18:42:27 2015 +0300 Committer: nikolay tikhonov <ntikho...@gridgain.com> Committed: Mon May 18 18:42:27 2015 +0300 ---------------------------------------------------------------------- modules/mesos/pom.xml | 7 + .../apache/ignite/mesos/IgniteFramework.java | 97 +++++++ .../apache/ignite/mesos/IgniteScheduler.java | 286 +++++++++++++++++++ .../org/apache/ignite/mesos/package-info.java | 22 ++ .../ignite/messo/IgniteAmazonScheduler.java | 81 ------ .../apache/ignite/messo/IgniteFramework.java | 108 ------- .../apache/ignite/messo/IgniteScheduler.java | 243 ---------------- .../org/apache/ignite/messo/package-info.java | 22 -- .../org/apache/ignite/IgniteMesosTestSuite.java | 38 +++ .../ignite/mesos/IgniteSchedulerSelfTest.java | 165 +++++++++++ 10 files changed, 615 insertions(+), 454 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/pom.xml ---------------------------------------------------------------------- diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml index 4d19b11..ef73c0b 100644 --- a/modules/mesos/pom.xml +++ b/modules/mesos/pom.xml @@ -39,6 +39,13 @@ <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> </dependencies> <build> 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java new file mode 100644 index 0000000..5c556a1 --- /dev/null +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import com.google.protobuf.*; +import org.apache.mesos.*; + +/** + * TODO + */ +public class IgniteFramework { + /** + * @param args Args + */ + public static void main(String[] args) { + checkArgs(args); + + final int frameworkFailoverTimeout = 0; + + Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder() + .setName("IgniteFramework") + .setUser("") // Have Mesos fill in the current user. 
+ .setFailoverTimeout(frameworkFailoverTimeout); // timeout in seconds + + if (System.getenv("MESOS_CHECKPOINT") != null) { + System.out.println("Enabling checkpoint for the framework"); + frameworkBuilder.setCheckpoint(true); + } + + // create the scheduler + final Scheduler scheduler = new IgniteScheduler(); + + // create the driver + MesosSchedulerDriver driver; + if (System.getenv("MESOS_AUTHENTICATE") != null) { + System.out.println("Enabling authentication for the framework"); + + if (System.getenv("DEFAULT_PRINCIPAL") == null) { + System.err.println("Expecting authentication principal in the environment"); + System.exit(1); + } + + if (System.getenv("DEFAULT_SECRET") == null) { + System.err.println("Expecting authentication secret in the environment"); + System.exit(1); + } + + Protos.Credential credential = Protos.Credential.newBuilder() + .setPrincipal(System.getenv("DEFAULT_PRINCIPAL")) + .setSecret(ByteString.copyFrom(System.getenv("DEFAULT_SECRET").getBytes())) + .build(); + + frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL")); + + driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential); + } + else { + frameworkBuilder.setPrincipal("ignite-framework-java"); + + driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]); + } + + int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1; + + // Ensure that the driver process terminates. + driver.stop(); + + System.exit(status); + } + + /** + * Check input arguments. + * + * @param args Arguments. 
+ */ + private static void checkArgs(String[] args) { + if (args.length == 0) + throw new IllegalArgumentException("Illegal arguments."); + + // TODO: add more + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java new file mode 100644 index 0000000..7b5623b --- /dev/null +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import org.apache.mesos.*; +import org.slf4j.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * TODO + */ +public class IgniteScheduler implements Scheduler { + /** Docker image name. */ + public static final String IMAGE = "apacheignite/ignite-docker"; + + /** Startup script path. */ + public static final String STARTUP_SCRIPT = "/home/ignite/startup.sh"; + + /** Cpus. 
*/ + public static final String CPUS = "cpus"; + + /** Mem. */ + public static final String MEM = "mem"; + + /** Default port range. */ + public static final String DEFAULT_PORT = ":47500..47510"; + + /** Delimiter to use in IP names. */ + public static final String DELIM = ","; + + /** ID generator. */ + private AtomicInteger taskIdGenerator = new AtomicInteger(); + + /** Logger. */ + private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class); + + /** Min of memory required. */ + public static final int MIN_MEMORY = 256; + + /** Mutex. */ + private static final Object mux = new Object(); + + /** Task on host. */ + private ConcurrentMap<String, String> tasks = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, + Protos.MasterInfo masterInfo) { + log.info("registered() master={}:{}, framework={}", masterInfo.getIp(), masterInfo.getPort(), frameworkID); + } + + /** {@inheritDoc} */ + @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { + log.info("reregistered"); + } + + /** {@inheritDoc} */ + @Override public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) { + synchronized (mux) { + log.info("resourceOffers() with {} offers", offers.size()); + + for (Protos.Offer offer : offers) { + Tuple<Double, Double> cpuMem = checkOffer(offer); + + // Decline offer which doesn't match by mem or cpu. + if (cpuMem == null) { + schedulerDriver.declineOffer(offer.getId()); + + continue; + } + + // Generate a unique task ID. + Protos.TaskID taskId = Protos.TaskID.newBuilder() + .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build(); + + log.info("Launching task {}", taskId.getValue()); + + // Create task to run. 
+ Protos.TaskInfo task = createTask(offer, cpuMem, taskId); + + schedulerDriver.launchTasks(Collections.singletonList(offer.getId()), + Collections.singletonList(task), + Protos.Filters.newBuilder().setRefuseSeconds(1).build()); + + tasks.put(taskId.getValue(), offer.getHostname()); + } + } + } + + /** + * Create Task. + * + * @param offer Offer. + * @param cpuMem Cpu and mem on slave. + * @param taskId Task id. + * @return Task. + */ + protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) { + // Docker image info. + Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder() + .setImage(IMAGE) + .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST); + + // Container info. + Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder(); + cont.setType(Protos.ContainerInfo.Type.DOCKER); + cont.setDocker(docker.build()); + + return Protos.TaskInfo.newBuilder() + .setName("task " + taskId.getValue()) + .setTaskId(taskId) + .setSlaveId(offer.getSlaveId()) + .addResources(Protos.Resource.newBuilder() + .setName(CPUS) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1()))) + .addResources(Protos.Resource.newBuilder() + .setName(MEM) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2()))) + .setContainer(cont) + .setCommand(Protos.CommandInfo.newBuilder() + .setShell(false) + .addArguments(STARTUP_SCRIPT) + .addArguments(String.valueOf(cpuMem.get2().intValue())) + .addArguments(getAddress())) + .build(); + } + + /** + * @return Address running nodes. + */ + protected String getAddress() { + if (tasks.isEmpty()) + return ""; + + StringBuilder sb = new StringBuilder(); + + for (String host : tasks.values()) + sb.append(host).append(DEFAULT_PORT).append(DELIM); + + return sb.substring(0, sb.length() - 1); + } + + /** + * Check slave resources and return resources infos. 
+ * + * @param offer Offer request. + * @return Pair where first is cpus, second is memory. + */ + private Tuple<Double, Double> checkOffer(Protos.Offer offer) { + double cpus = -1; + double mem = -1; + + for (Protos.Resource resource : offer.getResourcesList()) { + if (resource.getName().equals(CPUS)) { + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + cpus = resource.getScalar().getValue(); + else + log.debug("Cpus resource was not a scalar: " + resource.getType().toString()); + } + else if (resource.getName().equals(MEM)) { + if (resource.getType().equals(Protos.Value.Type.SCALAR)) + mem = resource.getScalar().getValue(); + else + log.debug("Mem resource was not a scalar: " + resource.getType().toString()); + } + else if (resource.getName().equals("disk")) + log.debug("Ignoring disk resources from offer"); + } + + if (cpus < 0) + log.debug("No cpus resource present"); + if (mem < 0) + log.debug("No mem resource present"); + + if (cpus >= 1 && MIN_MEMORY <= mem) + return new Tuple<>(cpus, mem); + else { + log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() + + "\n" + offer.getAttributesList().toString() + + "\nRequested for slave:\n" + + " cpus: " + cpus + "\n" + + " mem: " + mem); + + return null; + } + } + + /** {@inheritDoc} */ + @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) { + log.info("offerRescinded()"); + } + + /** {@inheritDoc} */ + @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { + final String taskId = taskStatus.getTaskId().getValue(); + + log.info("statusUpdate() task {} is in state {}", taskId, taskStatus.getState()); + + switch (taskStatus.getState()) { + case TASK_FAILED: + case TASK_FINISHED: + tasks.remove(taskId); + break; + } + } + + /** {@inheritDoc} */ + @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, + Protos.SlaveID slaveID, byte[] bytes) { 
+ log.info("frameworkMessage()"); + } + + /** {@inheritDoc} */ + @Override public void disconnected(SchedulerDriver schedulerDriver) { + log.info("disconnected()"); + } + + /** {@inheritDoc} */ + @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) { + log.info("slaveLost()"); + } + + /** {@inheritDoc} */ + @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, + Protos.SlaveID slaveID, int i) { + log.info("executorLost()"); + } + + /** {@inheritDoc} */ + @Override public void error(SchedulerDriver schedulerDriver, String s) { + log.error("error() {}", s); + } + + /** + * Tuple. + */ + public static class Tuple<A, B> { + /** */ + private final A val1; + + /** */ + private final B val2; + + /** + * + */ + public Tuple(A val1, B val2) { + this.val1 = val1; + this.val2 = val2; + } + + /** + * @return val1 + */ + public A get1() { + return val1; + } + + /** + * @return val2 + */ + public B get2() { + return val2; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java b/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java new file mode 100644 index 0000000..49ddf86 --- /dev/null +++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Mesos Framework. + */ +package org.apache.ignite.mesos; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java deleted file mode 100644 index b11e7c6..0000000 --- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteAmazonScheduler.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.messo; - -import org.apache.mesos.*; - -/** - * TODO - */ -public class IgniteAmazonScheduler extends IgniteScheduler { - /** */ - public static final String AMAZON = "amazon"; - - /** Amazon credential. */ - private final String accessKey, secretKey; - - /** - * Constructor. - * - * @param accessKey Access key. - * @param secretKey Secret key. - */ - public IgniteAmazonScheduler(String accessKey, String secretKey) { - assert accessKey != null; - assert secretKey != null; - - this.accessKey = accessKey; - this.secretKey = secretKey; - } - - /** {@inheritDoc} */ - @Override protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, - Protos.TaskID taskId) { - // Docker image info. - Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder() - .setImage(IMAGE) - .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST); - - // Container info. - Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder(); - cont.setType(Protos.ContainerInfo.Type.DOCKER); - cont.setDocker(docker.build()); - - return Protos.TaskInfo.newBuilder() - .setName("task " + taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources(Protos.Resource.newBuilder() - .setName(CPUS) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get1()))) - .addResources(Protos.Resource.newBuilder() - .setName(MEM) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2()))) - .setContainer(cont) - .setCommand(Protos.CommandInfo.newBuilder() - .setShell(false) - .addArguments(STARTUP_SCRIPT) - .addArguments(String.valueOf(cpuMem.get2().intValue())) - .addArguments(AMAZON) - .addArguments(accessKey) - .addArguments(secretKey)) - .build(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java deleted file mode 100644 index dfc3eb2..0000000 --- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteFramework.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.messo; - -import com.google.protobuf.*; -import org.apache.mesos.*; - -/** - * TODO - */ -public class IgniteFramework { - /** - * @param args Args - */ - public static void main(String[] args) { - checkArgs(args); - - final int frameworkFailoverTimeout = 0; - - Protos.FrameworkInfo.Builder frameworkBuilder = Protos.FrameworkInfo.newBuilder() - .setName("IgniteFramework") - .setUser("") // Have Mesos fill in the current user. 
- .setFailoverTimeout(frameworkFailoverTimeout); // timeout in seconds - - if (System.getenv("MESOS_CHECKPOINT") != null) { - System.out.println("Enabling checkpoint for the framework"); - frameworkBuilder.setCheckpoint(true); - } - - // create the scheduler - final Scheduler scheduler = createIgniteScheduler(args); - - // create the driver - MesosSchedulerDriver driver; - if (System.getenv("MESOS_AUTHENTICATE") != null) { - System.out.println("Enabling authentication for the framework"); - - if (System.getenv("DEFAULT_PRINCIPAL") == null) { - System.err.println("Expecting authentication principal in the environment"); - System.exit(1); - } - - if (System.getenv("DEFAULT_SECRET") == null) { - System.err.println("Expecting authentication secret in the environment"); - System.exit(1); - } - - Protos.Credential credential = Protos.Credential.newBuilder() - .setPrincipal(System.getenv("DEFAULT_PRINCIPAL")) - .setSecret(ByteString.copyFrom(System.getenv("DEFAULT_SECRET").getBytes())) - .build(); - - frameworkBuilder.setPrincipal(System.getenv("DEFAULT_PRINCIPAL")); - - driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0], credential); - } - else { - frameworkBuilder.setPrincipal("ignite-framework-java"); - - driver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), args[0]); - } - - int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1; - - // Ensure that the driver process terminates. - driver.stop(); - - System.exit(status); - } - - /** - * @param args Arguments. - * @return Ignite scheduler. - */ - private static IgniteScheduler createIgniteScheduler(String args[]) { - if (args.length >= 3 && args[1].equals(IgniteAmazonScheduler.AMAZON)) - return new IgniteAmazonScheduler(args[2], args[3]); - else - return new IgniteScheduler(); - } - - /** - * Check input arguments. - * - * @param args Arguments. 
- */ - private static void checkArgs(String[] args) { - if (args.length == 0) - throw new IllegalArgumentException("Illegal arguments."); - - // TODO: add more - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java b/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java deleted file mode 100644 index c8b577f..0000000 --- a/modules/mesos/src/main/java/org/apache/ignite/messo/IgniteScheduler.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.messo; - -import org.apache.mesos.*; -import org.slf4j.*; - -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * TODO - */ -public class IgniteScheduler implements Scheduler { - /** Docker image name. */ - public static final String IMAGE = "apacheignite/ignite-docker"; - - /** Startup sctipt path. */ - public static final String STARTUP_SCRIPT = "/home/ignite/startup.sh"; - - /** Cpus. 
*/ - public static final String CPUS = "cpus"; - - /** Mem. */ - public static final String MEM = "mem"; - - /** ID generator. */ - private AtomicInteger taskIdGenerator = new AtomicInteger(); - - /** Logger. */ - private static final Logger log = LoggerFactory.getLogger(IgniteScheduler.class); - - /** Min of memory required. */ - public static final int MIN_MEMORY = 256; - - /** {@inheritDoc} */ - @Override public void registered(SchedulerDriver schedulerDriver, Protos.FrameworkID frameworkID, - Protos.MasterInfo masterInfo) { - log.info("registered() master={}:{}, framework={}", masterInfo.getIp(), masterInfo.getPort(), frameworkID); - } - - /** {@inheritDoc} */ - @Override public void reregistered(SchedulerDriver schedulerDriver, Protos.MasterInfo masterInfo) { - log.info("reregistered"); - } - - /** {@inheritDoc} */ - @Override public void resourceOffers(SchedulerDriver schedulerDriver, List<Protos.Offer> offers) { - log.info("resourceOffers() with {} offers", offers.size()); - - for (Protos.Offer offer : offers) { - Tuple<Double, Double> cpuMem = checkOffer(offer); - - // Decline offer which doesn't match by mem or cpu. - if (cpuMem == null) { - schedulerDriver.declineOffer(offer.getId()); - - continue; - } - - // Generate a unique task ID. - Protos.TaskID taskId = Protos.TaskID.newBuilder() - .setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build(); - - log.info("Launching task {}", taskId.getValue()); - - // Create task to run. - Protos.TaskInfo task = createTask(offer, cpuMem, taskId); - - schedulerDriver.launchTasks(Collections.singletonList(offer.getId()), - Collections.singletonList(task), - Protos.Filters.newBuilder().setRefuseSeconds(1).build()); - } - } - - /** - * Create Task. - * @param offer Offer. - * @param cpuMem Cpu and mem on slave. - * @param taskId Task id. - * @return Task. - */ - protected Protos.TaskInfo createTask(Protos.Offer offer, Tuple<Double, Double> cpuMem, Protos.TaskID taskId) { - // Docker image info. 
- Protos.ContainerInfo.DockerInfo.Builder docker = Protos.ContainerInfo.DockerInfo.newBuilder() - .setImage(IMAGE) - .setNetwork(Protos.ContainerInfo.DockerInfo.Network.HOST); - - // Container info. - Protos.ContainerInfo.Builder cont = Protos.ContainerInfo.newBuilder(); - cont.setType(Protos.ContainerInfo.Type.DOCKER); - cont.setDocker(docker.build()); - - return Protos.TaskInfo.newBuilder() - .setName("task " + taskId.getValue()) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .addResources(Protos.Resource.newBuilder() - .setName(CPUS) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2()))) - .addResources(Protos.Resource.newBuilder() - .setName(MEM) - .setType(Protos.Value.Type.SCALAR) - .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpuMem.get2()))) - .setContainer(cont) - .setCommand(Protos.CommandInfo.newBuilder() - .setShell(false) - .addArguments(STARTUP_SCRIPT) - .addArguments(String.valueOf(cpuMem.get2().intValue()))) - .build(); - } - - /** - * Check slave resources and return resources infos. - * - * @param offer Offer request. - * @return Pair where first is cpus, second is memory. 
- */ - private Tuple<Double, Double> checkOffer(Protos.Offer offer) { - double cpus = -1; - double mem = -1; - - for (Protos.Resource resource : offer.getResourcesList()) { - if (resource.getName().equals(CPUS)) { - if (resource.getType().equals(Protos.Value.Type.SCALAR)) - cpus = resource.getScalar().getValue(); - else - log.debug("Cpus resource was not a scalar: " + resource.getType().toString()); - } - else if (resource.getName().equals(MEM)) { - if (resource.getType().equals(Protos.Value.Type.SCALAR)) - mem = resource.getScalar().getValue(); - else - log.debug("Mem resource was not a scalar: " + resource.getType().toString()); - } - else if (resource.getName().equals("disk")) - log.debug("Ignoring disk resources from offer"); - } - - if (cpus < 0) - log.debug("No cpus resource present"); - if (mem < 0) - log.debug("No mem resource present"); - - if (cpus >= 1 && MIN_MEMORY <= mem) - return new Tuple<>(cpus, mem); - else { - log.info("Offer not sufficient for slave request:\n" + offer.getResourcesList().toString() + - "\n" + offer.getAttributesList().toString() + - "\nRequested for slave:\n" + - " cpus: " + cpus + "\n" + - " mem: " + mem); - - return null; - } - } - - /** {@inheritDoc} */ - @Override public void offerRescinded(SchedulerDriver schedulerDriver, Protos.OfferID offerID) { - log.info("offerRescinded()"); - } - - /** {@inheritDoc} */ - @Override public void statusUpdate(SchedulerDriver schedulerDriver, Protos.TaskStatus taskStatus) { - log.info("statusUpdate() task {} ", taskStatus); - } - - /** {@inheritDoc} */ - @Override public void frameworkMessage(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, - Protos.SlaveID slaveID, byte[] bytes) { - log.info("frameworkMessage()"); - } - - /** {@inheritDoc} */ - @Override public void disconnected(SchedulerDriver schedulerDriver) { - log.info("disconnected()"); - } - - /** {@inheritDoc} */ - @Override public void slaveLost(SchedulerDriver schedulerDriver, Protos.SlaveID slaveID) { - 
log.info("slaveLost()"); - } - - /** {@inheritDoc} */ - @Override public void executorLost(SchedulerDriver schedulerDriver, Protos.ExecutorID executorID, - Protos.SlaveID slaveID, int i) { - log.info("executorLost()"); - } - - /** {@inheritDoc} */ - @Override public void error(SchedulerDriver schedulerDriver, String s) { - log.error("error() {}", s); - } - - /** - * Tuple. - */ - public static class Tuple<A, B> { - /** */ - private final A val1; - - /** */ - private final B val2; - - /** - * - */ - public Tuple(A val1, B val2) { - this.val1 = val1; - this.val2 = val2; - } - - /** - * @return val1 - */ - public A get1() { - return val1; - } - - /** - * @return val2 - */ - public B get2() { - return val2; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java b/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java deleted file mode 100644 index c48ca38..0000000 --- a/modules/mesos/src/main/java/org/apache/ignite/messo/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * <!-- Package description. --> - * Messo Framework. - */ -package org.apache.ignite.messo; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java new file mode 100644 index 0000000..f1bcb90 --- /dev/null +++ b/modules/mesos/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import junit.framework.*; +import org.apache.ignite.mesos.*; + +/** + * Test suite that aggregates the Apache Mesos integration tests. + */ +public class IgniteMesosTestSuite extends TestSuite { + /** + * @return Suite named "Apache Mesos Integration Test Suite" that contains the tests of {@link IgniteSchedulerSelfTest}. + * @throws Exception Thrown in case of the failure.
+ */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite"); + + suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class)); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/55c166a6/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java new file mode 100644 index 0000000..5534b2c --- /dev/null +++ b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.mesos; + +import junit.framework.*; +import org.apache.mesos.*; + +import java.util.*; + +/** + * Scheduler tests.
+ */
+public class IgniteSchedulerSelfTest extends TestCase {
+    /** Scheduler under test; a fresh instance is created in {@link #setUp()}. */
+    private IgniteScheduler scheduler;
+
+    /** {@inheritDoc} */
+    @Override public void setUp() throws Exception {
+        super.setUp();
+
+        scheduler = new IgniteScheduler();
+    }
+
+    /**
+     * Placeholder for the host registration scenario; it performs no checks yet.
+     *
+     * @throws Exception If failed.
+     */
+    public void testHostRegister() throws Exception {
+        // TODO: scenario is not implemented yet; the draft below is kept as a starting point.
+        //Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        //scheduler.resourceOffers(DriverStub.INSTANCE, Lists.);
+    }
+
+    /**
+     * Creates a resource offer for the given host that advertises two scalar resources,
+     * named by {@link IgniteScheduler#CPUS} and {@link IgniteScheduler#MEM}.
+     * <p>
+     * Offer, slave and framework identifiers are all set to the fixed value {@code "1"}.
+     *
+     * @param hostname Host name reported by the offer.
+     * @param cpu Scalar value of the {@link IgniteScheduler#CPUS} resource.
+     * @param mem Scalar value of the {@link IgniteScheduler#MEM} resource.
+     * @return Fully initialized offer.
+     */
+    private Protos.Offer createOffer(String hostname, double cpu, double mem) {
+        // 'id' and 'framework_id' of Offer and 'type' of Resource are required protobuf fields:
+        // without them build() fails with UninitializedMessageException.
+        return Protos.Offer.newBuilder()
+            .setId(Protos.OfferID.newBuilder().setValue("1").build())
+            .setSlaveId(Protos.SlaveID.newBuilder().setValue("1").build())
+            .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1").build())
+            .setHostname(hostname)
+            .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
+                .setName(IgniteScheduler.CPUS)
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
+                .build())
+            .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
+                .setName(IgniteScheduler.MEM)
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
+                .build())
+            .build();
+    }
+
+    /**
+     * No-op {@link SchedulerDriver} implementation: every method returns {@code null}.
+     */
+    public static class DriverStub implements SchedulerDriver {
+        /** Shared stub instance. */
+        private static final DriverStub INSTANCE = new DriverStub();
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status start() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status stop(boolean failover) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status stop() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status abort() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status join() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status run() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status requestResources(Collection<Protos.Request> requests) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+            Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status launchTasks(Collection<Protos.OfferID> offerIds,
+            Collection<Protos.TaskInfo> tasks) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks,
+            Protos.Filters filters) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status launchTasks(Protos.OfferID offerId, Collection<Protos.TaskInfo> tasks) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status killTask(Protos.TaskID taskId) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status declineOffer(Protos.OfferID offerId, Protos.Filters filters) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status declineOffer(Protos.OfferID offerId) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status reviveOffers() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status acknowledgeStatusUpdate(Protos.TaskStatus status) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status sendFrameworkMessage(Protos.ExecutorID executorId, Protos.SlaveID slaveId,
+            byte[] data) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Protos.Status reconcileTasks(Collection<Protos.TaskStatus> statuses) {
+            return null;
+        }
+    }
+}
\ No newline at end of file