Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-868 8828ebb0e -> 780afb929


#IGNITE-857 Removed Grizzly dependecy. Added tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/639c9128
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/639c9128
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/639c9128

Branch: refs/heads/ignite-868
Commit: 639c912870dccde404d99407c445dfe723b2bdba
Parents: 5e3bcb2
Author: nikolay tikhonov <ntikho...@gridgain.com>
Authored: Tue May 26 16:05:25 2015 +0300
Committer: nikolay tikhonov <ntikho...@gridgain.com>
Committed: Tue May 26 16:05:25 2015 +0300

----------------------------------------------------------------------
 modules/mesos/licenses/jetty-epl-license.txt    |  69 ++++++
 modules/mesos/pom.xml                           |  19 +-
 .../apache/ignite/mesos/ClusterProperties.java  |  37 ++-
 .../apache/ignite/mesos/IgniteFramework.java    |  16 +-
 .../apache/ignite/mesos/IgniteScheduler.java    | 107 ++++----
 .../org/apache/ignite/mesos/IgniteTask.java     |   8 +
 .../ignite/mesos/resource/IgniteProvider.java   |   6 +-
 .../ignite/mesos/resource/JettyServer.java      |  61 +++++
 .../mesos/resource/ResourceController.java      | 127 ----------
 .../ignite/mesos/resource/ResourceHandler.java  | 142 +++++++++++
 .../ignite/mesos/resource/ResourceProvider.java |   2 +-
 .../main/resources/ignite-default-config.xml    |   6 +-
 .../ignite/mesos/IgniteSchedulerSelfTest.java   | 242 ++++++++++++++++++-
 13 files changed, 636 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/licenses/jetty-epl-license.txt
----------------------------------------------------------------------
diff --git a/modules/mesos/licenses/jetty-epl-license.txt 
b/modules/mesos/licenses/jetty-epl-license.txt
new file mode 100644
index 0000000..f5f0c89
--- /dev/null
+++ b/modules/mesos/licenses/jetty-epl-license.txt
@@ -0,0 +1,69 @@
+Eclipse Public License, Version 1.0 (EPL-1.0)
+(plain text)
+THE ACCOMPANYING PROGRAM IS PROVIDED UNDER THE TERMS OF THIS ECLIPSE PUBLIC 
LICENSE ("AGREEMENT"). ANY USE, REPRODUCTION OR DISTRIBUTION OF THE PROGRAM 
CONSTITUTES RECIPIENT'S ACCEPTANCE OF THIS AGREEMENT.
+
+1. DEFINITIONS
+
+"Contribution" means:
+
+a) in the case of the initial Contributor, the initial code and documentation 
distributed under this Agreement, and
+b) in the case of each subsequent Contributor:
+i) changes to the Program, and
+ii) additions to the Program;
+where such changes and/or additions to the Program originate from and are 
distributed by that particular Contributor. A Contribution 'originates' from a 
Contributor if it was added to the Program by such Contributor itself or anyone 
acting on such Contributor's behalf. Contributions do not include additions to 
the Program which: (i) are separate modules of software distributed in 
conjunction with the Program under their own license agreement, and (ii) are 
not derivative works of the Program.
+"Contributor" means any person or entity that distributes the Program.
+
+"Licensed Patents " mean patent claims licensable by a Contributor which are 
necessarily infringed by the use or sale of its Contribution alone or when 
combined with the Program.
+
+"Program" means the Contributions distributed in accordance with this 
Agreement.
+
+"Recipient" means anyone who receives the Program under this Agreement, 
including all Contributors.
+
+2. GRANT OF RIGHTS
+
+a) Subject to the terms of this Agreement, each Contributor hereby grants 
Recipient a non-exclusive, worldwide, royalty-free copyright license to 
reproduce, prepare derivative works of, publicly display, publicly perform, 
distribute and sublicense the Contribution of such Contributor, if any, and 
such derivative works, in source code and object code form.
+b) Subject to the terms of this Agreement, each Contributor hereby grants 
Recipient a non-exclusive, worldwide, royalty-free patent license under 
Licensed Patents to make, use, sell, offer to sell, import and otherwise 
transfer the Contribution of such Contributor, if any, in source code and 
object code form. This patent license shall apply to the combination of the 
Contribution and the Program if, at the time the Contribution is added by the 
Contributor, such addition of the Contribution causes such combination to be 
covered by the Licensed Patents. The patent license shall not apply to any 
other combinations which include the Contribution. No hardware per se is 
licensed hereunder.
+c) Recipient understands that although each Contributor grants the licenses to 
its Contributions set forth herein, no assurances are provided by any 
Contributor that the Program does not infringe the patent or other intellectual 
property rights of any other entity. Each Contributor disclaims any liability 
to Recipient for claims brought by any other entity based on infringement of 
intellectual property rights or otherwise. As a condition to exercising the 
rights and licenses granted hereunder, each Recipient hereby assumes sole 
responsibility to secure any other intellectual property rights needed, if any. 
For example, if a third party patent license is required to allow Recipient to 
distribute the Program, it is Recipient's responsibility to acquire that 
license before distributing the Program.
+d) Each Contributor represents that to its knowledge it has sufficient 
copyright rights in its Contribution, if any, to grant the copyright license 
set forth in this Agreement.
+3. REQUIREMENTS
+
+A Contributor may choose to distribute the Program in object code form under 
its own license agreement, provided that:
+
+a) it complies with the terms and conditions of this Agreement; and
+b) its license agreement:
+i) effectively disclaims on behalf of all Contributors all warranties and 
conditions, express and implied, including warranties or conditions of title 
and non-infringement, and implied warranties or conditions of merchantability 
and fitness for a particular purpose;
+ii) effectively excludes on behalf of all Contributors all liability for 
damages, including direct, indirect, special, incidental and consequential 
damages, such as lost profits;
+iii) states that any provisions which differ from this Agreement are offered 
by that Contributor alone and not by any other party; and
+iv) states that source code for the Program is available from such 
Contributor, and informs licensees how to obtain it in a reasonable manner on 
or through a medium customarily used for software exchange.
+When the Program is made available in source code form:
+
+a) it must be made available under this Agreement; and
+b) a copy of this Agreement must be included with each copy of the Program.
+Contributors may not remove or alter any copyright notices contained within 
the Program.
+Each Contributor must identify itself as the originator of its Contribution, 
if any, in a manner that reasonably allows subsequent Recipients to identify 
the originator of the Contribution.
+
+4. COMMERCIAL DISTRIBUTION
+
+Commercial distributors of software may accept certain responsibilities with 
respect to end users, business partners and the like. While this license is 
intended to facilitate the commercial use of the Program, the Contributor who 
includes the Program in a commercial product offering should do so in a manner 
which does not create potential liability for other Contributors. Therefore, if 
a Contributor includes the Program in a commercial product offering, such 
Contributor ("Commercial Contributor") hereby agrees to defend and indemnify 
every other Contributor ("Indemnified Contributor") against any losses, damages 
and costs (collectively "Losses") arising from claims, lawsuits and other legal 
actions brought by a third party against the Indemnified Contributor to the 
extent caused by the acts or omissions of such Commercial Contributor in 
connection with its distribution of the Program in a commercial product 
offering. The obligations in this section do not apply to any claims or Los
 ses relating to any actual or alleged intellectual property infringement. In 
order to qualify, an Indemnified Contributor must: a) promptly notify the 
Commercial Contributor in writing of such claim, and b) allow the Commercial 
Contributor to control, and cooperate with the Commercial Contributor in, the 
defense and any related settlement negotiations. The Indemnified Contributor 
may participate in any such claim at its own expense.
+
+For example, a Contributor might include the Program in a commercial product 
offering, Product X. That Contributor is then a Commercial Contributor. If that 
Commercial Contributor then makes performance claims, or offers warranties 
related to Product X, those performance claims and warranties are such 
Commercial Contributor's responsibility alone. Under this section, the 
Commercial Contributor would have to defend claims against the other 
Contributors related to those performance claims and warranties, and if a court 
requires any other Contributor to pay any damages as a result, the Commercial 
Contributor must pay those damages.
+
+5. NO WARRANTY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, THE PROGRAM IS PROVIDED ON AN 
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR 
IMPLIED INCLUDING, WITHOUT LIMITATION, ANY WARRANTIES OR CONDITIONS OF TITLE, 
NON-INFRINGEMENT, MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE. Each 
Recipient is solely responsible for determining the appropriateness of using 
and distributing the Program and assumes all risks associated with its exercise 
of rights under this Agreement , including but not limited to the risks and 
costs of program errors, compliance with applicable laws, damage to or loss of 
data, programs or equipment, and unavailability or interruption of operations.
+
+6. DISCLAIMER OF LIABILITY
+
+EXCEPT AS EXPRESSLY SET FORTH IN THIS AGREEMENT, NEITHER RECIPIENT NOR ANY 
CONTRIBUTORS SHALL HAVE ANY LIABILITY FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING WITHOUT LIMITATION LOST 
PROFITS), HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY 
WAY OUT OF THE USE OR DISTRIBUTION OF THE PROGRAM OR THE EXERCISE OF ANY RIGHTS 
GRANTED HEREUNDER, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.
+
+7. GENERAL
+
+If any provision of this Agreement is invalid or unenforceable under 
applicable law, it shall not affect the validity or enforceability of the 
remainder of the terms of this Agreement, and without further action by the 
parties hereto, such provision shall be reformed to the minimum extent 
necessary to make such provision valid and enforceable.
+
+If Recipient institutes patent litigation against any entity (including a 
cross-claim or counterclaim in a lawsuit) alleging that the Program itself 
(excluding combinations of the Program with other software or hardware) 
infringes such Recipient's patent(s), then such Recipient's rights granted 
under Section 2(b) shall terminate as of the date such litigation is filed.
+
+All Recipient's rights under this Agreement shall terminate if it fails to 
comply with any of the material terms or conditions of this Agreement and does 
not cure such failure in a reasonable period of time after becoming aware of 
such noncompliance. If all Recipient's rights under this Agreement terminate, 
Recipient agrees to cease use and distribution of the Program as soon as 
reasonably practicable. However, Recipient's obligations under this Agreement 
and any licenses granted by Recipient relating to the Program shall continue 
and survive.
+
+Everyone is permitted to copy and distribute copies of this Agreement, but in 
order to avoid inconsistency the Agreement is copyrighted and may only be 
modified in the following manner. The Agreement Steward reserves the right to 
publish new versions (including revisions) of this Agreement from time to time. 
No one other than the Agreement Steward has the right to modify this Agreement. 
The Eclipse Foundation is the initial Agreement Steward. The Eclipse Foundation 
may assign the responsibility to serve as the Agreement Steward to a suitable 
separate entity. Each new version of the Agreement will be given a 
distinguishing version number. The Program (including Contributions) may always 
be distributed subject to the version of the Agreement under which it was 
received. In addition, after a new version of the Agreement is published, 
Contributor may elect to distribute the Program (including its Contributions) 
under the new version. Except as expressly stated in Sections 2(a) and 2(b) 
 above, Recipient receives no rights or licenses to the intellectual property 
of any Contributor under this Agreement, whether expressly, by implication, 
estoppel or otherwise. All rights in the Program not expressly granted under 
this Agreement are reserved.
+
+This Agreement is governed by the laws of the State of New York and the 
intellectual property laws of the United States of America. No party to this 
Agreement will bring a legal action under this Agreement more than one year 
after the cause of action arose. Each party waives its rights to a jury trial 
in any resulting litigation.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/pom.xml b/modules/mesos/pom.xml
index 4aa0dae..9079c66 100644
--- a/modules/mesos/pom.xml
+++ b/modules/mesos/pom.xml
@@ -28,38 +28,41 @@
     <version>1.1.0-SNAPSHOT</version>
 
     <properties>
-        <version.grizzly>2.16</version.grizzly>
+        <jetty.version>9.2.10.v20150310</jetty.version>
+        <mesos.version>0.22.0</mesos.version>
+        <slf4j.version>1.7.12</slf4j.version>
+        <log4j.version>2.0.2</log4j.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.mesos</groupId>
             <artifactId>mesos</artifactId>
-            <version>0.22.0</version>
+            <version>${mesos.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <version>1.7.12</version>
+            <version>${slf4j.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
-            <version>2.0.2</version>
+            <version>${log4j.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-slf4j-impl</artifactId>
-            <version>2.0.2</version>
+            <version>${log4j.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.glassfish.jersey.containers</groupId>
-            <artifactId>jersey-container-grizzly2-http</artifactId>
-            <version>${version.grizzly}</version>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty.version}</version>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
index 63ef27c..de0afcf 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/ClusterProperties.java
@@ -98,7 +98,7 @@ public class ClusterProperties {
     public static final String IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE = 
"IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE";
 
     /** */
-    public static final double DEFAULT_RESOURCE_MIN_CPU = 2;
+    public static final double DEFAULT_RESOURCE_MIN_CPU = 1;
 
     /** Min memory per node. */
     private double minCpu = DEFAULT_RESOURCE_MIN_CPU;
@@ -155,6 +155,13 @@ public class ClusterProperties {
     }
 
     /**
+     * Set CPU count limit.
+     */
+    public void cpus(double cpu){
+        this.cpu = cpu;
+    }
+
+    /**
      * @return CPU count limit.
      */
     public double cpusPerNode(){
@@ -169,6 +176,14 @@ public class ClusterProperties {
     }
 
     /**
+     * Set mem limit.
+     */
+    public void memory(double mem) {
+        this.mem = mem;
+    }
+
+
+    /**
      * @return mem limit.
      */
     public double memoryPerNode() {
@@ -204,6 +219,15 @@ public class ClusterProperties {
     }
 
     /**
+     * Sets min memory.
+     *
+     * @param minMemory Min memory.
+     */
+    public void minMemoryPerNode(double minMemory) {
+        this.minMemory = minMemory;
+    }
+
+    /**
      * @return min cpu count per node.
      */
     public double minCpuPerNode() {
@@ -211,6 +235,15 @@ public class ClusterProperties {
     }
 
     /**
+     * Sets min cpu count per node.
+     *
+     * @param minCpu min cpu count per node.
+     */
+    public void minCpuPerNode(double minCpu) {
+        this.minCpu = minCpu;
+    }
+
+    /**
      * @return Ignite version.
      */
     public String igniteVer() {
@@ -286,7 +319,7 @@ public class ClusterProperties {
             prop.mem = getDoubleProperty(IGNITE_RESOURCE_MEM_MB, props, 
UNLIMITED);
             prop.memPerNode = 
getDoubleProperty(IGNITE_RESOURCE_MEM_MB_PER_NODE, props, UNLIMITED);
             prop.disk = getDoubleProperty(IGNITE_RESOURCE_DISK_MB, props, 
UNLIMITED);
-            prop.diskPerNode = 
getDoubleProperty(IGNITE_RESOURCE_DISK_MB_PER_NODE, props, UNLIMITED);
+            prop.diskPerNode = 
getDoubleProperty(IGNITE_RESOURCE_DISK_MB_PER_NODE, props, 1024.0);
             prop.nodeCnt = getDoubleProperty(IGNITE_RESOURCE_NODE_CNT, props, 
UNLIMITED);
             prop.minCpu = 
getDoubleProperty(IGNITE_RESOURCE_MIN_CPU_CNT_PER_NODE, props, 
DEFAULT_RESOURCE_MIN_CPU);
             prop.minMemory = 
getDoubleProperty(IGNITE_RESOURCE_MIN_MEMORY_PER_NODE, props, 
DEFAULT_RESOURCE_MIN_MEM);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
index b385bc9..154385b 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteFramework.java
@@ -20,9 +20,6 @@ package org.apache.ignite.mesos;
 import com.google.protobuf.*;
 import org.apache.ignite.mesos.resource.*;
 import org.apache.mesos.*;
-import org.glassfish.grizzly.http.server.*;
-import org.glassfish.jersey.grizzly2.httpserver.*;
-import org.glassfish.jersey.server.*;
 import org.slf4j.*;
 
 import java.net.*;
@@ -61,13 +58,12 @@ public class IgniteFramework {
 
         String baseUrl = String.format("http://%s:%d";, 
clusterProps.httpServerHost(), clusterProps.httpServerPort());
 
-        URI httpServerBaseUri = URI.create(baseUrl);
+        JettyServer httpServer = new JettyServer();
 
-        ResourceConfig rc = new ResourceConfig()
-            .registerInstances(new ResourceController(clusterProps.userLibs(), 
clusterProps.igniteCfg(),
-                clusterProps.igniteWorkDir()));
-
-        HttpServer httpServer = 
GrizzlyHttpServerFactory.createHttpServer(httpServerBaseUri, rc);
+        httpServer.start(
+            new InetSocketAddress(clusterProps.httpServerHost(), 
clusterProps.httpServerPort()),
+            new ResourceHandler(clusterProps.userLibs(), 
clusterProps.igniteCfg(), clusterProps.igniteWorkDir())
+        );
 
         ResourceProvider provider = new ResourceProvider();
 
@@ -113,7 +109,7 @@ public class IgniteFramework {
 
         int status = driver.run() == Protos.Status.DRIVER_STOPPED ? 0 : 1;
 
-        httpServer.shutdown();
+        httpServer.stop();
 
         // Ensure that the driver process terminates.
         driver.stop();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
index 7d713cd..6b165e4 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteScheduler.java
@@ -56,24 +56,24 @@ public class IgniteScheduler implements Scheduler {
     private Map<String, IgniteTask> tasks = new HashMap<>();
 
     /** Cluster resources. */
-    private ClusterProperties clusterLimit;
+    private ClusterProperties clusterProps;
 
     /** Resource provider. */
     private ResourceProvider resourceProvider;
 
     /**
-     * @param clusterLimit Cluster limit.
+     * @param clusterProps Cluster limit.
      * @param resourceProvider Resource provider.
      */
-    public IgniteScheduler(ClusterProperties clusterLimit, ResourceProvider 
resourceProvider) {
-        this.clusterLimit = clusterLimit;
+    public IgniteScheduler(ClusterProperties clusterProps, ResourceProvider 
resourceProvider) {
+        this.clusterProps = clusterProps;
         this.resourceProvider = resourceProvider;
     }
 
     /** {@inheritDoc} */
     @Override public void resourceOffers(SchedulerDriver schedulerDriver, 
List<Protos.Offer> offers) {
         synchronized (mux) {
-            log.info("resourceOffers() with {} offers", offers.size());
+            log.debug("Offers resources: {} ", offers.size());
 
             for (Protos.Offer offer : offers) {
                 IgniteTask igniteTask = checkOffer(offer);
@@ -89,7 +89,7 @@ public class IgniteScheduler implements Scheduler {
                 Protos.TaskID taskId = Protos.TaskID.newBuilder()
                     
.setValue(Integer.toString(taskIdGenerator.incrementAndGet())).build();
 
-                log.info("Launching task {}", taskId.getValue());
+                log.info("Launching task: [{}]", igniteTask);
 
                 // Create task to run.
                 Protos.TaskInfo task = createTask(offer, igniteTask, taskId);
@@ -103,7 +103,6 @@ public class IgniteScheduler implements Scheduler {
         }
     }
 
-
     /**
      * Create Task.
      *
@@ -116,7 +115,7 @@ public class IgniteScheduler implements Scheduler {
         Protos.CommandInfo.Builder builder = Protos.CommandInfo.newBuilder()
             
.setEnvironment(Protos.Environment.newBuilder().addVariables(Protos.Environment.Variable.newBuilder()
                 .setName("IGNITE_TCP_DISCOVERY_ADDRESSES")
-                .setValue(getAddress())))
+                .setValue(getAddress(offer.getHostname()))))
             .addUris(Protos.CommandInfo.URI.newBuilder()
                 .setValue(resourceProvider.igniteUrl())
                 .setExtract(true))
@@ -129,14 +128,14 @@ public class IgniteScheduler implements Scheduler {
             builder.setValue("cp *.jar ./gridgain-community-*/libs/ "
                 + "&& ./gridgain-community-*/bin/ignite.sh "
                 + resourceProvider.configName()
-                + " -J-Xmx" + String.valueOf((int) igniteTask.mem() + "m")
-                + " -J-Xms" + String.valueOf((int) igniteTask.mem()) + "m");
+                + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+                + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
         }
         else
             builder.setValue("./gridgain-community-*/bin/ignite.sh "
                 + resourceProvider.configName()
-                + " -J-Xmx" + String.valueOf((int) igniteTask.mem() + "m")
-                + " -J-Xms" + String.valueOf((int) igniteTask.mem()) + "m");
+                + " -J-Xmx" + String.valueOf((int)igniteTask.mem() + "m")
+                + " -J-Xms" + String.valueOf((int)igniteTask.mem()) + "m");
 
         return Protos.TaskInfo.newBuilder()
             .setName("Ignite node " + taskId.getValue())
@@ -151,15 +150,23 @@ public class IgniteScheduler implements Scheduler {
                 .setName(MEM)
                 .setType(Protos.Value.Type.SCALAR)
                 
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.mem())))
+            .addResources(Protos.Resource.newBuilder()
+                .setName(DISK)
+                .setType(Protos.Value.Type.SCALAR)
+                
.setScalar(Protos.Value.Scalar.newBuilder().setValue(igniteTask.disk())))
                 .build();
     }
 
     /**
      * @return Address running nodes.
      */
-    protected String getAddress() {
-        if (tasks.isEmpty())
+    protected String getAddress(String address) {
+        if (tasks.isEmpty()) {
+            if (address != null && !address.isEmpty())
+                return address + DEFAULT_PORT;
+
             return "";
+        }
 
         StringBuilder sb = new StringBuilder();
 
@@ -177,7 +184,7 @@ public class IgniteScheduler implements Scheduler {
      */
     private IgniteTask checkOffer(Protos.Offer offer) {
         // Check limit on running nodes.
-        if (clusterLimit.instances() <= tasks.size())
+        if (clusterProps.instances() <= tasks.size())
             return null;
 
         double cpus = -1;
@@ -206,7 +213,7 @@ public class IgniteScheduler implements Scheduler {
         }
 
         // Check that slave satisfies min requirements.
-        if (cpus < clusterLimit.minCpuPerNode()  && mem < 
clusterLimit.minMemoryPerNode() ) {
+        if (cpus < clusterProps.minCpuPerNode() || mem < 
clusterProps.minMemoryPerNode() ) {
             log.debug("Offer not sufficient for slave request: {}", 
offer.getResourcesList());
 
             return null;
@@ -223,9 +230,9 @@ public class IgniteScheduler implements Scheduler {
             totalDisk += task.disk();
         }
 
-        cpus = Math.min(clusterLimit.cpus() - totalCpus, Math.min(cpus, 
clusterLimit.cpusPerNode()));
-        mem = Math.min(clusterLimit.memory() - totalMem, Math.min(mem, 
clusterLimit.memoryPerNode()));
-        disk = Math.min(clusterLimit.disk() - totalDisk, Math.min(disk, 
clusterLimit.diskPerNode()));
+        cpus = Math.min(clusterProps.cpus() - totalCpus, Math.min(cpus, 
clusterProps.cpusPerNode()));
+        mem = Math.min(clusterProps.memory() - totalMem, Math.min(mem, 
clusterProps.memoryPerNode()));
+        disk = Math.min(clusterProps.disk() - totalDisk, Math.min(disk, 
clusterProps.diskPerNode()));
 
         if (cpus > 0 && mem > 0)
             return new IgniteTask(offer.getHostname(), cpus, mem, disk);
@@ -240,37 +247,45 @@ public class IgniteScheduler implements Scheduler {
     @Override public void statusUpdate(SchedulerDriver schedulerDriver, 
Protos.TaskStatus taskStatus) {
         final String taskId = taskStatus.getTaskId().getValue();
 
-        log.info("statusUpdate() task {} is in state {}", taskId, 
taskStatus.getState());
-
-        switch (taskStatus.getState()) {
-            case TASK_FAILED:
-            case TASK_FINISHED:
-                synchronized (mux) {
-                    IgniteTask failedTask = tasks.remove(taskId);
-
-                    if (failedTask != null) {
-                        List<Protos.Request> requests = new ArrayList<>();
-
-                        Protos.Request request = Protos.Request.newBuilder()
-                            .addResources(Protos.Resource.newBuilder()
-                                .setType(Protos.Value.Type.SCALAR)
-                                .setName(MEM)
-                                
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
-                            .addResources(Protos.Resource.newBuilder()
-                                .setType(Protos.Value.Type.SCALAR)
-                                .setName(CPUS)
-                                
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
-                            .build();
-
-                        requests.add(request);
-
-                        schedulerDriver.requestResources(requests);
-                    }
+        log.info("Received update event task: [{}] is in state: [{}]", taskId, 
taskStatus.getState());
+
+        if (taskStatus.getState().equals(Protos.TaskState.TASK_FAILED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_ERROR)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_FINISHED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_KILLED)
+            || taskStatus.getState().equals(Protos.TaskState.TASK_LOST)) {
+            synchronized (mux) {
+                IgniteTask failedTask = tasks.remove(taskId);
+
+                if (failedTask != null) {
+                    List<Protos.Request> requests = new ArrayList<>();
+
+                    Protos.Request request = Protos.Request.newBuilder()
+                        .addResources(Protos.Resource.newBuilder()
+                            .setType(Protos.Value.Type.SCALAR)
+                            .setName(MEM)
+                            
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.mem())))
+                        .addResources(Protos.Resource.newBuilder()
+                            .setType(Protos.Value.Type.SCALAR)
+                            .setName(CPUS)
+                            
.setScalar(Protos.Value.Scalar.newBuilder().setValue(failedTask.cpuCores())))
+                        .build();
+
+                    requests.add(request);
+
+                    schedulerDriver.requestResources(requests);
                 }
-                break;
+            }
         }
     }
 
+    /**
+     * @param clusterProps Cluster properties.
+     */
+    public void setClusterProps(ClusterProperties clusterProps) {
+        this.clusterProps = clusterProps;
+    }
+
     /** {@inheritDoc} */
     @Override public void registered(SchedulerDriver schedulerDriver, 
Protos.FrameworkID frameworkID,
         Protos.MasterInfo masterInfo) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
index c41ff49..ecd2272 100644
--- a/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
+++ b/modules/mesos/src/main/java/org/apache/ignite/mesos/IgniteTask.java
@@ -75,4 +75,12 @@ public class IgniteTask {
     public double disk() {
         return disk;
     }
+
+    @Override
+    public String toString() {
+        return "IgniteTask " +
+            "host: [" + host + ']' +
+            ", cpuCores: [" + cpuCores + "]" +
+            ", mem: [" + mem + "]";
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
index 9a27539..2887112 100644
--- 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
+++ 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/IgniteProvider.java
@@ -66,9 +66,9 @@ public class IgniteProvider {
                             String[] ver1 = parseVersion(f1).split("\\.");
                             String[] ver2 = parseVersion(f2).split("\\.");
 
-                            if (Integer.valueOf(ver1[0]) > 
Integer.valueOf(ver2[0])
-                                && Integer.valueOf(ver1[1]) > 
Integer.valueOf(ver2[1])
-                                && Integer.valueOf(ver1[2]) > 
Integer.valueOf(ver2[2]))
+                            if (Integer.valueOf(ver1[0]) >= 
Integer.valueOf(ver2[0])
+                                && Integer.valueOf(ver1[1]) >= 
Integer.valueOf(ver2[1])
+                                && Integer.valueOf(ver1[2]) >= 
Integer.valueOf(ver2[2]))
 
                                 return 1;
                             else

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
new file mode 100644
index 0000000..fb27963
--- /dev/null
+++ 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/JettyServer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import org.eclipse.jetty.server.*;
+
+import java.net.*;
+
+/**
+ * Embedded jetty server.
+ */
+public class JettyServer {
+    /** */
+    private Server server;
+
+    /**
+     * Starts jetty server.
+     *
+     * @param address Inter socket address.
+     * @param handler Handler.
+     * @throws Exception If failed.
+     */
+    public void start(InetSocketAddress address, Handler handler) throws 
Exception {
+        if (server == null) {
+            server = new Server(address);
+
+            server.setHandler(handler);
+
+            server.start();
+        }
+        else
+            throw new IllegalStateException("Failed. Jetty server has been 
started already.");
+    }
+
+    /**
+     * Stops server.
+     *
+     * @throws Exception If failed.
+     */
+    public void stop() throws Exception {
+        if (server != null)
+            server.stop();
+        else
+            throw new IllegalStateException("Failed. Jetty server has not been 
started yet.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
deleted file mode 100644
index 8f0a2af..0000000
--- 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.mesos.resource;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.*;
-import java.io.*;
-
-/**
- * HTTP controller which provides on slave resources.
- */
-@Path("/")
-public class ResourceController {
-    /** */
-    public static final String IGNITE_PREFIX = "/ignite/";
-
-    /** */
-    public static final String LIBS_PREFIX = "/libs/";
-
-    /** */
-    public static final String CONFIG_PREFIX = "/config/";
-
-    /** */
-    public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/";
-
-    /** */
-    private String libsDir;
-
-    /** */
-    private String cfgPath;
-
-    /** */
-    private String igniteDir;
-
-    /**
-     * @param libsDir Path to directory with user libs.
-     * @param cfgPath Path to config file.
-     */
-    public ResourceController(String libsDir, String cfgPath, String 
igniteDir) {
-        this.libsDir = libsDir;
-        this.cfgPath = cfgPath;
-        this.igniteDir = igniteDir;
-    }
-
-    /**
-     * @param ignite Ignite jar name.
-     * @return Http response.
-     */
-    @GET
-    @Path(IGNITE_PREFIX + "{ignite-dist}")
-    public Response ignite(@PathParam("ignite-dist") String ignite) {
-        return handleRequest(new File(igniteDir + "/" + ignite), 
"application/zip-archive", ignite);
-    }
-
-    /**
-     * @param lib user's jar.
-     * @return Http response.
-     */
-    @GET
-    @Path(LIBS_PREFIX + "{lib}")
-    public Response lib(@PathParam("lib") String lib) {
-        return handleRequest(new File(libsDir + "/" + lib), 
"application/java-archive", lib);
-    }
-
-    /**
-     *
-     * @param cfg Config file.
-     * @return Http response.
-     */
-    @GET
-    @Path(CONFIG_PREFIX + "{cfg}")
-    public Response config(@PathParam("cfg") String cfg) {
-        return handleRequest(new File(cfgPath), "application/xml", cfg);
-    }
-
-    /**
-     * @param cfg Config file.
-     * @return Http response.
-     */
-    @GET
-    @Path(DEFAULT_CONFIG + "{cfg}")
-    public Response defaultConfig(@PathParam("cfg") String cfg) {
-        return 
handleRequest(Thread.currentThread().getContextClassLoader().getResourceAsStream(cfg),
-            "application/xml", cfg);
-    }
-
-    /**
-     *
-     * @param resource File resource.
-     * @param type Type.
-     * @param attachmentName Attachment name.
-     * @return Http response.
-     */
-    private static Response handleRequest(File resource, String type, String 
attachmentName) {
-        final Response.ResponseBuilder builder = Response.ok(resource, type);
-        builder.header("Content-Disposition", "attachment; filename=\"" + 
attachmentName + "\"");
-        return builder.build();
-    }
-
-    /**
-     *
-     * @param resource File resource.
-     * @param type Type.
-     * @param attachmentName Attachment name.
-     * @return Http response.
-     */
-    private static Response handleRequest(InputStream resource, String type, 
String attachmentName) {
-        final Response.ResponseBuilder builder = Response.ok(resource, type);
-        builder.header("Content-Disposition", "attachment; filename=\"" + 
attachmentName + "\"");
-        return builder.build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
new file mode 100644
index 0000000..ea883e3
--- /dev/null
+++ 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceHandler.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.mesos.resource;
+
+import org.eclipse.jetty.server.*;
+import org.eclipse.jetty.server.handler.*;
+
+import javax.servlet.*;
+import javax.servlet.http.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.nio.file.*;
+
+/**
+ * HTTP controller which provides on slave resources.
+ */
+public class ResourceHandler extends AbstractHandler {
+    /** */
+    public static final String IGNITE_PREFIX = "/ignite/";
+
+    /** */
+    public static final String LIBS_PREFIX = "/libs/";
+
+    /** */
+    public static final String CONFIG_PREFIX = "/config/";
+
+    /** */
+    public static final String DEFAULT_CONFIG = CONFIG_PREFIX + "default/";
+
+    /** */
+    private String libsDir;
+
+    /** */
+    private String cfgPath;
+
+    /** */
+    private String igniteDir;
+
+    /**
+     * @param libsDir Directory with user's libs.
+     * @param cfgPath Path to config file.
+     * @param igniteDir Directory with ignites.
+     */
+    public ResourceHandler(String libsDir, String cfgPath, String igniteDir) {
+        this.libsDir = libsDir;
+        this.cfgPath = cfgPath;
+        this.igniteDir = igniteDir;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void handle(
+        String url,
+        Request request,
+        HttpServletRequest httpServletRequest,
+        HttpServletResponse response) throws IOException, ServletException {
+
+        String[] path = url.split("/");
+
+        String fileName = path[path.length -1];
+
+        String servicePath = url.substring(0, url.length() - 
fileName.length());
+
+        switch (servicePath) {
+            case IGNITE_PREFIX:
+                handleRequest(response, "application/zip-archive", igniteDir + 
"/" + fileName);
+
+                request.setHandled(true);
+                break;
+
+            case LIBS_PREFIX:
+                handleRequest(response, "application/java-archive", libsDir + 
"/" + fileName);
+
+                request.setHandled(true);
+                break;
+
+            case CONFIG_PREFIX:
+                handleRequest(response, "application/xml", cfgPath);
+
+                request.setHandled(true);
+                break;
+
+            case DEFAULT_CONFIG:
+                handleRequest(response, "application/xml",
+                    
Thread.currentThread().getContextClassLoader().getResourceAsStream(fileName),
+                    fileName);
+
+                request.setHandled(true);
+                break;
+        }
+    }
+
+    /**
+     * @param response Http response.
+     * @param type Type.
+     * @param path Path to file.
+     * @throws IOException If failed.
+     */
+    private static void handleRequest(HttpServletResponse response, String 
type, String path) throws IOException {
+        Path path0 = Paths.get(path);
+
+        response.setContentType(type);
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + 
path0.getFileName() + "\"");
+
+        try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+            out.sendContent(FileChannel.open(path0, StandardOpenOption.READ));
+        }
+    }
+
+    /**
+     * @param response Http response.
+     * @param type Type.
+     * @param stream Stream.
+     * @param attachmentName Attachment name.
+     * @throws IOException If failed.
+     */
+    private static void handleRequest(HttpServletResponse response, String 
type, InputStream stream,
+                                      String attachmentName) throws 
IOException {
+        response.setContentType(type);
+        response.setHeader("Content-Disposition", "attachment; filename=\"" + 
attachmentName + "\"");
+
+        try (HttpOutput out = (HttpOutput)response.getOutputStream()) {
+            out.sendContent(stream);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
index 1b1f615..f02d1bf 100644
--- 
a/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
+++ 
b/modules/mesos/src/main/java/org/apache/ignite/mesos/resource/ResourceProvider.java
@@ -22,7 +22,7 @@ import org.apache.ignite.mesos.*;
 import java.io.*;
 import java.util.*;
 
-import static org.apache.ignite.mesos.resource.ResourceController.*;
+import static org.apache.ignite.mesos.resource.ResourceHandler.*;
 
 /**
  * Provides path to user's libs and config file.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/main/resources/ignite-default-config.xml
----------------------------------------------------------------------
diff --git a/modules/mesos/src/main/resources/ignite-default-config.xml 
b/modules/mesos/src/main/resources/ignite-default-config.xml
index 9fcce97..2f26398 100644
--- a/modules/mesos/src/main/resources/ignite-default-config.xml
+++ b/modules/mesos/src/main/resources/ignite-default-config.xml
@@ -25,10 +25,10 @@
         <property name="discoverySpi">
             <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                 <property name="ipFinder">
-                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
-                        <property name="shared" value="true"/>
-                    </bean>
+                    <bean 
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"/>
                 </property>
+
+                <property name="joinTimeout" value="60000"/>
             </bean>
         </property>
     </bean>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/639c9128/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
 
b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
index 4124331..277e0db 100644
--- 
a/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
+++ 
b/modules/mesos/src/test/java/org/apache/ignite/mesos/IgniteSchedulerSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.mesos;
 
 import junit.framework.*;
+import org.apache.ignite.mesos.resource.*;
 import org.apache.mesos.*;
 
 import java.util.*;
@@ -33,27 +34,230 @@ public class IgniteSchedulerSelfTest extends TestCase {
     @Override public void setUp() throws Exception {
         super.setUp();
 
-        //scheduler = new IgniteScheduler();
+        ClusterProperties clustProp = new ClusterProperties();
+
+        scheduler = new IgniteScheduler(clustProp, new ResourceProvider() {
+            @Override public String configName() {
+                return "config.xml";
+            }
+
+            @Override public String igniteUrl() {
+                return "ignite.jar";
+            }
+
+            @Override public String igniteConfigUrl() {
+                return "config.xml";
+            }
+
+            @Override public Collection<String> resourceUrl() {
+                return null;
+            }
+        });
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testHostRegister() throws Exception {
-        //Protos.Offer offer = createOffer("hostname", 4, 1024);
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.CPUS));
+        assertEquals(1024.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.MEM));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByCpu() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.cpus(2);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(2.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.CPUS));
+        assertEquals(1024.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.MEM));
+
+        mock.clear();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMem() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 4, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.memory(512);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.launchedTask);
+        assertEquals(1, mock.launchedTask.size());
+
+        Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+        assertEquals(4.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.CPUS));
+        assertEquals(512.0, resources(taskInfo.getResourcesList(), 
IgniteScheduler.MEM));
 
-        //scheduler.resourceOffers(DriverStub.INSTANCE, Lists.);
+        mock.clear();
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMemCpu() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 1, 1024);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.cpus(4);
+        clustProp.memory(2000);
+
+        scheduler.setClusterProps(clustProp);
+
+        double totalMem = 0, totalCpu = 0;
+
+        for (int i = 0; i < 2; i++) {
+            scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+            assertNotNull(mock.launchedTask);
+            assertEquals(1, mock.launchedTask.size());
+
+            Protos.TaskInfo taskInfo = mock.launchedTask.iterator().next();
+
+            totalCpu += resources(taskInfo.getResourcesList(), 
IgniteScheduler.CPUS);
+            totalMem += resources(taskInfo.getResourcesList(), 
IgniteScheduler.MEM);
+
+            mock.clear();
+        }
+
+        assertEquals(2.0, totalCpu);
+        assertEquals(2000.0, totalMem);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNull(mock.launchedTask);
+
+        Protos.OfferID declinedOffer = mock.declinedOffer;
+
+        assertEquals(offer.getId(), declinedOffer);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByCpuMinRequirements() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.minCpuPerNode(12);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.declinedOffer);
+
+        assertEquals(offer.getId(), mock.declinedOffer);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDeclineByMemMinRequirements() throws Exception {
+        Protos.Offer offer = createOffer("hostname", 8, 10240);
+
+        DriverMock mock = new DriverMock();
+
+        ClusterProperties clustProp = new ClusterProperties();
+        clustProp.minMemoryPerNode(15000);
+
+        scheduler.setClusterProps(clustProp);
+
+        scheduler.resourceOffers(mock, Arrays.asList(offer));
+
+        assertNotNull(mock.declinedOffer);
+
+        assertEquals(offer.getId(), mock.declinedOffer);
+    }
+
+
+    /**
+     * @param resourceType Resource type.
+     * @return Value.
+     */
+    private Double resources(List<Protos.Resource> resources, String 
resourceType) {
+        for (Protos.Resource resource : resources) {
+            if (resource.getName().equals(resourceType))
+                return resource.getScalar().getValue();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param hostname Hostname
+     * @param cpu Cpu count.
+     * @param mem Mem size.
+     * @return Offer.
+     */
     private Protos.Offer createOffer(String hostname, double cpu, double mem) {
         return Protos.Offer.newBuilder()
-            .setSlaveId(Protos.SlaveID.newBuilder().setValue("1").build())
+            .setId(Protos.OfferID.newBuilder().setValue("1"))
+            .setSlaveId(Protos.SlaveID.newBuilder().setValue("1"))
+            .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("1"))
             .setHostname(hostname)
             .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
                 .setName(IgniteScheduler.CPUS)
                 
.setScalar(Protos.Value.Scalar.newBuilder().setValue(cpu).build())
                 .build())
             .addResources(Protos.Resource.newBuilder()
+                .setType(Protos.Value.Type.SCALAR)
                 .setName(IgniteScheduler.MEM)
                 
.setScalar(Protos.Value.Scalar.newBuilder().setValue(mem).build())
                 .build())
@@ -63,8 +267,22 @@ public class IgniteSchedulerSelfTest extends TestCase {
     /**
      * No-op implementation.
      */
-    public static class DriverStub implements SchedulerDriver {
-        private static final DriverStub INSTANCE = new DriverStub();
+    public static class DriverMock implements SchedulerDriver {
+        /**
+         *
+         */
+        Collection<Protos.TaskInfo> launchedTask;
+
+        /** */
+        Protos.OfferID declinedOffer;
+
+        /**
+         * Clear launched task.
+         */
+        public void clear() {
+            launchedTask = null;
+            declinedOffer = null;
+        }
 
         /** {@inheritDoc} */
         @Override public Protos.Status start() {
@@ -104,23 +322,31 @@ public class IgniteSchedulerSelfTest extends TestCase {
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Collection<Protos.OfferID> 
offerIds,
             Collection<Protos.TaskInfo> tasks, Protos.Filters filters) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Collection<Protos.OfferID> 
offerIds,
             Collection<Protos.TaskInfo> tasks) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Protos.OfferID offerId, 
Collection<Protos.TaskInfo> tasks,
             Protos.Filters filters) {
+            launchedTask = tasks;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status launchTasks(Protos.OfferID offerId, 
Collection<Protos.TaskInfo> tasks) {
+            launchedTask = tasks;
+
             return null;
         }
 
@@ -131,11 +357,15 @@ public class IgniteSchedulerSelfTest extends TestCase {
 
         /** {@inheritDoc} */
         @Override public Protos.Status declineOffer(Protos.OfferID offerId, 
Protos.Filters filters) {
+            declinedOffer = offerId;
+
             return null;
         }
 
         /** {@inheritDoc} */
         @Override public Protos.Status declineOffer(Protos.OfferID offerId) {
+            declinedOffer = offerId;
+
             return null;
         }
 

Reply via email to